Coverage for marshallEngine/services/tests/test_soxs_scheduler.py : 72%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import print_function
2from builtins import str
3import os
4import unittest
5import shutil
6import unittest
7import yaml
8from marshallEngine.utKit import utKit
9from fundamentals import tools
10from os.path import expanduser
11from docopt import docopt
12from marshallEngine import cl_utils
13doc = cl_utils.__doc__
14home = expanduser("~")
16packageDirectory = utKit("").get_project_root()
17#settingsFile = packageDirectory + "/test_settings.yaml"
18settingsFile = home + "/git_repos/_misc_/settings/marshall/test_settings.yaml"
21su = tools(
22 arguments={"settingsFile": settingsFile},
23 docString=__doc__,
24 logLevel="DEBUG",
25 options_first=False,
26 projectName=None,
27 defaultSettingsFile=False
28)
29arguments, settings, log, dbConn = su.setup()
31# SETUP PATHS TO COMMON DIRECTORIES FOR TEST DATA
32moduleDirectory = os.path.dirname(__file__)
33pathToInputDir = moduleDirectory + "/input/"
34pathToOutputDir = moduleDirectory + "/output/"
36try:
37 shutil.rmtree(pathToOutputDir)
38except:
39 pass
40# COPY INPUT TO OUTPUT DIR
41shutil.copytree(pathToInputDir, pathToOutputDir)
43# Recursively create missing directories
44if not os.path.exists(pathToOutputDir):
45 os.makedirs(pathToOutputDir)
48# xt-setup-unit-testing-files-and-folders
49# xt-utkit-refresh-database
51class test_soxs_scheduler(unittest.TestCase):
53 def test_soxs_scheduler_create_single_auto_ob(self):
55 from marshallEngine.services import soxs_scheduler
56 schr = soxs_scheduler(
57 log=log,
58 dbConn=dbConn,
59 settings=settings
60 )
61 obid = schr._create_single_auto_ob(
62 transientBucketId=1,
63 target_name="joe",
64 raDeg=23.3445,
65 decDeg=-30.034,
66 magnitude_list=[["g", 19.06]]
67 )
68 print(obid)
70 def test_soxs_scheduler_request_all_required_auto_obs(self):
72 # ADD A ROW TO BE IMPORTED
73 # utKit.refresh_database()
74 from fundamentals.mysql import writequery
75 sqlQuery = """INSERT IGNORE INTO `fs_user_added` (`id`,`candidateID`,`ra_deg`,`dec_deg`,`mag`,`magErr`,`filter`,`observationMJD`,`discDate`,`discMag`,`suggestedType`,`catalogType`,`hostZ`,`targetImageURL`,`objectURL`,`summaryRow`,`ingested`,`htm16ID`,`survey`,`author`,`dateCreated`,`dateLastModified`,`suggestedClassification`,`htm13ID`,`htm10ID`,`transientBucketId`) VALUES (856,'TestSource',155.125958333,-15.1787369444,20.3,NULL,NULL,57627.5,'2016-08-27',20.3,'SN',NULL,0.34,'http://thespacedoctor.co.uk/images/thespacedoctor_icon_white_circle.png','http://thespacedoctor.co.uk',1,0,NULL,'testSurvey','None','2019-07-30 14:25:39','2019-07-30 14:25:39',NULL,NULL,NULL,NULL);""" % locals()
76 writequery(
77 log=log,
78 sqlQuery=sqlQuery,
79 dbConn=dbConn
80 )
82 from marshallEngine.feeders.useradded.data import data
83 ingester = data(
84 log=log,
85 settings=settings,
86 dbConn=dbConn
87 ).ingest(withinLastDays=3)
89 from marshallEngine.services import soxs_scheduler
90 schr = soxs_scheduler(
91 log=log,
92 dbConn=dbConn,
93 settings=settings
94 )
95 passedOBID, failedOBIDs = schr.request_all_required_auto_obs()
96 print(failedOBIDs)
98 def test_soxs_scheduler_function_exception(self):
100 from marshallEngine.services import soxs_scheduler
101 try:
102 this = soxs_scheduler(
103 log=log,
104 settings=settings,
105 fakeKey="break the code"
106 )
107 this.get()
108 assert False
109 except Exception as e:
110 assert True
111 print(str(e))
113 # x-print-testpage-for-pessto-marshall-web-object
115 # x-class-to-test-named-worker-function