Coverage for marshallEngine/feeders/images.py : 0%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/local/bin/python
2# encoding: utf-8
3"""
4*cache the panstarrs image stamps*
6:Author:
7 David Young
8"""
9from __future__ import print_function
10from __future__ import division
11from builtins import str
12from builtins import zip
13from builtins import object
14from past.utils import old_div
15import sys
16import os
17os.environ['TERM'] = 'vt100'
18from fundamentals import tools
19from fundamentals.mysql import readquery
20import requests
21from requests.auth import HTTPBasicAuth
22import codecs
23from fundamentals import fmultiprocess
24from fundamentals.mysql import writequery
class images(object):
    """
    *The base class for the feeder image cachers*

    **Usage**

    To create a new survey image cacher create a new class using this class as the baseclass:

    ```python
    from ..images import images as baseimages
    class images(baseimages):
        ....
    ```
    """

    def cache(
            self,
            limit=1000):
        """*cache the images for the requested survey*

        **Key Arguments**

        - ``limit`` -- limit the number of transients in the list so not to piss-off survey owners by downloading everything in one go.

        **Usage**

        ```python
        from marshallEngine.feeders.panstarrs import images
        cacher = images(
            log=log,
            settings=settings,
            dbConn=dbConn
        ).cache(limit=1000)
        ```
        """
        self.log.debug('starting the ``cache`` method')

        # THESE SURVEYS DON'T HAVE IMAGES - PASS
        if self.survey in ["tns", "atel", "atels"]:
            return

        # FIRST PASS - STAMPS NEVER ATTEMPTED BEFORE
        transientBucketIds, subtractedUrls, targetUrls, referenceUrls, tripletUrls = self._list_images_needing_cached()
        leng = len(transientBucketIds)
        survey = self.survey
        if not leng:
            print("All _new_ images are cached for the %(survey)s survey" % locals())
        else:
            if leng > limit:
                print(
                    "Downloading image stamps for the next %(limit)s transients of %(leng)s remaining for %(survey)s" % locals())
            else:
                print(
                    "Downloading image stamps for the remaining %(leng)s transients for %(survey)s" % locals())
            subtractedStatus, targetStatus, referenceStatus, tripletStatus = self._download(
                transientBucketIds=transientBucketIds[:limit],
                subtractedUrls=subtractedUrls[:limit],
                targetUrls=targetUrls[:limit],
                referenceUrls=referenceUrls[:limit],
                tripletUrls=tripletUrls[:limit]
            )
            self._update_database()

        # SECOND PASS - RETRY TRANSIENTS WHOSE STAMPS PREVIOUSLY FAILED,
        # OFFERING ALTERNATIVE IMAGE URLS WHERE AVAILABLE
        transientBucketIds, subtractedUrls, targetUrls, referenceUrls, tripletUrls = self._list_images_needing_cached(
            failedImage=True)
        leng = len(transientBucketIds)

        if not leng:
            print("All images are cached for the %(survey)s survey" % locals())
        else:
            print("Downloading image stamps for the next %(limit)s transients of %(leng)s remaining for %(survey)s - previously failed" % locals())
            subtractedStatus, targetStatus, referenceStatus, tripletStatus = self._download(
                transientBucketIds=transientBucketIds[:limit],
                subtractedUrls=subtractedUrls[:limit],
                targetUrls=targetUrls[:limit],
                referenceUrls=referenceUrls[:limit],
                tripletUrls=tripletUrls[:limit]
            )
            self._update_database()

        self.log.debug('completed the ``cache`` method')
        return None

    def _list_images_needing_cached(
            self,
            failedImage=False):
        """*get lists of the transientBucketIds and images needing cached for those transients*

        **Key Arguments**

        - ``failedImage`` -- second pass attempt to download alternative image for transients

        **Return**

        - ``transientBucketIds, subtractedUrls, targetUrls, referenceUrls, tripletUrls`` -- synced lists of transientBucketIds, subtracted-, target-, reference- and triplet-image urls. All lists are the same size.
        """
        self.log.debug('starting the ``_list_images_needing_cached`` method')

        # INITIALISED HERE SO THE METHOD IS SAFE EVEN WHEN EVERY STAMP-FLAG
        # COLUMN IS DISABLED (otherwise transientBucketIds would be unbound)
        transientBucketIds = []
        subtractedUrls, targetUrls, referenceUrls, tripletUrls = [], [], [], []
        # NOTE(review): transientBucketIds is rebuilt on each iteration while
        # the url lists accumulate - this assumes every enabled stamp column
        # returns the same transient set. TODO confirm against survey configs.
        for imageType, v in list(self.stampFlagColumns.items()):
            if not v:
                continue
            imageUrl = imageType + "ImageUrl"
            # CREATE THE STAMP WHERE CLAUSE
            if not failedImage:
                stampWhere = v + " IS NULL "
            else:
                stampWhere = v + " = 2 "

            # CREATE THE SURVEY WHERE CLAUSE
            dbSurveyNames = "t.survey LIKE '%%" + \
                ("%%' OR t.survey LIKE '%%").join(self.dbSurveyNames) + "%%'"

            # NOW GENERATE SQL TO GET THE URLS OF STAMPS NEEDING DOWNLOADED
            if self.survey == "useradded":
                sqlQuery = u"""
                    SELECT
                        a.transientBucketId, a.%(imageUrl)s
                    FROM
                        transientBucket a
                    JOIN
                        (SELECT
                            MIN(magnitude) AS mag, transientBucketId
                        FROM
                            transientBucket
                        WHERE
                            magnitude IS NOT NULL
                            AND %(imageUrl)s IS NOT NULL
                            AND transientBucketId in (select transientBucketId from fs_user_added)
                            AND transientBucketId IN (SELECT
                                transientBucketId
                            FROM
                                pesstoObjects
                            WHERE
                                %(stampWhere)s and limitingMag = 0)
                        GROUP BY transientBucketId
                        ORDER BY transientBucketId) AS b ON a.transientBucketId = b.transientBucketId
                            AND a.magnitude = b.mag
                    WHERE limitingMag = 0
                    GROUP BY transientBucketId;
                """ % locals()
            else:
                sqlQuery = u"""
                    SELECT
                        MIN(magnitude) AS mag, t.transientBucketId, %(imageUrl)s
                    FROM
                        transientBucket t,
                        pesstoObjects p
                    WHERE
                        t.magnitude IS NOT NULL
                        AND t.%(imageUrl)s IS NOT NULL
                        AND p.%(stampWhere)s
                        AND t.transientbucketId = p.transientbucketId
                        AND (%(dbSurveyNames)s)
                        AND t.limitingMag = 0 group by t.transientBucketId;""" % locals()

            if failedImage:
                # ON THE RETRY PASS, DROP THE MIN-MAGNITUDE RESTRICTION AND THE
                # OUTER GROUP BY SO ALTERNATIVE ROWS/URLS CAN BE RETURNED.
                # FIX: the second replace previously targeted
                # "GROUP BY a.transientBucketId;" which never occurs in the
                # generated SQL, making it a dead no-op.
                sqlQuery = sqlQuery.replace("AND a.magnitude = b.mag", "").replace(
                    "GROUP BY transientBucketId;", "")

            rows = readquery(
                log=self.log,
                sqlQuery=sqlQuery,
                dbConn=self.dbConn,
            )

            # SPLIT URLS INTO STAMP TYPES AND ORDER ALONGSIDE
            # TRANSIENTBUCKETIDs
            transientBucketIds = []
            for row in rows:
                transientBucketIds.append(row["transientBucketId"])
                if imageType == "subtracted":
                    subtractedUrls.append(row["subtractedImageUrl"])
                if imageType == "target":
                    targetUrls.append(row["targetImageUrl"])
                if imageType == "reference":
                    referenceUrls.append(row["referenceImageUrl"])
                if imageType == "triplet":
                    tripletUrls.append(row["tripletImageUrl"])

        # PAD THE LISTS FOR STAMP TYPES THIS SURVEY DOES NOT PROVIDE SO ALL
        # FIVE RETURNED LISTS REMAIN THE SAME LENGTH
        for imageType, v in list(self.stampFlagColumns.items()):
            if not v:
                if imageType == "subtracted":
                    subtractedUrls = [None] * len(transientBucketIds)
                if imageType == "target":
                    targetUrls = [None] * len(transientBucketIds)
                if imageType == "reference":
                    referenceUrls = [None] * len(transientBucketIds)
                if imageType == "triplet":
                    tripletUrls = [None] * len(transientBucketIds)

        self.log.debug('completed the ``_list_images_needing_cached`` method')
        self.transientBucketIds = transientBucketIds
        return transientBucketIds, subtractedUrls, targetUrls, referenceUrls, tripletUrls

    def _download(
            self,
            transientBucketIds,
            subtractedUrls,
            targetUrls,
            referenceUrls,
            tripletUrls):
        """*cache the images for the survey under their transientBucketId folders in the web-server cache*

        **Key Arguments**

        - ``transientBucketIds`` -- the list of transientBucketId for the transients needing images downloaded.
        - ``subtractedUrls`` -- the list of subtracted image urls (same length as transientBucketIds list).
        - ``targetUrls`` -- the list of target image urls (same length as transientBucketIds list).
        - ``referenceUrls`` -- the list of reference image urls (same length as transientBucketIds list).
        - ``tripletUrls`` -- the list of triplet image urls (same length as transientBucketIds list).

        **Return**

        - ``subtractedStatus`` -- status of the subtracted image download (0 = fail, 1 = success, 2 = does not exist)
        - ``targetStatus`` -- status of the target image download (0 = fail, 1 = success, 2 = does not exist)
        - ``referenceStatus`` -- status of the reference image download (0 = fail, 1 = success, 2 = does not exist)
        - ``tripletStatus`` -- status of the triplet image download (0 = fail, 1 = success, 2 = does not exist)
        """
        self.log.debug('starting the ``_download`` method')

        downloadDirectoryPath = self.downloadDirectoryPath
        self.subtractedStatus = []
        self.targetStatus = []
        self.referenceStatus = []
        self.tripletStatus = []
        index = 1
        survey = self.survey.lower()

        # TOTAL TO DOWNLOAD
        count = len(transientBucketIds)

        # DOWNLOAD THE IMAGE SETS FOR EACH TRANSIENT AND ADD STATUS TO OUTPUT
        # ARRAYS. (0 = fail, 1 = success, 2 = does not exist)
        for tid, surl, turl, rurl, purl in zip(transientBucketIds, subtractedUrls, targetUrls, referenceUrls, tripletUrls):
            if index > 1:
                # CURSOR UP ONE LINE AND CLEAR LINE (in-place progress display)
                sys.stdout.write("\x1b[1A\x1b[2K")
            percent = (old_div(float(index), float(count))) * 100.
            print('%(index)s/%(count)s (%(percent)1.1f%% done): downloading %(survey)s stamps for transientBucketId: %(tid)s' % locals())
            statusArray = download_image_array(imageArray=[
                tid, surl, turl, rurl, purl], log=self.log, survey=self.survey, downloadPath=downloadDirectoryPath)
            # statusArray[0] is the transientBucketId; 1-4 are the per-stamp statuses
            self.subtractedStatus.append(statusArray[1])
            self.targetStatus.append(statusArray[2])
            self.referenceStatus.append(statusArray[3])
            self.tripletStatus.append(statusArray[4])
            index += 1

        self.log.debug('completed the ``_download`` method')
        return self.subtractedStatus, self.targetStatus, self.referenceStatus, self.tripletStatus

    def _update_database(
            self):
        """*update the database to show which images have been cached on the server*
        """
        self.log.debug('starting the ``_update_database`` method')

        # NOTHING DOWNLOADED ON THIS RUN - NOTHING TO RECORD
        if not len(self.tripletStatus):
            self.log.debug('completed the ``_update_database`` method')
            return None

        # ITERATE OVER 4 STAMP COLUMNS
        for imageType, column in list(self.stampFlagColumns.items()):
            if column:
                if imageType == "subtracted":
                    status = self.subtractedStatus
                if imageType == "target":
                    status = self.targetStatus
                if imageType == "reference":
                    status = self.referenceStatus
                if imageType == "triplet":
                    status = self.tripletStatus

                nonexist = []
                exist = []
                # NON-EXISTANT == STATUS 2
                nonexist[:] = [str(t) for t, s in zip(
                    self.transientBucketIds, status) if s == 2]
                nonexist = (",").join(nonexist)
                # EXISTANT == STATUS 1 (i.e. these are downloaded)
                exist[:] = [str(t) for t, s in zip(
                    self.transientBucketIds, status) if s == 1]
                exist = (",").join(exist)
                # GENERATE THE SQL TO UPDATE DATABASE
                # (flag meanings: null/0 = not tried, 1 = cached, 2 = failed
                # once, 3 = failed twice - give up)
                sqlQuery = ""
                if len(nonexist):
                    sqlQuery += """update pesstoObjects set %(column)s = 3 where transientBucketId in (%(nonexist)s) and %(column)s = 2;""" % locals(
                    )
                    sqlQuery += """update pesstoObjects set %(column)s = 2 where transientBucketId in (%(nonexist)s) and (%(column)s is null or %(column)s = 0);""" % locals(
                    )
                    writequery(
                        log=self.log,
                        sqlQuery=sqlQuery,
                        dbConn=self.dbConn
                    )
                if len(exist):
                    sqlQuery = """update pesstoObjects set %(column)s = 1 where transientBucketId in (%(exist)s) and (%(column)s != 1 or %(column)s is null);""" % locals(
                    )
                    writequery(
                        log=self.log,
                        sqlQuery=sqlQuery,
                        dbConn=self.dbConn
                    )

        self.log.debug('completed the ``_update_database`` method')
        return None

    # use the tab-trigger below for new method
    # xt-class-method
