Coverage for marshallEngine/feeders/panstarrs/tests/test_data.py : 0%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import print_function
2from builtins import str
3import os
4import unittest
5import shutil
6import yaml
7from marshallEngine.utKit import utKit
8from fundamentals import tools
9from os.path import expanduser
home = expanduser("~")

packageDirectory = utKit("").get_project_root()
# settingsFile = packageDirectory + "/test_settings.yaml"
# NOTE(review): the settings file is read from a fixed location under the
# current user's home directory, so these tests only run on a machine that
# has this checkout layout -- confirm this is intended.
settingsFile = home + "/git_repos/_misc_/settings/marshall/test_settings.yaml"

# Build the shared test fixtures (parsed arguments, settings, logger and
# database connection) once at import time; every test in this module
# reuses them.
su = tools(
    arguments={"settingsFile": settingsFile},
    docString=__doc__,
    logLevel="DEBUG",
    options_first=False,
    projectName=None,
    defaultSettingsFile=False
)
arguments, settings, log, dbConn = su.setup()

# SETUP PATHS TO COMMON DIRECTORIES FOR TEST DATA
moduleDirectory = os.path.dirname(__file__)
pathToInputDir = moduleDirectory + "/input/"
pathToOutputDir = moduleDirectory + "/output/"

# Start from a clean output directory. Removal is best effort: the directory
# may simply not exist yet. Only filesystem errors are ignored, so that
# KeyboardInterrupt / SystemExit and programming errors are no longer
# silently swallowed.
try:
    shutil.rmtree(pathToOutputDir)
except OSError:
    pass

# COPY INPUT TO OUTPUT DIR
shutil.copytree(pathToInputDir, pathToOutputDir)

# Recursively create missing directories
# (copytree creates the destination itself, so this is normally a no-op and
# is kept only as a safety net)
if not os.path.exists(pathToOutputDir):
    os.makedirs(pathToOutputDir)
43class test_data(unittest.TestCase):
45 def test_data_function(self):
47 allLists = []
48 from marshallEngine.feeders.panstarrs.data import data
49 ingester = data(
50 log=log,
51 settings=settings,
52 dbConn=dbConn
53 )
54 csvDicts = ingester.get_csv_data(
55 url=settings["panstarrs urls"]["ps13pi"]["summary csv"],
56 user=settings["credentials"]["ps13pi"]["username"],
57 pwd=settings["credentials"]["ps13pi"]["password"]
58 )
59 allLists.extend(ingester._clean_data_pre_ingest(
60 surveyName="ps13pi", withinLastDays=1))
62 ingester.get_csv_data(
63 url=settings["panstarrs urls"]["ps13pi"]["recurrence csv"],
64 user=settings["credentials"]["ps13pi"]["username"],
65 pwd=settings["credentials"]["ps13pi"]["password"]
66 )
67 allLists.extend(ingester._clean_data_pre_ingest(
68 surveyName="ps13pi", withinLastDays=1))
69 ingester.dictList = allLists
70 ingester._import_to_feeder_survey_table()
71 ingester.insert_into_transientBucket()
73 def test_data_function2(self):
75 allLists = []
76 from marshallEngine.feeders.panstarrs.data import data
77 ingester = data(
78 log=log,
79 settings=settings,
80 dbConn=dbConn
81 )
82 csvDicts = ingester.get_csv_data(
83 url=settings["panstarrs urls"]["pso3"]["summary csv"],
84 user=settings["credentials"]["pso3"]["username"],
85 pwd=settings["credentials"]["pso3"]["password"]
86 )
87 data = ingester._clean_data_pre_ingest(
88 surveyName="pso3", withinLastDays=1)
89 allLists.extend(data)
90 ingester.get_csv_data(
91 url=settings["panstarrs urls"]["pso3"]["recurrence csv"],
92 user=settings["credentials"]["pso3"]["username"],
93 pwd=settings["credentials"]["pso3"]["password"]
94 )
95 data = ingester._clean_data_pre_ingest(
96 surveyName="pso3", withinLastDays=1)
97 allLists.extend(data)
98 ingester.dictList = allLists
99 ingester._import_to_feeder_survey_table()
100 ingester.insert_into_transientBucket()
102 def test_data_function3(self):
104 from marshallEngine.feeders.panstarrs.data import data
105 ingester = data(
106 log=log,
107 settings=settings,
108 dbConn=dbConn
109 ).ingest(withinLastDays=1)
111 def test_data_function_exception(self):
113 from marshallEngine.feeders.panstarrs.data import data
114 try:
115 this = data(
116 log=log,
117 settings=settings,
118 fakeKey="break the code"
119 )
120 this.get()
121 assert False
122 except Exception as e:
123 assert True
124 print(str(e))
126 # x-print-testpage-for-pessto-marshall-web-object
128 # x-class-to-test-named-worker-function