#!/usr/local/bin/python
# encoding: utf-8
"""
*Baseclass for survey data ingesters*

:Author:
    David Young
"""
from __future__ import print_function
from __future__ import division
from builtins import zip
from builtins import str
from builtins import range
from builtins import object
from past.utils import old_div
import sys
import os
os.environ['TERM'] = 'vt100'
import csv
import requests
from requests.auth import HTTPBasicAuth
from fundamentals.mysql import insert_list_of_dictionaries_into_database_tables, readquery, writequery
from fundamentals import tools


class data(object):
    """
    *This is the baseclass for the feeder survey data ingesters*

    **Usage**

    .. todo::

        - create a frankenstein template for importer

    To create a new survey data ingester, create a new class using this class as the baseclass:

    ```python
    from ..data import data as basedata
    class data(basedata):
        ....
    ```
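
    The baseclass expects a few attributes to be in place before its ingest methods are called (``self.log``, ``self.settings``, ``self.dbConn``, the feeder survey table name ``self.fsTableName`` and the parsed rows in ``self.dictList``). A rough, hypothetical sketch of a minimal subclass (the table name and attribute values shown are illustrative only):

    ```python
    from ..data import data as basedata

    class data(basedata):

        def __init__(self, log, settings, dbConn):
            self.log = log
            self.settings = settings
            self.dbConn = dbConn
            # NAME OF THIS SURVEY'S FEEDER TABLE IN THE MARSHALL DATABASE
            self.fsTableName = "fs_my_survey"
            # LIST OF DICTIONARIES LATER WRITTEN TO THE FEEDER TABLE
            self.dictList = []
    ```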

    """

    def get_csv_data(
            self,
            url,
            user=False,
            pwd=False):
51 """*collect the CSV data from a URL with option to supply basic auth credentials* 

52 

53 **Key Arguments** 

54 

55 - ``url`` -- the url to the csv file 

56 - ``user`` -- basic auth username 

57 - ``pwd`` -- basic auth password 

58 

59 

60 **Return** 

61 

62 - ``csvData`` -- a list of dictionaries from the csv file 

63 

64 

65 **Usage** 

66 

67 To get the CSV data for a suvery from a given URL in the marshall settings file run something similar to: 

68 

69 ```python 

70 from marshallEngine.feeders.panstarrs.data import data 

71 ingester = data( 

72 log=log, 

73 settings=settings, 

74 dbConn=dbConn 

75 ) 

76 csvDicts = ingester.get_csv_data( 

77 url=settings["panstarrs urls"]["3pi"]["summary csv"], 

78 user=settings["credentials"]["ps1-3pi"]["username"], 

79 pwd=settings["credentials"]["ps1-3pi"]["password"] 

80 ) 

81 ``` 

82 

83 Note you will also be able to access the data via ``ingester.csvDicts``  
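
        The returned ``csvDicts`` is a ``csv.DictReader`` object rather than a plain list, so it is typically consumed by iterating over it once; a minimal sketch:

        ```python
        for row in csvDicts:
            # EACH ROW IS A DICTIONARY KEYED BY THE CSV COLUMN HEADERS
            print(row)
        ```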

        """
        self.log.debug('starting the ``get_csv_data`` method')

        # DOWNLOAD THE CSV FILE DATA
        try:
            if user:
                response = requests.get(
                    url=url,
                    auth=HTTPBasicAuth(user, pwd)
                )
            else:
                response = requests.get(
                    url=url
                )
            status_code = response.status_code
        except requests.exceptions.RequestException:
            print('HTTP Request failed')
            sys.exit(0)

        if status_code == 502:
            print('HTTP Request failed - status %(status_code)s' % locals())
            print(url)
            self.csvDicts = []
            return self.csvDicts

        if status_code != 200:
            print('HTTP Request failed - status %(status_code)s' % locals())
            raise ConnectionError(
                'HTTP Request failed - status %(status_code)s' % locals())

        # CONVERT THE RESPONSE TO CSV LIST OF DICTIONARIES
        self.csvDicts = csv.DictReader(
            response.iter_lines(decode_unicode='utf-8'), dialect='excel', delimiter='|', quotechar='"')

        self.log.debug('completed the ``get_csv_data`` method')
        return self.csvDicts

    def _import_to_feeder_survey_table(
            self):
        """*import the list of dictionaries (self.dictList) into the marshall feeder survey table*

        **Return**

        - None


        **Usage**

        ```python
        self._import_to_feeder_survey_table()
        ```
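
        ``self.dictList`` is assumed to have been populated beforehand by the subclass's parser, with each dictionary keyed by the feeder survey table's column names (a hypothetical sketch; the column names and values shown are illustrative only):

        ```python
        self.dictList = [
            {"name": "PS19abc", "raDeg": 14.4365, "decDeg": -1.2134},
            {"name": "PS19abd", "raDeg": 211.0013, "decDeg": 12.0456}
        ]
        self._import_to_feeder_survey_table()
        ```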

        """
        self.log.debug(
            'starting the ``_import_to_feeder_survey_table`` method')

        if not len(self.dictList):
            return

        # USE dbSettings TO ACTIVATE MULTIPROCESSING
        insert_list_of_dictionaries_into_database_tables(
            dbConn=self.dbConn,
            log=self.log,
            dictList=self.dictList,
            dbTableName=self.fsTableName,
            dateModified=True,
            dateCreated=True,
            batchSize=2500,
            replace=True,
            dbSettings=self.settings["database settings"]
        )

        self.log.debug(
            'completed the ``_import_to_feeder_survey_table`` method')
        return None

    def insert_into_transientBucket(
            self,
            importUnmatched=True,
            updateTransientSummaries=True):
        """*insert objects/detections from the feeder survey table into the transientbucket*

        **Key Arguments**

        - ``importUnmatched`` -- import unmatched (new) transients into the marshall (not wanted in some circumstances)
        - ``updateTransientSummaries`` -- update the transient summaries and lightcurves? Can be True or False, or alternatively a specific transientBucketId


        This method aims to reduce crossmatching and load on the database by:

        1. automatically assigning the transientbucket id to feeder survey detections where the object name is found in the transientbucket (no spatial crossmatch required). Matched feeder survey rows are copied to the transientbucket.
        2. crossmatching the remaining unique, unmatched sources in the feeder survey with sources in the transientbucket. Associated transientBucketIds are added to the matched feeder survey sources and those rows are copied to the transientbucket.
        3. assigning a new transientbucketid to any feeder survey source not matched in steps 1 & 2. These unmatched feeder survey rows are copied to the transientbucket as new transient detections.

        **Return**

        - None


        **Usage**

        ```python
        ingester.insert_into_transientBucket()
        ```
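
        To skip importing brand-new transients, or to restrict the summary/lightcurve update to a single transient, set the keyword arguments explicitly (the transientBucketId value below is illustrative only):

        ```python
        ingester.insert_into_transientBucket(
            importUnmatched=False,
            updateTransientSummaries=12345
        )
        ```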

        """
        self.log.debug(
            'starting the ``insert_into_transientBucket`` method')

        fsTableName = self.fsTableName

        # 1. automatically assign the transientbucket id to feeder survey
        # detections where the object name is found in the transientbucket (no
        # spatial crossmatch required). Copy matched feeder survey rows to the
        # transientbucket.
        self._feeder_survey_transientbucket_name_match_and_import()

        # 2. crossmatch remaining unique, unmatched sources in feeder survey
        # with sources in the transientbucket. Add associated
        # transientBucketIds to matched feeder survey sources. Copy matched
        # feeder survey rows to the transientbucket.
        from HMpTy.mysql import add_htm_ids_to_mysql_database_table
        add_htm_ids_to_mysql_database_table(
            raColName="raDeg",
            declColName="decDeg",
            tableName="transientBucket",
            dbConn=self.dbConn,
            log=self.log,
            primaryIdColumnName="primaryKeyId",
            dbSettings=self.settings["database settings"]
        )
        unmatched = self._feeder_survey_transientbucket_crossmatch()

        # 3. assign a new transientbucketid to any feeder survey source not
        # matched in steps 1 & 2. Copy these unmatched feeder survey rows to
        # the transientbucket as new transient detections.
        if importUnmatched:
            self._import_unmatched_feeder_survey_sources_to_transientbucket(
                unmatched)

        # UPDATE OBSERVATION DATES FROM MJDs
        sqlQuery = "call update_transientbucket_observation_dates()"
        writequery(
            log=self.log,
            sqlQuery=sqlQuery,
            dbConn=self.dbConn
        )

        # UPDATE THE TRANSIENT BUCKET SUMMARY TABLE IN THE MARSHALL DATABASE
        if updateTransientSummaries:
            if isinstance(updateTransientSummaries, int) and not isinstance(updateTransientSummaries, bool):
                transientBucketId = updateTransientSummaries
            else:
                transientBucketId = False
            from marshallEngine.housekeeping import update_transient_summaries
            updater = update_transient_summaries(
                log=self.log,
                settings=self.settings,
                dbConn=self.dbConn,
                transientBucketId=transientBucketId
            )
            updater.update()

        self.log.debug(
            'completed the ``insert_into_transientBucket`` method')
        return None

    def _feeder_survey_transientbucket_name_match_and_import(
            self):
        """*automatically assign the transientbucket id to feeder survey detections where the object name is found in the transientbucket (no spatial crossmatch required). Copy feeder survey rows to the transientbucket.*

        **Return**

        - None


        **Usage**

        ```python
        self._feeder_survey_transientbucket_name_match_and_import()
        ```

        """
        self.log.debug(
            'starting the ``_feeder_survey_transientbucket_name_match_and_import`` method')

        fsTableName = self.fsTableName

        # MATCH TRANSIENT BUCKET IDS WITH NAMES FOUND IN FEEDER TABLE, THEN
        # COPY ROWS TO TRANSIENTBUCKET USING COLUMN MATCH TABLE IN DATABASE
        sqlQuery = """CALL `sync_marshall_feeder_survey_transientBucketId`('%(fsTableName)s');""" % locals()

        writequery(
            log=self.log,
            sqlQuery=sqlQuery,
            dbConn=self.dbConn
        )

        self.log.debug(
            'completed the ``_feeder_survey_transientbucket_name_match_and_import`` method')
        return None

    def _feeder_survey_transientbucket_crossmatch(
            self):
        """*crossmatch remaining unique, unmatched sources in the feeder survey with sources in the transientbucket & copy matched feeder survey rows to the transientbucket*

        **Return**

        - ``unmatched`` -- a list of the unmatched (i.e. new to the marshall) feeder survey sources
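
        **Usage**

        This method is typically called from ``insert_into_transientBucket`` once HTM indexes have been added to the transientBucket table; a minimal sketch:

        ```python
        unmatched = self._feeder_survey_transientbucket_crossmatch()
        ```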

296 """ 

297 self.log.debug( 

298 'starting the ``_feeder_survey_transientbucket_crossmatch`` method') 

299 

300 fsTableName = self.fsTableName 

301 

302 # GET THE COLUMN MAP FOR THE FEEDER SURVEY TABLE 

303 sqlQuery = u""" 

304 SELECT * FROM marshall_fs_column_map where fs_table_name = '%(fsTableName)s' and transientBucket_column in ('name','raDeg','decDeg','limitingMag') 

305 """ % locals() 

306 rows = readquery( 

307 log=self.log, 

308 sqlQuery=sqlQuery, 

309 dbConn=self.dbConn, 

310 quiet=False 

311 ) 

312 

313 columns = {} 

314 for row in rows: 

315 columns[row["transientBucket_column"]] = row["fs_table_column"] 

316 

317 if "raDeg" not in columns: 

318 print(f"No coordinates to match in the {fsTableName} table") 

319 return [] 

320 

321 # BUILD QUERY TO GET UNIQUE UN-MATCHED SOURCES 

322 fs_name = columns["name"] 

323 self.fs_name = fs_name 

324 fs_ra = columns["raDeg"] 

325 fs_dec = columns["decDeg"] 

326 if 'limitingMag' in columns: 

327 fs_lim = columns["limitingMag"] 

328 limitClause = " and %(fs_lim)s = 0 " % locals() 

329 else: 

330 limitClause = "" 

331 sqlQuery = u""" 

332 select %(fs_name)s, avg(%(fs_ra)s) as %(fs_ra)s, avg(%(fs_dec)s) as %(fs_dec)s from %(fsTableName)s where ingested = 0 %(limitClause)s and %(fs_ra)s is not null and %(fs_dec)s is not null group by %(fs_name)s  

333 """ % locals() 

334 

335 rows = readquery( 

336 log=self.log, 

337 sqlQuery=sqlQuery, 

338 dbConn=self.dbConn, 

339 quiet=False 

340 ) 

341 

342 # STOP IF NO MATCHES 

343 if not len(rows): 

344 return [] 

345 

346 # SPLIT INTO BATCHES SO NOT TO OVERWHELM MEMORY 

347 batchSize = 200 

348 total = len(rows) 

349 batches = int(old_div(total, batchSize)) 

350 start = 0 

351 end = 0 

352 theseBatches = [] 

353 for i in range(batches + 1): 

354 end = end + batchSize 

355 start = i * batchSize 

356 thisBatch = rows[start:end] 

357 theseBatches.append(thisBatch) 

358 

359 unmatched = [] 

360 ticker = 0 

361 for batch in theseBatches: 

362 

363 fs_name_list = [] 

364 fs_ra_list = [] 

365 fs_dec_list = [] 

366 fs_name_list = [row[fs_name] for row in batch if row[fs_ra]] 

367 fs_ra_list = [row[fs_ra] for row in batch if row[fs_ra]] 

368 fs_dec_list = [row[fs_dec] for row in batch if row[fs_ra]] 

369 

370 ticker += len(fs_name_list) 

371 print("Matching %(ticker)s/%(total)s sources in the %(fsTableName)s against the transientBucket table" % locals()) 

372 

373 # CONESEARCH TRANSIENT BUCKET FOR PRE-KNOWN SOURCES FROM OTHER 

374 # SURVEYS 

375 from HMpTy.mysql import conesearch 

376 cs = conesearch( 

377 log=self.log, 

378 dbConn=self.dbConn, 

379 tableName="transientBucket", 

380 columns="transientBucketId, name", 

381 ra=fs_ra_list, 

382 dec=fs_dec_list, 

383 radiusArcsec=3.5, 

384 separations=True, 

385 distinct=True, 

386 sqlWhere="masterIDFlag=1", 

387 closest=True 

388 ) 

389 matchIndies, matches = cs.search() 

390 

391 # CREATE SQL QUERY TO UPDATE MATCHES IN FS TABLE WITH MATCHED 

392 # TRANSIENTBUCKET IDs 

393 updates = [] 

394 originalList = matches.list 

395 originalTotal = len(originalList) 

396 

397 print("Adding %(originalTotal)s new %(fsTableName)s transient detections to the transientBucket table" % locals()) 

398 if originalTotal: 

399 updates = [] 

400 updates[:] = ["update " + fsTableName + " set transientBucketId = " + str(o['transientBucketId']) + 

401 " where " + fs_name + " = '" + str(fs_name_list[m]) + "' and transientBucketId is null;" for m, o in zip(matchIndies, originalList)] 

402 updates = ("\n").join(updates) 

403 writequery( 

404 log=self.log, 

405 sqlQuery=updates, 

406 dbConn=self.dbConn 

407 ) 

408 

409 # RETURN UNMATCHED TRANSIENTS 

410 for i, v in enumerate(fs_name_list): 

411 if i not in matchIndies: 

412 unmatched.append(v) 

413 

414 # COPY MATCHED ROWS TO TRANSIENTBUCKET 

415 self._feeder_survey_transientbucket_name_match_and_import() 

416 

417 self.log.debug( 

418 'completed the ``_feeder_survey_transientbucket_crossmatch`` method') 

419 return unmatched 

420 

421 def _import_unmatched_feeder_survey_sources_to_transientbucket( 

422 self, 

423 unmatched): 

424 """*assign a new transientbucketid to any feeder survey source not yet matched in steps. Copy these unmatched feeder survey rows to the transientbucket as new transient detections.* 

425 

426 **Key Arguments** 

427 

428 - ``unmatched`` -- the remaining unmatched feeder survey object names. 

429 
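
        **Usage**

        Typically fed the list returned by ``_feeder_survey_transientbucket_crossmatch``; a minimal sketch:

        ```python
        unmatched = self._feeder_survey_transientbucket_crossmatch()
        self._import_unmatched_feeder_survey_sources_to_transientbucket(unmatched)
        ```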

430 """ 

431 self.log.debug( 

432 'starting the ``_import_unmatched_feeder_survey_sources_to_transientbucket`` method') 

433 

434 if not len(unmatched): 

435 return None 

436 

437 fsTableName = self.fsTableName 

438 fs_name = self.fs_name 

439 

440 # READ MAX TRANSIENTBUCKET ID FROM TRANSIENTBUCKET 

441 sqlQuery = u""" 

442 select max(transientBucketId) as maxId from transientBucket 

443 """ % locals() 

444 rows = readquery( 

445 log=self.log, 

446 sqlQuery=sqlQuery, 

447 dbConn=self.dbConn 

448 ) 

449 

450 if not len(rows) or not rows[0]["maxId"]: 

451 maxId = 1 

452 else: 

453 maxId = rows[0]["maxId"] + 1 

454 

455 # ADD NEW TRANSIENTBUCKETIDS TO FEEDER SURVEY TABLE 

456 updates = [] 

457 newTransientBucketIds = [] 

458 for u in unmatched: 

459 update = "update " + fsTableName + " set transientBucketId = " + \ 

460 str(maxId) + " where " + fs_name + " = '" + str(u) + "';" 

461 updates.append(update) 

462 newTransientBucketIds.append(str(maxId)) 

463 maxId += 1 

464 updates = ("\n").join(updates) 

465 writequery( 

466 log=self.log, 

467 sqlQuery=updates, 

468 dbConn=self.dbConn 

469 ) 

470 

471 # COPY FEEDER SURVEY ROWS TO TRANSIENTBUCKET 

472 self._feeder_survey_transientbucket_name_match_and_import() 

473 

474 # SET THE MASTER ID FLAG FOR ALL NEW TRANSIENTS IN THE TRANSIENTBUCKET 

475 newTransientBucketIds = (",").join(newTransientBucketIds) 

476 sqlQuery = """UPDATE transientBucket t 

477 JOIN 

478 (SELECT  

479 transientBucketId, MIN(primaryKeyId) AS minpk 

480 FROM 

481 transientBucket 

482 WHERE 

483 transientBucketId IN (%(newTransientBucketIds)s) 

484 GROUP BY transientBucketId) tmin ON t.primaryKeyId = tmin.minpk  

485 SET  

486 masterIDFlag = 1;""" % locals() 

487 writequery( 

488 log=self.log, 

489 sqlQuery=sqlQuery, 

490 dbConn=self.dbConn 

491 ) 

492 

493 self.log.debug( 

494 'completed the ``_import_unmatched_feeder_survey_sources_to_transientbucket`` method') 

495 return None 

496 

    # use the tab-trigger below for new method
    def clean_up(
            self):
        """*A few tasks to finish off the ingest*

        **Return**

        - None

        **Usage**

        ```python
        ingester.clean_up()
        ```

        ---

        ```eval_rst
        .. todo::

            - create a sublime snippet for usage
            - write a command-line tool for this method
            - update package tutorial with command-line tool info if needed
        ```
        """
        self.log.debug('starting the ``clean_up`` method')

        sqlQueries = [
            "insert into sherlock_classifications (transient_object_id) select distinct transientBucketId from transientBucketSummaries ON DUPLICATE KEY UPDATE transient_object_id = transientBucketId;",
            "CALL update_transient_akas(1); "
        ]

        for sqlQuery in sqlQueries:
            writequery(
                log=self.log,
                sqlQuery=sqlQuery,
                dbConn=self.dbConn
            )

        self.log.debug('completed the ``clean_up`` method')
        return None

    # use the tab-trigger below for new method
    # xt-class-method