Coverage for marshallEngine/feeders/atlas/tests/test_data.py : 0%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import print_function
2from builtins import str
3import os
4import unittest
5import shutil
6import yaml
7from marshallEngine.utKit import utKit
8from fundamentals import tools
9from os.path import expanduser
home = expanduser("~")

# LOAD THE TEST SETTINGS AND BUILD THE SHARED LOGGER / DATABASE CONNECTION
packageDirectory = utKit("").get_project_root()
# settingsFile = packageDirectory + "/test_settings.yaml"
settingsFile = home + "/git_repos/_misc_/settings/marshall/test_settings.yaml"

su = tools(
    arguments={"settingsFile": settingsFile},
    docString=__doc__,
    logLevel="DEBUG",
    options_first=False,
    projectName=None,
    defaultSettingsFile=False
)
arguments, settings, log, dbConn = su.setup()

# SETUP PATHS TO COMMON DIRECTORIES FOR TEST DATA
moduleDirectory = os.path.dirname(__file__)
pathToInputDir = moduleDirectory + "/input/"
pathToOutputDir = moduleDirectory + "/output/"

# START FROM A CLEAN OUTPUT DIR. BEST-EFFORT: THE DIRECTORY MAY NOT EXIST YET
shutil.rmtree(pathToOutputDir, ignore_errors=True)

# COPY INPUT TO OUTPUT DIR (copytree CREATES THE OUTPUT DIR ITSELF, SO NO
# SEPARATE makedirs CALL IS NEEDED AFTERWARDS)
shutil.copytree(pathToInputDir, pathToOutputDir)

# CLEAR CACHED TRANSIENT AND STATS DATA LEFT BY PREVIOUS RUNS. EACH DIRECTORY
# IS REMOVED INDEPENDENTLY SO A MISSING `transients` DIRECTORY NO LONGER
# PREVENTS `stats` FROM BEING CLEARED
try:
    cacheDirectory = settings["cache-directory"]
except (KeyError, TypeError):
    # NO CACHE DIRECTORY CONFIGURED - NOTHING TO CLEAR
    cacheDirectory = None

if cacheDirectory:
    for cacheSubDirectory in ("/transients/", "/stats/"):
        shutil.rmtree(cacheDirectory + cacheSubDirectory, ignore_errors=True)
class test_data(unittest.TestCase):
    """Tests for the ATLAS feeder ``data`` ingester.

    Every test uses the module-level ``log``, ``settings`` and ``dbConn``
    objects created during module setup.
    """

    def test_data_function(self):
        """Run the individual ingest steps for the last day of ATLAS data."""
        from datetime import datetime, timedelta
        from marshallEngine.feeders.atlas.data import data

        ingester = data(
            log=log,
            settings=settings,
            dbConn=dbConn
        )

        # ONLY REQUEST ROWS WITH A FOLLOWUP FLAG DATE FROM YESTERDAY ONWARDS
        timelimit = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

        # THE RETURNED ROWS ARE NOT INSPECTED HERE; THE FOLLOWING STEPS TAKE
        # NO DATA ARGUMENT, SO PRESUMABLY THE INGESTER KEEPS THEM INTERNALLY
        # - TODO CONFIRM
        ingester.get_csv_data(
            url=settings["atlas urls"]["summary csv"] +
            f"?followup_flag_date__gte={timelimit}"
        )
        ingester._clean_data_pre_ingest(
            surveyName="ATLAS", withinLastDays=1)

        # ADD DATA IMPORTING CODE HERE
        ingester._import_to_feeder_survey_table()
        ingester.insert_into_transientBucket()

    def test_data_function2(self):
        """Run the complete ingest in a single call via ``ingest``."""
        from marshallEngine.feeders.atlas.data import data

        data(
            log=log,
            settings=settings,
            dbConn=dbConn
        ).ingest(withinLastDays=1)

    def test_data_function_exception(self):
        """Check that an unexpected keyword argument makes the ingester fail.

        ``assertRaises`` is used instead of ``assert False`` inside a
        ``try``/``except Exception`` block: the ``AssertionError`` raised by
        ``assert False`` is itself an ``Exception``, so it was swallowed by
        the handler and the test could never fail.
        """
        from marshallEngine.feeders.atlas.data import data

        with self.assertRaises(Exception) as raised:
            this = data(
                log=log,
                settings=settings,
                fakeKey="break the code"
            )
            this.get()

        print(str(raised.exception))

        # x-print-testpage-for-pessto-marshall-web-object

    # x-class-to-test-named-worker-function