Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from __future__ import print_function 

2from builtins import str 

3import os 

4import unittest 

5import shutil 

6import yaml 

7from marshallEngine.utKit import utKit 

8from fundamentals import tools 

9from os.path import expanduser 

# LOCATE THE USER'S HOME DIRECTORY AND THE PROJECT ROOT
home = expanduser("~")
packageDirectory = utKit("").get_project_root()

# THE TEST SETTINGS ARE READ FROM A FIXED LOCATION UNDER THE HOME DIRECTORY
# NOTE: an in-package alternative would be
#       packageDirectory + "/test_settings.yaml"
settingsFile = home + "/git_repos/_misc_/settings/marshall/test_settings.yaml"

# BUILD THE SETUP HELPER AND UNPACK WHAT IT PROVIDES; `settings`, `log` AND
# `dbConn` ARE SHARED BY EVERY TEST CASE IN THIS MODULE
su = tools(
    arguments=dict(settingsFile=settingsFile),
    docString=__doc__,
    logLevel="DEBUG",
    options_first=False,
    projectName=None,
    defaultSettingsFile=False
)
(arguments, settings, log, dbConn) = su.setup()

25 

# SETUP PATHS TO COMMON DIRECTORIES FOR TEST DATA
# Resolve to an absolute path first: `__file__` may be relative when this
# file is run directly, in which case `dirname` returns "" and the paths
# below would become "/input/" and "/output/" at the filesystem root.
moduleDirectory = os.path.dirname(os.path.abspath(__file__))
pathToInputDir = moduleDirectory + "/input/"
pathToOutputDir = moduleDirectory + "/output/"

# START FROM A CLEAN OUTPUT DIR; IT MAY NOT EXIST YET (E.G. ON A FIRST RUN)
try:
    shutil.rmtree(pathToOutputDir)
except OSError:
    # Best-effort removal only; unlike a bare `except`, this still lets
    # KeyboardInterrupt / SystemExit propagate.
    pass

if os.path.isdir(pathToInputDir):
    # COPY INPUT TO OUTPUT DIR (copytree creates the destination itself)
    shutil.copytree(pathToInputDir, pathToOutputDir)
elif not os.path.exists(pathToOutputDir):
    # NO INPUT FIXTURES TO COPY; RECURSIVELY CREATE AN EMPTY OUTPUT DIR
    os.makedirs(pathToOutputDir)

41 

42 

class test_data(unittest.TestCase):
    """Tests for the ``data`` class in ``marshallEngine.feeders.panstarrs.data``.

    Every test uses the module-level ``log``, ``settings`` and ``dbConn``
    objects, and reads URLs and credentials from ``settings``.
    """

    def _ingest_survey(self, surveyName):
        """Run the step-by-step ingest for one survey.

        For both the "summary csv" and the "recurrence csv" URL of the survey,
        call ``get_csv_data`` and then ``_clean_data_pre_ingest``, gather the
        cleaned rows into a single list, hand that list to the ingester as
        ``dictList`` and finally import it.

        **Key Arguments:**
            - ``surveyName`` -- key of the survey in both
              ``settings["panstarrs urls"]`` and ``settings["credentials"]``
        """
        # Imported here, as in the original tests, so that an import failure
        # is reported by the individual test rather than at module load.
        from marshallEngine.feeders.panstarrs.data import data
        ingester = data(
            log=log,
            settings=settings,
            dbConn=dbConn
        )

        allLists = []
        for csvKey in ("summary csv", "recurrence csv"):
            ingester.get_csv_data(
                url=settings["panstarrs urls"][surveyName][csvKey],
                user=settings["credentials"][surveyName]["username"],
                pwd=settings["credentials"][surveyName]["password"]
            )
            # NOTE(review): `_clean_data_pre_ingest` takes no rows as an
            # argument, so it presumably works on what `get_csv_data` just
            # fetched -- confirm against the ingester implementation.
            allLists.extend(ingester._clean_data_pre_ingest(
                surveyName=surveyName, withinLastDays=1))

        ingester.dictList = allLists
        ingester._import_to_feeder_survey_table()
        ingester.insert_into_transientBucket()

    def test_data_function(self):
        """Step-by-step ingest of the ``ps13pi`` survey."""
        self._ingest_survey("ps13pi")

    def test_data_function2(self):
        """Step-by-step ingest of the ``pso3`` survey."""
        self._ingest_survey("pso3")

    def test_data_function3(self):
        """Run the ingester's single ``ingest`` entry point."""
        from marshallEngine.feeders.panstarrs.data import data
        data(
            log=log,
            settings=settings,
            dbConn=dbConn
        ).ingest(withinLastDays=1)

    def test_data_function_exception(self):
        """Misusing ``data`` must raise.

        ``data`` is given an unexpected ``fakeKey`` keyword (and no
        ``dbConn``) and then ``get()`` is called on the result.
        """
        from marshallEngine.feeders.panstarrs.data import data
        # `assertRaises` fails the test when nothing is raised. The previous
        # `assert False` inside a `try/except Exception` was swallowed by its
        # own handler (AssertionError is an Exception), so it always passed.
        with self.assertRaises(Exception) as raised:
            this = data(
                log=log,
                settings=settings,
                fakeKey="break the code"
            )
            this.get()
        print(str(raised.exception))

125 

126 # x-print-testpage-for-pessto-marshall-web-object 

127 

128 # x-class-to-test-named-worker-function