Coverage for marshallEngine/feeders/data.py : 0%

#!/usr/local/bin/python
# encoding: utf-8
"""
*Baseclass for survey data ingesters*

:Author:
    David Young
"""
from __future__ import print_function
from __future__ import division
from builtins import zip
from builtins import str
from builtins import range
from builtins import object
from past.utils import old_div
import sys
import os
os.environ['TERM'] = 'vt100'
import csv
import requests
from requests.auth import HTTPBasicAuth
from fundamentals.mysql import insert_list_of_dictionaries_into_database_tables, readquery, writequery
from fundamentals import tools


class data(object):
    """
    *The baseclass for the feeder survey data imports*

    **Usage**

    .. todo::

        - create a frankenstein template for importer

    To create a new survey data ingester, create a new class using this class as the baseclass:

    ```python
    from ..data import data as basedata

    class data(basedata):
        ....
    ```
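
    A minimal sketch of what such a subclass might look like (the survey name, feeder-survey table name and settings keys below are hypothetical, purely for illustration):

    ```python
    from ..data import data as basedata

    class data(basedata):

        def __init__(self, log, settings=False, dbConn=False):
            self.log = log
            self.settings = settings
            self.dbConn = dbConn
            # NAME OF THE FEEDER-SURVEY TABLE IN THE MARSHALL DATABASE (HYPOTHETICAL)
            self.fsTableName = "fs_my_survey"

        def ingest(self, withinLastDays=False):
            # DOWNLOAD THE SURVEY CSV, IMPORT IT INTO THE FEEDER-SURVEY TABLE
            # AND THEN INTO THE TRANSIENTBUCKET ("my-survey urls" IS A
            # HYPOTHETICAL SETTINGS KEY)
            csvDicts = self.get_csv_data(
                url=self.settings["my-survey urls"]["summary csv"]
            )
            self.dictList = list(csvDicts)
            self._import_to_feeder_survey_table()
            self.insert_into_transientBucket()
            self.clean_up()
    ```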
44 """

    def get_csv_data(
            self,
            url,
            user=False,
            pwd=False):
        """*collect the CSV data from a URL with the option to supply basic auth credentials*

        **Key Arguments**

        - ``url`` -- the url to the csv file
        - ``user`` -- basic auth username
        - ``pwd`` -- basic auth password

        **Return**

        - ``csvDicts`` -- the CSV data parsed into dictionaries (a ``csv.DictReader`` object)

        **Usage**

        To get the CSV data for a survey from a given URL in the marshall settings file run something similar to:

        ```python
        from marshallEngine.feeders.panstarrs.data import data
        ingester = data(
            log=log,
            settings=settings,
            dbConn=dbConn
        )
        csvDicts = ingester.get_csv_data(
            url=settings["panstarrs urls"]["3pi"]["summary csv"],
            user=settings["credentials"]["ps1-3pi"]["username"],
            pwd=settings["credentials"]["ps1-3pi"]["password"]
        )
        ```

        Note you will also be able to access the data via ``ingester.csvDicts``.
        """
        self.log.debug('starting the ``get_csv_data`` method')

        # DOWNLOAD THE CSV FILE DATA
        try:
            if user:
                response = requests.get(
                    url=url,
                    auth=HTTPBasicAuth(user, pwd)
                )
            else:
                response = requests.get(
                    url=url
                )
            status_code = response.status_code
        except requests.exceptions.RequestException:
            print('HTTP Request failed')
            sys.exit(0)

        if status_code == 502:
            print('HTTP Request failed - status %(status_code)s' % locals())
            print(url)
            self.csvDicts = []
            return self.csvDicts

        if status_code != 200:
            print('HTTP Request failed - status %(status_code)s' % locals())
            raise ConnectionError(
                'HTTP Request failed - status %(status_code)s' % locals())

        # CONVERT THE RESPONSE TO CSV LIST OF DICTIONARIES
        self.csvDicts = csv.DictReader(
            response.iter_lines(decode_unicode='utf-8'), dialect='excel', delimiter='|', quotechar='"')
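        # NOTE: csv.DictReader wraps the response's line iterator lazily, so
        # rows are only parsed as the caller iterates over ``self.csvDicts``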

        self.log.debug('completed the ``get_csv_data`` method')
        return self.csvDicts

    def _import_to_feeder_survey_table(
            self):
        """*import the list of dictionaries (self.dictList) into the marshall feeder survey table*

        **Return**

        - None

        **Usage**

        ```python
        self._import_to_feeder_survey_table()
        ```
        """
        self.log.debug(
            'starting the ``_import_to_feeder_survey_table`` method')

        if not len(self.dictList):
            return

        # USE dbSettings TO ACTIVATE MULTIPROCESSING
        insert_list_of_dictionaries_into_database_tables(
            dbConn=self.dbConn,
            log=self.log,
            dictList=self.dictList,
            dbTableName=self.fsTableName,
            dateModified=True,
            dateCreated=True,
            batchSize=2500,
            replace=True,
            dbSettings=self.settings["database settings"]
        )

        self.log.debug(
            'completed the ``_import_to_feeder_survey_table`` method')
        return None

    def insert_into_transientBucket(
            self,
            importUnmatched=True,
            updateTransientSummaries=True):
        """*insert objects/detections from the feeder survey table into the transientbucket*

        **Key Arguments**

        - ``importUnmatched`` -- import unmatched (new) transients into the marshall (not wanted in some circumstances)
        - ``updateTransientSummaries`` -- update the transient summaries and lightcurves? Can be True or False, or alternatively a specific transientBucketId

        This method aims to reduce crossmatching and load on the database by:

        1. automatically assign the transientbucket id to feeder survey detections where the object name is found in the transientbucket (no spatial crossmatch required). Copy matched feeder survey rows to the transientbucket.
        2. crossmatch remaining unique, unmatched sources in the feeder survey with sources in the transientbucket. Add the associated transientBucketIds to matched feeder survey sources. Copy matched feeder survey rows to the transientbucket.
        3. assign a new transientbucketid to any feeder survey source not matched in steps 1 & 2. Copy these unmatched feeder survey rows to the transientbucket as new transient detections.

        **Return**

        - None

        **Usage**

        ```python
        ingester.insert_into_transientBucket()
        ```
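
        ``updateTransientSummaries`` can also be passed a single transientBucketId (instead of a boolean) to limit the summary and lightcurve regeneration to that one transient; the id below is purely illustrative:

        ```python
        ingester.insert_into_transientBucket(
            updateTransientSummaries=12345
        )
        ```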
190 """
        self.log.debug(
            'starting the ``insert_into_transientBucket`` method')

        fsTableName = self.fsTableName

        # 1. automatically assign the transientbucket id to feeder survey
        # detections where the object name is found in the transientbucket (no
        # spatial crossmatch required). Copy matched feeder survey rows to the
        # transientbucket.
        self._feeder_survey_transientbucket_name_match_and_import()

        # 2. crossmatch remaining unique, unmatched sources in feeder survey
        # with sources in the transientbucket. Add associated
        # transientBucketIds to matched feeder survey sources. Copy matched
        # feeder survey rows to the transientbucket.
        from HMpTy.mysql import add_htm_ids_to_mysql_database_table
        add_htm_ids_to_mysql_database_table(
            raColName="raDeg",
            declColName="decDeg",
            tableName="transientBucket",
            dbConn=self.dbConn,
            log=self.log,
            primaryIdColumnName="primaryKeyId",
            dbSettings=self.settings["database settings"]
        )
        unmatched = self._feeder_survey_transientbucket_crossmatch()

        # 3. assign a new transientbucketid to any feeder survey source not
        # matched in steps 1 & 2. Copy these unmatched feeder survey rows to
        # the transientbucket as new transient detections.
        if importUnmatched:
            self._import_unmatched_feeder_survey_sources_to_transientbucket(
                unmatched)

        # UPDATE OBSERVATION DATES FROM MJDs
        sqlQuery = "call update_transientbucket_observation_dates()"
        writequery(
            log=self.log,
            sqlQuery=sqlQuery,
            dbConn=self.dbConn
        )

        # UPDATE THE TRANSIENT BUCKET SUMMARY TABLE IN THE MARSHALL DATABASE
        if updateTransientSummaries:
            if isinstance(updateTransientSummaries, int) and not isinstance(updateTransientSummaries, bool):
                transientBucketId = updateTransientSummaries
            else:
                transientBucketId = False
            from marshallEngine.housekeeping import update_transient_summaries
            updater = update_transient_summaries(
                log=self.log,
                settings=self.settings,
                dbConn=self.dbConn,
                transientBucketId=transientBucketId
            )
            updater.update()

        self.log.debug(
            'completed the ``insert_into_transientBucket`` method')
        return None

    def _feeder_survey_transientbucket_name_match_and_import(
            self):
        """*automatically assign the transientbucket id to feeder survey detections where the object name is found in the transientbucket (no spatial crossmatch required). Copy feeder survey rows to the transientbucket.*

        **Return**

        - None

        **Usage**

        ```python
        self._feeder_survey_transientbucket_name_match_and_import()
        ```
        """
        self.log.debug(
            'starting the ``_feeder_survey_transientbucket_name_match_and_import`` method')

        fsTableName = self.fsTableName

        # MATCH TRANSIENT BUCKET IDS WITH NAMES FOUND IN FEEDER TABLE, THEN
        # COPY ROWS TO TRANSIENTBUCKET USING COLUMN MATCH TABLE IN DATABASE
        sqlQuery = """CALL `sync_marshall_feeder_survey_transientBucketId`('%(fsTableName)s');""" % locals()
        writequery(
            log=self.log,
            sqlQuery=sqlQuery,
            dbConn=self.dbConn
        )

        self.log.debug(
            'completed the ``_feeder_survey_transientbucket_name_match_and_import`` method')
        return None

    def _feeder_survey_transientbucket_crossmatch(
            self):
        """*crossmatch remaining unique, unmatched sources in the feeder survey with sources in the transientbucket & copy matched feeder survey rows to the transientbucket*

        **Return**

        - ``unmatched`` -- a list of the unmatched (i.e. new to the marshall) feeder survey sources
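
        **Usage**

        This is a private helper; a minimal sketch of how it is called from within ``insert_into_transientBucket``:

        ```python
        unmatched = self._feeder_survey_transientbucket_crossmatch()
        ```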
296 """
        self.log.debug(
            'starting the ``_feeder_survey_transientbucket_crossmatch`` method')

        fsTableName = self.fsTableName

        # GET THE COLUMN MAP FOR THE FEEDER SURVEY TABLE
        sqlQuery = u"""
            SELECT * FROM marshall_fs_column_map where fs_table_name = '%(fsTableName)s' and transientBucket_column in ('name','raDeg','decDeg','limitingMag')
        """ % locals()
        rows = readquery(
            log=self.log,
            sqlQuery=sqlQuery,
            dbConn=self.dbConn,
            quiet=False
        )

        columns = {}
        for row in rows:
            columns[row["transientBucket_column"]] = row["fs_table_column"]

        if "raDeg" not in columns:
            print(f"No coordinates to match in the {fsTableName} table")
            return []

        # BUILD QUERY TO GET UNIQUE UN-MATCHED SOURCES
        fs_name = columns["name"]
        self.fs_name = fs_name
        fs_ra = columns["raDeg"]
        fs_dec = columns["decDeg"]
        if 'limitingMag' in columns:
            fs_lim = columns["limitingMag"]
            limitClause = " and %(fs_lim)s = 0 " % locals()
        else:
            limitClause = ""
        sqlQuery = u"""
            select %(fs_name)s, avg(%(fs_ra)s) as %(fs_ra)s, avg(%(fs_dec)s) as %(fs_dec)s from %(fsTableName)s where ingested = 0 %(limitClause)s and %(fs_ra)s is not null and %(fs_dec)s is not null group by %(fs_name)s
        """ % locals()

        rows = readquery(
            log=self.log,
            sqlQuery=sqlQuery,
            dbConn=self.dbConn,
            quiet=False
        )

        # STOP IF NO MATCHES
        if not len(rows):
            return []

        # SPLIT INTO BATCHES SO NOT TO OVERWHELM MEMORY
        batchSize = 200
        total = len(rows)
        batches = int(old_div(total, batchSize))
        start = 0
        end = 0
        theseBatches = []
        for i in range(batches + 1):
            end = end + batchSize
            start = i * batchSize
            thisBatch = rows[start:end]
            theseBatches.append(thisBatch)

        unmatched = []
        ticker = 0
        for batch in theseBatches:

            fs_name_list = [row[fs_name] for row in batch if row[fs_ra]]
            fs_ra_list = [row[fs_ra] for row in batch if row[fs_ra]]
            fs_dec_list = [row[fs_dec] for row in batch if row[fs_ra]]

            ticker += len(fs_name_list)
            print("Matching %(ticker)s/%(total)s sources in the %(fsTableName)s table against the transientBucket table" % locals())

            # CONESEARCH TRANSIENT BUCKET FOR PRE-KNOWN SOURCES FROM OTHER
            # SURVEYS
            from HMpTy.mysql import conesearch
            cs = conesearch(
                log=self.log,
                dbConn=self.dbConn,
                tableName="transientBucket",
                columns="transientBucketId, name",
                ra=fs_ra_list,
                dec=fs_dec_list,
                radiusArcsec=3.5,
                separations=True,
                distinct=True,
                sqlWhere="masterIDFlag=1",
                closest=True
            )
            matchIndies, matches = cs.search()
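            # matchIndies indexes back into the fs_* input lists above;
            # matches.list holds the corresponding matched transientBucket rows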

            # CREATE SQL QUERY TO UPDATE MATCHES IN FS TABLE WITH MATCHED
            # TRANSIENTBUCKET IDs
            updates = []
            originalList = matches.list
            originalTotal = len(originalList)

            print("Adding %(originalTotal)s matched %(fsTableName)s transient detections to the transientBucket table" % locals())
            if originalTotal:
                updates = []
                updates[:] = ["update " + fsTableName + " set transientBucketId = " + str(o['transientBucketId']) +
                              " where " + fs_name + " = '" + str(fs_name_list[m]) + "' and transientBucketId is null;" for m, o in zip(matchIndies, originalList)]
                updates = ("\n").join(updates)
                writequery(
                    log=self.log,
                    sqlQuery=updates,
                    dbConn=self.dbConn
                )

            # RETURN UNMATCHED TRANSIENTS
            for i, v in enumerate(fs_name_list):
                if i not in matchIndies:
                    unmatched.append(v)

        # COPY MATCHED ROWS TO TRANSIENTBUCKET
        self._feeder_survey_transientbucket_name_match_and_import()

        self.log.debug(
            'completed the ``_feeder_survey_transientbucket_crossmatch`` method')
        return unmatched

    def _import_unmatched_feeder_survey_sources_to_transientbucket(
            self,
            unmatched):
        """*assign a new transientbucketid to any feeder survey source not matched in the previous steps. Copy these unmatched feeder survey rows to the transientbucket as new transient detections.*

        **Key Arguments**

        - ``unmatched`` -- the remaining unmatched feeder survey object names.
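
        **Usage**

        This is a private helper; a minimal sketch of how it is called from within ``insert_into_transientBucket``, using the name list returned by the crossmatch step:

        ```python
        unmatched = self._feeder_survey_transientbucket_crossmatch()
        self._import_unmatched_feeder_survey_sources_to_transientbucket(unmatched)
        ```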
430 """
        self.log.debug(
            'starting the ``_import_unmatched_feeder_survey_sources_to_transientbucket`` method')

        if not len(unmatched):
            return None

        fsTableName = self.fsTableName
        fs_name = self.fs_name

        # READ MAX TRANSIENTBUCKET ID FROM TRANSIENTBUCKET
        sqlQuery = u"""
            select max(transientBucketId) as maxId from transientBucket
        """ % locals()
        rows = readquery(
            log=self.log,
            sqlQuery=sqlQuery,
            dbConn=self.dbConn
        )

        if not len(rows) or not rows[0]["maxId"]:
            maxId = 1
        else:
            maxId = rows[0]["maxId"] + 1

        # ADD NEW TRANSIENTBUCKETIDS TO FEEDER SURVEY TABLE
        updates = []
        newTransientBucketIds = []
        for u in unmatched:
            update = "update " + fsTableName + " set transientBucketId = " + \
                str(maxId) + " where " + fs_name + " = '" + str(u) + "';"
            updates.append(update)
            newTransientBucketIds.append(str(maxId))
            maxId += 1
        updates = ("\n").join(updates)
        writequery(
            log=self.log,
            sqlQuery=updates,
            dbConn=self.dbConn
        )

        # COPY FEEDER SURVEY ROWS TO TRANSIENTBUCKET
        self._feeder_survey_transientbucket_name_match_and_import()

        # SET THE MASTER ID FLAG FOR ALL NEW TRANSIENTS IN THE TRANSIENTBUCKET
        newTransientBucketIds = (",").join(newTransientBucketIds)
        sqlQuery = """UPDATE transientBucket t
                JOIN
            (SELECT
                transientBucketId, MIN(primaryKeyId) AS minpk
            FROM
                transientBucket
            WHERE
                transientBucketId IN (%(newTransientBucketIds)s)
            GROUP BY transientBucketId) tmin ON t.primaryKeyId = tmin.minpk
        SET
            masterIDFlag = 1;""" % locals()
        writequery(
            log=self.log,
            sqlQuery=sqlQuery,
            dbConn=self.dbConn
        )

        self.log.debug(
            'completed the ``_import_unmatched_feeder_survey_sources_to_transientbucket`` method')
        return None

    # use the tab-trigger below for new method
    def clean_up(
            self):
        """*A few tasks to finish off the ingest*

        **Return**

        - None

        **Usage**

        ```python
        ingester.clean_up()
        ```

        .. todo::

            - create a sublime snippet for usage
            - write a command-line tool for this method
            - update package tutorial with command-line tool info if needed
        """
        self.log.debug('starting the ``clean_up`` method')
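
        # ENSURE EVERY TRANSIENT WITH A SUMMARY ROW HAS A PLACEHOLDER IN
        # sherlock_classifications, THEN REFRESH THE TRANSIENT AKAs VIA THE
        # update_transient_akas PROCEDURE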
        sqlQueries = [
            "insert into sherlock_classifications (transient_object_id) select distinct transientBucketId from transientBucketSummaries ON DUPLICATE KEY UPDATE transient_object_id = transientBucketId;",
            "CALL update_transient_akas(1); "
        ]

        for sqlQuery in sqlQueries:
            writequery(
                log=self.log,
                sqlQuery=sqlQuery,
                dbConn=self.dbConn
            )

        self.log.debug('completed the ``clean_up`` method')
        return None

    # use the tab-trigger below for new method

    # xt-class-method