def download_image_array(
        imageArray,
        log,
        survey,
        downloadPath):
    """*download an array of transient image stamps*

    **Key Arguments**

    - ``log`` -- logger
    - ``imageArray`` -- [transientBucketId, subtractedUrl, targetUrl, referenceUrl, tripletUrl]
    - ``survey`` -- name of the survey to name stamps with
    - ``downloadPath`` -- directory to download the images into

    **Return**

    - statusArray -- [transientBucketId, subtractedStatus, targetStatus, referenceStatus, tripletStatus]; each status is 0 = fail/retry (or no url supplied), 1 = success, 2 = does not exist
    """
    tid = imageArray[0]
    statusArray = [imageArray[0]]

    # RECURSIVELY CREATE MISSING TRANSIENT DIRECTORIES
    downloadPath = "%(downloadPath)s/%(tid)s/" % locals()
    if not os.path.exists(downloadPath):
        os.makedirs(downloadPath)
    filepath = downloadPath + survey.lower()

    for url, stamp in zip(imageArray[1:], ["subtracted", "target", "reference", "triplet"]):
        if url:
            pathToWriteFile = "%(filepath)s_%(stamp)s_stamp.jpeg" % locals(
            )
        else:
            # NOTHING TO DOWNLOAD - STATUS 0 SO AN ALTERNATIVE URL CAN BE
            # ATTEMPTED ON A LATER PASS
            statusArray.append(0)
            continue

        try:
            # NOTE(review): 1s timeout is aggressive for image downloads -
            # timed-out stamps are simply retried on the next cache run
            response = requests.get(
                url=url,
                timeout=1.0
            )
            content = response.content
            status_code = response.status_code
        except requests.exceptions.RequestException as e:
            if 'timed out' in str(e):
                # TRANSIENT FAILURE - LEAVE FLAG AT 0 SO WE TRY AGAIN
                print('timed out - try again next time')
                statusArray.append(0)
            else:
                # HARD FAILURE - MARK AS NON-EXISTENT SO WE EVENTUALLY GIVE UP
                print('HTTP Request failed - %(e)s' % locals())
                print("")
                statusArray.append(2)
            continue

        if status_code == 404:
            print('image not found')
            statusArray.append(2)
            continue

        # WRITE STAMP TO FILE (context manager guarantees the handle is
        # closed even if the write fails)
        try:
            writeFile = codecs.open(
                pathToWriteFile, mode='wb')
        except IOError as e:
            message = 'could not open the file %s' % (pathToWriteFile,)
            raise IOError(message)
        with writeFile:
            writeFile.write(content)
        statusArray.append(1)

    return statusArray