#!/usr/local/bin/python
# encoding: utf-8
"""
*cache the panstarrs image stamps*

:Author:
    David Young
"""
from __future__ import print_function
from __future__ import division
from builtins import str
from builtins import zip
from builtins import object
from past.utils import old_div
import sys
import os
os.environ['TERM'] = 'vt100'
from fundamentals import tools
from fundamentals.mysql import readquery
import requests
from requests.auth import HTTPBasicAuth
import codecs
from fundamentals import fmultiprocess
from fundamentals.mysql import writequery


class images(object):
    """
    *The base class for the feeder image cachers*

    **Usage**

    To create a new survey image cacher, create a new class using this class as the base class:

    ```python
    from ..images import images as baseimages
    class images(baseimages):
        ....
    ```
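
    A subclass is expected to supply the attributes this base class reads: ``survey``, ``dbSurveyNames``, ``stampFlagColumns`` and ``downloadDirectoryPath`` (alongside ``log``, ``dbConn`` and ``settings``). As a rough, hypothetical sketch only -- the values below are placeholders, not a real survey configuration:

    ```python
    from ..images import images as baseimages

    class images(baseimages):
        # hypothetical example values -- replace with the survey's real settings
        survey = "mysurvey"
        dbSurveyNames = ["mysurvey"]
        stampFlagColumns = {
            "subtracted": "mysurvey_subtracted_stamp",
            "target": "mysurvey_target_stamp",
            "reference": "mysurvey_reference_stamp",
            "triplet": None
        }
    ```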

    """

    def cache(
            self,
            limit=1000):
        """*cache the images for the requested survey*

        **Key Arguments**

        - ``limit`` -- limit the number of transients in the list so as not to overwhelm the survey image servers by downloading everything in one go.

        **Usage**

        ```python
        from marshallEngine.feeders.panstarrs import images
        cacher = images(
            log=log,
            settings=settings,
            dbConn=dbConn
        ).cache(limit=1000)
        ```
        """
        self.log.debug('starting the ``cache`` method')

        # THESE SURVEYS DON'T HAVE IMAGES - PASS
        if self.survey in ["tns", "atel", "atels"]:
            return

        transientBucketIds, subtractedUrls, targetUrls, referenceUrls, tripletUrls = self._list_images_needing_cached()
        leng = len(transientBucketIds)
        survey = self.survey

        if not leng:
            print("All _new_ images are cached for the %(survey)s survey" % locals())
        else:
            if leng > limit:
                print(
                    "Downloading image stamps for the next %(limit)s transients of %(leng)s remaining for %(survey)s" % locals())
            else:
                print(
                    "Downloading image stamps for the remaining %(leng)s transients for %(survey)s" % locals())
            subtractedStatus, targetStatus, referenceStatus, tripletStatus = self._download(
                transientBucketIds=transientBucketIds[:limit],
                subtractedUrls=subtractedUrls[:limit],
                targetUrls=targetUrls[:limit],
                referenceUrls=referenceUrls[:limit],
                tripletUrls=tripletUrls[:limit]
            )
            self._update_database()

        # SECOND PASS: RETRY TRANSIENTS WHOSE STAMPS PREVIOUSLY FAILED TO
        # DOWNLOAD (STAMP FLAG = 2 IN THE DATABASE)
        transientBucketIds, subtractedUrls, targetUrls, referenceUrls, tripletUrls = self._list_images_needing_cached(
            failedImage=True)
        leng = len(transientBucketIds)

        if not leng:
            print("All images are cached for the %(survey)s survey" % locals())
        else:
            print("Downloading image stamps for the next %(limit)s transients of %(leng)s remaining for %(survey)s - previously failed" % locals())
            subtractedStatus, targetStatus, referenceStatus, tripletStatus = self._download(
                transientBucketIds=transientBucketIds[:limit],
                subtractedUrls=subtractedUrls[:limit],
                targetUrls=targetUrls[:limit],
                referenceUrls=referenceUrls[:limit],
                tripletUrls=tripletUrls[:limit]
            )
            self._update_database()

        self.log.debug('completed the ``cache`` method')
        return None

    def _list_images_needing_cached(
            self,
            failedImage=False):
        """*get lists of the transientBucketIds and the images needing to be cached for those transients*

        **Key Arguments**

        - ``failedImage`` -- if True, make a second-pass attempt to download an alternative image for transients whose stamps previously failed to download.

        **Return**

        - ``transientBucketIds, subtractedUrls, targetUrls, referenceUrls, tripletUrls`` -- synced lists of transientBucketIds and subtracted-, target-, reference- and triplet-image URLs. All lists are the same length.

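        **Usage**

        A hedged sketch of calling this helper directly (it is normally driven by ``cache()``), assuming ``cacher`` is an instance of this class built with ``log``, ``settings`` and ``dbConn`` as in the ``cache()`` example; the returned lists are index-aligned, so they can be zipped together:

        ```python
        tids, subs, targs, refs, trips = cacher._list_images_needing_cached()
        for tid, sub in zip(tids, subs):
            print(tid, sub)
        ```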

130 """ 

        self.log.debug('starting the ``_list_images_needing_cached`` method')

        subtractedUrls, targetUrls, referenceUrls, tripletUrls = [], [], [], []
        for imageType, v in list(self.stampFlagColumns.items()):
            if not v:
                continue
            imageUrl = imageType + "ImageUrl"
            # CREATE THE STAMP WHERE CLAUSE
            if not failedImage:
                stampWhere = v + " IS NULL "
            else:
                stampWhere = v + " = 2 "

            # CREATE THE SURVEY WHERE CLAUSE
            dbSurveyNames = "t.survey LIKE '%%" + \
                ("%%' OR t.survey LIKE '%%").join(self.dbSurveyNames) + "%%'"

            # NOW GENERATE SQL TO GET THE URLS OF STAMPS NEEDING DOWNLOADED
            if self.survey == "useradded":
                sqlQuery = u"""
                    SELECT
                        a.transientBucketId, a.%(imageUrl)s
                    FROM
                        transientBucket a
                    JOIN
                        (SELECT
                            MIN(magnitude) AS mag, transientBucketId
                        FROM
                            transientBucket
                        WHERE
                            magnitude IS NOT NULL
                            AND %(imageUrl)s IS NOT NULL
                            AND transientBucketId in (select transientBucketId from fs_user_added)
                            AND transientBucketId IN (SELECT
                                    transientBucketId
                                FROM
                                    pesstoObjects
                                WHERE
                                    %(stampWhere)s and limitingMag = 0)
                        GROUP BY transientBucketId
                        ORDER BY transientBucketId) AS b ON a.transientBucketId = b.transientBucketId
                            AND a.magnitude = b.mag
                    WHERE limitingMag = 0
                    GROUP BY transientBucketId;
                """ % locals()
            else:
                sqlQuery = u"""
                    SELECT
                        MIN(magnitude) AS mag, t.transientBucketId, %(imageUrl)s
                    FROM
                        transientBucket t,
                        pesstoObjects p
                    WHERE
                        t.magnitude IS NOT NULL
                        AND t.%(imageUrl)s IS NOT NULL
                        AND p.%(stampWhere)s
                        AND t.transientbucketId = p.transientbucketId
                        AND (%(dbSurveyNames)s)
                        AND t.limitingMag = 0 group by t.transientBucketId;""" % locals()

            if failedImage:
                sqlQuery = sqlQuery.replace("AND a.magnitude = b.mag", "").replace(
                    "GROUP BY a.transientBucketId;", "")

            rows = readquery(
                log=self.log,
                sqlQuery=sqlQuery,
                dbConn=self.dbConn,
            )

            # SPLIT URLS INTO STAMP TYPES AND ORDER ALONGSIDE
            # TRANSIENTBUCKETIDs
            transientBucketIds = []
            for row in rows:
                transientBucketIds.append(row["transientBucketId"])
                if imageType == "subtracted":
                    subtractedUrls.append(row["subtractedImageUrl"])
                if imageType == "target":
                    targetUrls.append(row["targetImageUrl"])
                if imageType == "reference":
                    referenceUrls.append(row["referenceImageUrl"])
                if imageType == "triplet":
                    tripletUrls.append(row["tripletImageUrl"])

        # STAMP TYPES THE SURVEY DOES NOT PROVIDE GET PLACEHOLDER LISTS OF Nones
        for imageType, v in list(self.stampFlagColumns.items()):
            if not v:
                if imageType == "subtracted":
                    subtractedUrls = [None] * len(transientBucketIds)
                if imageType == "target":
                    targetUrls = [None] * len(transientBucketIds)
                if imageType == "reference":
                    referenceUrls = [None] * len(transientBucketIds)
                if imageType == "triplet":
                    tripletUrls = [None] * len(transientBucketIds)

        self.log.debug('completed the ``_list_images_needing_cached`` method')
        self.transientBucketIds = transientBucketIds
        return transientBucketIds, subtractedUrls, targetUrls, referenceUrls, tripletUrls

    def _download(
            self,
            transientBucketIds,
            subtractedUrls,
            targetUrls,
            referenceUrls,
            tripletUrls):
        """*cache the images for the survey under their transientBucketId folders in the web-server cache*

        **Key Arguments**

        - ``transientBucketIds`` -- the list of transientBucketIds for the transients needing images downloaded.
        - ``subtractedUrls`` -- the list of subtracted-image URLs (same length as the transientBucketIds list).
        - ``targetUrls`` -- the list of target-image URLs (same length as the transientBucketIds list).
        - ``referenceUrls`` -- the list of reference-image URLs (same length as the transientBucketIds list).
        - ``tripletUrls`` -- the list of triplet-image URLs (same length as the transientBucketIds list).

        **Return**

        - ``subtractedStatus`` -- list of subtracted-image download statuses, one per transient (0 = fail, 1 = success, 2 = does not exist)
        - ``targetStatus`` -- list of target-image download statuses, one per transient (0 = fail, 1 = success, 2 = does not exist)
        - ``referenceStatus`` -- list of reference-image download statuses, one per transient (0 = fail, 1 = success, 2 = does not exist)
        - ``tripletStatus`` -- list of triplet-image download statuses, one per transient (0 = fail, 1 = success, 2 = does not exist)

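        **Usage**

        A rough sketch only (this helper is normally driven by ``cache()``), assuming ``cacher`` is an instance of this class and the lists come from ``_list_images_needing_cached()``; the returned status lists are index-aligned with ``transientBucketIds``:

        ```python
        subStatus, targStatus, refStatus, tripStatus = cacher._download(
            transientBucketIds=tids,
            subtractedUrls=subs,
            targetUrls=targs,
            referenceUrls=refs,
            tripletUrls=trips
        )
        ```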

255 """ 

256 self.log.debug('starting the ``_download`` method') 

257 

258 downloadDirectoryPath = self.downloadDirectoryPath 

259 self.subtractedStatus = [] 

260 self.targetStatus = [] 

261 self.referenceStatus = [] 

262 self.tripletStatus = [] 

263 index = 1 

264 survey = self.survey.lower() 

265 

266 # TOTAL TO DOWNLOAD 

267 count = len(transientBucketIds) 

268 

269 # DOWNLOAD THE IMAGE SETS FOR EACH TRANSIENT AND ADD STATUS TO OUTPUT 

270 # ARRAYS. (0 = fail, 1 = success, 2 = does not exist) 

271 for tid, surl, turl, rurl, purl in zip(transientBucketIds, subtractedUrls, targetUrls, referenceUrls, tripletUrls): 

272 if index > 1: 

273 # Cursor up one line and clear line 

274 sys.stdout.write("\x1b[1A\x1b[2K") 

275 

276 percent = (old_div(float(index), float(count))) * 100. 

277 print('%(index)s/%(count)s (%(percent)1.1f%% done): downloading %(survey)s stamps for transientBucketId: %(tid)s' % locals()) 

278 

279 statusArray = download_image_array(imageArray=[ 

280 tid, surl, turl, rurl, purl], log=self.log, survey=self.survey, downloadPath=downloadDirectoryPath) 

281 self.subtractedStatus.append(statusArray[1]) 

282 self.targetStatus.append(statusArray[2]) 

283 self.referenceStatus.append(statusArray[3]) 

284 self.tripletStatus.append(statusArray[4]) 

285 index += 1 

286 

287 self.log.debug('completed the ``_download`` method') 

288 return self.subtractedStatus, self.targetStatus, self.referenceStatus, self.tripletStatus 

289 

    def _update_database(
            self):
        """*update the database to show which images have been cached on the server*
        """

        self.log.debug('starting the ``_update_database`` method')

        if not len(self.tripletStatus):
            self.log.debug('completed the ``_update_database`` method')
            return None

        # ITERATE OVER 4 STAMP COLUMNS
        for imageType, column in list(self.stampFlagColumns.items()):
            if column:
                if imageType == "subtracted":
                    status = self.subtractedStatus
                if imageType == "target":
                    status = self.targetStatus
                if imageType == "reference":
                    status = self.referenceStatus
                if imageType == "triplet":
                    status = self.tripletStatus

                nonexist = []
                exist = []
                # NON-EXISTENT == STATUS 2
                nonexist[:] = [str(t) for t, s in zip(
                    self.transientBucketIds, status) if s == 2]
                nonexist = (",").join(nonexist)
                # EXISTENT == STATUS 1 (i.e. these are downloaded)
                exist[:] = [str(t) for t, s in zip(
                    self.transientBucketIds, status) if s == 1]
                exist = (",").join(exist)
                # GENERATE THE SQL TO UPDATE DATABASE
                sqlQuery = ""
                if len(nonexist):
                    sqlQuery += """update pesstoObjects set %(column)s = 3 where transientBucketId in (%(nonexist)s) and %(column)s = 2;""" % locals()
                    sqlQuery += """update pesstoObjects set %(column)s = 2 where transientBucketId in (%(nonexist)s) and (%(column)s is null or %(column)s = 0);""" % locals()

                    writequery(
                        log=self.log,
                        sqlQuery=sqlQuery,
                        dbConn=self.dbConn
                    )
                if len(exist):
                    sqlQuery = """update pesstoObjects set %(column)s = 1 where transientBucketId in (%(exist)s) and (%(column)s != 1 or %(column)s is null);""" % locals()

                    writequery(
                        log=self.log,
                        sqlQuery=sqlQuery,
                        dbConn=self.dbConn
                    )

        self.log.debug('completed the ``_update_database`` method')
        return None

    # use the tab-trigger below for new method
    # xt-class-method


def download_image_array(
        imageArray,
        log,
        survey,
        downloadPath):
    """*download an array of transient image stamps*

    **Key Arguments**

    - ``log`` -- logger
    - ``imageArray`` -- [transientBucketId, subtractedUrl, targetUrl, referenceUrl, tripletUrl]
    - ``survey`` -- name of the survey, used to name the stamps
    - ``downloadPath`` -- directory to download the images into

    **Return**

    - ``statusArray`` -- [transientBucketId, subtractedStatus, targetStatus, referenceStatus, tripletStatus] (status values: 0 = fail, 1 = success, 2 = does not exist)

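    **Usage**

    A minimal, hedged sketch; the transientBucketId, URL and download path below are placeholder values, not real endpoints:

    ```python
    statusArray = download_image_array(
        imageArray=[1234, None, "https://example.com/1234_target.jpeg", None, None],
        log=log,
        survey="panstarrs",
        downloadPath="/path/to/web-server/cache"
    )
    ```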

371 """ 

372 tid = imageArray[0] 

373 statusArray = [imageArray[0]] 

374 

375 # RECURSIVELY CREATE MISSING TRANSIENT DIRECTORIES 

376 downloadPath = "%(downloadPath)s/%(tid)s/" % locals() 

377 if not os.path.exists(downloadPath): 

378 os.makedirs(downloadPath) 

379 filepath = downloadPath + survey.lower() 

380 

381 for url, stamp in zip(imageArray[1:], ["subtracted", "target", "reference", "triplet"]): 

382 if url: 

383 pathToWriteFile = "%(filepath)s_%(stamp)s_stamp.jpeg" % locals( 

384 ) 

385 else: 

386 # NOTHING TO DOWNLOAD 

387 statusArray.append(0) 

388 continue 

389 

390 try: 

391 response = requests.get( 

392 url=url, 

393 timeout=1.0 

394 # params={}, 

395 # auth=HTTPBasicAuth('user', 'pwd') 

396 ) 

397 content = response.content 

398 status_code = response.status_code 

399 except requests.exceptions.RequestException as e: 

400 if 'timed out' in str(e): 

401 print('timed out - try again next time' % locals()) 

402 statusArray.append(0) 

403 else: 

404 print('HTTP Request failed - %(e)s' % locals()) 

405 print("") 

406 statusArray.append(2) 

407 continue 

408 

409 if status_code == 404: 

410 print('image not found' % locals()) 

411 statusArray.append(2) 

412 continue 

413 

414 # WRITE STAMP TO FILE 

415 try: 

416 writeFile = codecs.open( 

417 pathToWriteFile, mode='wb') 

418 except IOError as e: 

419 message = 'could not open the file %s' % (pathToWriteFile,) 

420 raise IOError(message) 

421 writeFile.write(content) 

422 writeFile.close() 

423 statusArray.append(1) 

424 

425 return statusArray