Coverage for neddy/_basesearch.py: 84%

202 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-09-20 10:57 +0000

1#!/usr/local/bin/python 

2# encoding: utf-8 

3""" 

4*The base class for NED searches* 

5 

6:Author: 

7 David Young 

8 

9:Date Created: 

10 May 6, 2015 

11""" 

12from __future__ import print_function 

13from __future__ import division 

14import sys 

15import os 

16os.environ['TERM'] = 'vt100' 

17 

18 

class _basesearch(object):

    """
    The base-class for searching NED

    The methods of this class read attributes that are never assigned here,
    so they must be provided by the object the methods are called on:

        - ``log`` -- logger used by every method
        - ``listOfCoordinates`` -- coordinates to convert (``_convert_coordinates_to_decimal_degrees``)
        - ``nearestOnly`` -- stop after the first match (``_parse_the_ned_position_results``)
        - ``nedResults`` -- list of paths to downloaded NED result files
        - ``verbose`` -- return every column or only a short subset (``_parse_the_ned_list_results``)
        - ``theseBatchParams`` -- per-batch search parameters, or ``False`` (``_parse_the_ned_list_results``)
    """

    def __init__(
            self
    ):
        return None

    def _read_ned_file(
            self,
            pathToReadFile):
        """
        *read a utf-8 encoded result file and return its content*

        **Key Arguments:**
            - ``pathToReadFile`` -- path to the file to read

        **Return:**
            - ``thisData`` -- the decoded content of the file

        An ``IOError`` carrying a 'could not open the file' message is raised
        (after logging at critical level) if the file cannot be opened or read.
        """
        import codecs
        self.log.debug("attempting to open the file %s" %
                       (pathToReadFile,))
        try:
            # THE CONTEXT MANAGER CLOSES THE HANDLE EVEN IF THE READ FAILS
            with codecs.open(
                    pathToReadFile, encoding='utf-8', mode='rb') as readFile:
                return readFile.read()
        except IOError:
            message = 'could not open the file %s' % (pathToReadFile,)
            self.log.critical(message)
            raise IOError(message)

    def _convert_coordinates_to_decimal_degrees(
            self):
        """
        *convert coordinates to decimal degrees*

        ``self.listOfCoordinates`` is replaced in place by a list of
        ``[ra, dec]`` float pairs. Items may be whitespace separated strings
        or 2-item sequences. Only the first RA is inspected to decide whether
        the whole list needs converting from sexagesimal.
        """
        self.log.debug(
            'starting the ``_convert_coordinates_to_decimal_degrees`` method')

        stringTypes = ("".__class__, u"".__class__)
        coordinates = self.listOfCoordinates

        # NOTHING TO CONVERT
        if len(coordinates) == 0:
            self.listOfCoordinates = []
            self.log.debug(
                'completed the ``_convert_coordinates_to_decimal_degrees`` method')
            return None

        # SPLIT STRING COORDINATES INTO [RA, DEC] TOKENS - split() WITH NO
        # ARGUMENT TOLERATES REPEATED, LEADING AND TRAILING WHITESPACE
        if isinstance(coordinates[0], stringTypes):
            sources = [s.split() for s in coordinates]
        else:
            sources = coordinates

        # TEST IF CONVERSION IS REQUIRED
        try:
            float(sources[0][0])
            convert = False
        except (ValueError, TypeError):
            convert = True

        if convert:
            # IMPORTED ONLY WHEN A SEXAGESIMAL CONVERSION IS ACTUALLY NEEDED
            from astrocalc.coords import unit_conversion
            converter = unit_conversion(
                log=self.log
            )
            raDegs = [converter.ra_sexegesimal_to_decimal(
                ra=s[0]) for s in sources]
            decDegs = [converter.dec_sexegesimal_to_decimal(
                dec=s[1]) for s in sources]
            self.listOfCoordinates = [
                [raDeg, decDeg] for raDeg, decDeg in zip(raDegs, decDegs)]
        else:
            self.listOfCoordinates = [
                [float(s[0]), float(s[1])] for s in sources]

        self.log.debug(
            'completed the ``_convert_coordinates_to_decimal_degrees`` method')
        return None

    def _parse_the_ned_position_results(
            self,
            ra,
            dec,
            nedResults):
        """
        *parse the results of a NED conesearch and return as python dicts*

        **Key Arguments:**
            - ``ra`` -- the search ra
            - ``dec`` -- the search dec
            - ``nedResults`` -- path to the downloaded NED result file. Nothing is parsed if this is empty/``None``.

        **Return:**
            - ``results`` -- list of result dictionaries (keys ``searchRa``, ``searchDec``, ``matchName``). Holds only the first match if ``self.nearestOnly`` is set.
            - ``resultLen`` -- the number of matches found in the file (header and blank lines are not counted)

        The process exits with status 1 if the file does not carry the NED
        results banner, and an ``IOError`` is raised if it cannot be read.
        """
        self.log.debug(
            'starting the ``_parse_the_ned_position_results`` method')
        import csv
        import re
        results = []
        resultLen = 0
        if nedResults:
            # READ THE RESULT FILE FROM NED
            thisData = self._read_ned_file(nedResults)

            # CHECK FOR ERRORS
            if "Results from query to NASA/IPAC Extragalactic Database" not in thisData:
                print("something went wrong with the NED query")
                self.log.error(
                    "something went wrong with the NED query")
                # NON-ZERO STATUS SO CALLING SHELLS SEE THE FAILURE
                sys.exit(1)

            # SEARCH FROM MATCHES IN RESULTS FILE
            matchObject = re.search(
                r"No\.\|Object Name.*?\n(.*)", thisData, re.S)
            if matchObject:
                theseLines = matchObject.group().split('\n')
                # THE FIRST LINE IS THE COLUMN HEADER AND BLANK LINES ARE NOT
                # MATCHES
                resultLen = len(
                    [line for line in theseLines[1:] if line.strip()])
                csvReader = csv.DictReader(
                    theseLines, dialect='excel', delimiter='|', quotechar='"')
                for row in csvReader:
                    results.append({
                        "searchRa": ra,
                        "searchDec": dec,
                        "matchName": row["Object Name"].strip()
                    })
                    if self.nearestOnly:
                        break

        self.log.debug(
            'completed the ``_parse_the_ned_position_results`` method')
        return results, resultLen

    def _convert_html_to_csv(
            self):
        """
        *convert NED's html output to csv format*

        Every file listed in ``self.nedResults`` is rewritten in place: the
        html around the ``<PRE>`` table is dropped and single-character
        column names wrapped in pipes (e.g. ``|b|``) are rewritten as
        ``abs(b)`` so they cannot be mistaken for column delimiters.
        ``None`` entries and files that cannot be decoded are logged and
        skipped; a file that cannot be opened raises an ``IOError``.
        """
        self.log.debug('starting the ``_convert_html_to_csv`` method')

        import codecs
        import re
        regex1 = re.compile(
            r'.*<PRE><strong> (.*?)</strong>(.*?)</PRE></TABLE>.*', re.I | re.S)
        regex2 = re.compile(r'\|(\w)\|', re.I | re.S)
        for thisFile in self.nedResults:
            if thisFile is None:
                self.log.error('we have no file to open')
                continue

            try:
                thisData = self._read_ned_file(thisFile)
            except UnicodeError as e:
                # SKIP THE FILE RATHER THAN WRITE ANOTHER FILE'S CONTENT TO IT
                self.log.error(
                    'could not decode the file %s - failed with this error: %s ' % (thisFile, str(e)))
                continue

            self.log.debug("regex 1 - sub")
            thisData = regex1.sub(r"\g<1>\g<2>", thisData)
            self.log.debug("regex 2 - sub")
            thisData = regex2.sub(r"abs(\g<1>)", thisData)
            self.log.debug("replace text")
            # REGEX 2 ALREADY COVERS |b|; KEPT AS AN EXPLICIT SAFETY NET
            thisData = thisData.replace("|b|", "abs(b)")

            with codecs.open(
                    thisFile, encoding='utf-8', mode='w') as writeFile:
                writeFile.write(thisData)

        self.log.debug('completed the ``_convert_html_to_csv`` method')
        return None

    def _parse_the_ned_list_results(
            self):
        """
        *parse the NED results*

        Each file listed in ``self.nedResults`` is read, parsed as
        pipe-delimited rows and then deleted from disk. If
        ``self.theseBatchParams`` is set, the leading integer of the file
        name (``<index>_...``) selects the batch whose ``searchIndex``,
        ``searchRa`` and ``searchDec`` values are prepended to the rows.

        **Return:**
            - ``results`` -- list of dictionaries, one per parsed row, keyed by column name. Columns missing from a short row hold ``None``.
            - ``headers`` -- the column names to report; every column if ``self.verbose`` is True, otherwise a short subset. The three search columns are only included when batch parameters are available.

        .. todo::

            - @review: when complete, clean _parse_the_ned_results method
            - @review: when complete add logging
        """
        self.log.debug('starting the ``_parse_the_ned_list_results`` method')
        import csv
        import re
        self.resultSpacing = 30

        results = []
        stringTypes = ("".__class__, u"".__class__)

        # CHOOSE VALUES TO RETURN
        allHeaders = ["searchIndex", "searchRa", "searchDec", "row_number", "input_note", "input_name", "ned_notes", "ned_name", "ra", "dec", "eb-v", "object_type", "redshift", "redshift_err", "redshift_quality", "magnitude_filter",
                      "major_diameter_arcmin", "minor_diameter_arcmin", "morphology", "hierarchy", "galaxy_morphology", "radio_morphology", "activity_type", "distance_indicator", "distance_mod", "distance"]
        if self.verbose == True:
            headers = list(allHeaders)
        else:
            headers = [
                "searchIndex", "searchRa", "searchDec", "ned_name", "ra", "dec", "object_type", "redshift"]

        # ONE TEST DECIDES BOTH WHETHER THE SEARCH COLUMNS ARE IN THE HEADER
        # AND WHETHER THEY ARE PREPENDED TO THE ROWS, SO THE TWO CANNOT
        # DISAGREE
        hasBatchParams = bool(self.theseBatchParams)
        if not hasBatchParams:
            allHeaders = allHeaders[3:]
            headers = headers[3:]

        # THE HEADER LINE IS THE SAME FOR EVERY FILE
        thisHeader = "".join(
            [str(head).ljust(self.resultSpacing, ' ') + " | " for head in allHeaders])

        for thisFile in self.nedResults:
            if not thisFile:
                continue

            thisData = self._read_ned_file(thisFile)

            # GRAB THE ROWS OF DATA
            matchObject = re.search(
                r"\n1\s*?\|\s*?.*", thisData, re.S)
            if matchObject:
                theseLines = matchObject.group().split('\n')[1:]

                if hasBatchParams:
                    # FIND THE BATCH INDEX NUMBER
                    thisIndex = int(
                        os.path.basename(thisFile).split("_")[0])
                    theseLines = [
                        "%s | %s | %s | %s " % (
                            b["searchIndex"], b["searchRa"], b["searchDec"], t)
                        for t, b in zip(theseLines, self.theseBatchParams[thisIndex])]

                csvReader = csv.DictReader(
                    [thisHeader] + theseLines, dialect='excel', delimiter='|', quotechar='"')
                for row in csvReader:
                    row = dict(row)
                    # A ``None`` KEY MEANS THE ROW HAS MORE FIELDS THAN THE
                    # HEADER
                    if not row or None in row:
                        continue
                    if "ned_name" not in "".join(row.keys()).lower():
                        continue
                    thisDict = {}
                    for k, v in row.items():
                        k = k.strip()
                        # SHORT ROWS LEAVE ``None`` IN THE MISSING COLUMNS
                        if isinstance(v, stringTypes):
                            if k == "ra" or k == "dec":
                                v = v.replace("h", ":").replace(
                                    "m", ":").replace("d", ":").replace("s", "")
                            v = v.strip()
                        thisDict[k] = v
                    results.append(thisDict)

            # NOTE(review): REMOVAL ASSUMED TO HAPPEN ONCE PER FILE WHETHER OR
            # NOT ROWS WERE FOUND - CONFIRM AGAINST THE ORIGINAL INDENTATION
            os.remove(thisFile)

        self.log.debug('completed the ``_parse_the_ned_list_results`` method')
        return results, headers

    def _split_incoming_queries_into_batches(
            self,
            sources,
            searchParams=False,
            batchSize=180):
        """
        *split incoming queries into batches*

        **Key Arguments:**
            - ``sources`` -- sources to split into batches
            - ``searchParams`` -- search params associated with batches. Default *False*
            - ``batchSize`` -- maximum number of sources per batch. Default *180*

        **Return:**
            - ``theseBatches`` -- list of batches. Always holds at least one batch, which is empty if ``sources`` is empty.
            - ``theseBatchParams`` -- params associated with batches, sliced in step with ``theseBatches``, or ``False`` if no ``searchParams`` were given
        """
        self.log.debug(
            'starting the ``_split_incoming_queries_into_batches`` method')

        if batchSize < 1:
            raise ValueError("batchSize must be a positive integer")

        total = len(sources)
        # CEILING DIVISION SO AN EXACT MULTIPLE OF THE BATCH SIZE DOES NOT
        # PRODUCE AN EMPTY TRAILING BATCH. max() KEEPS THE SINGLE (EMPTY)
        # BATCH PREVIOUSLY RETURNED FOR EMPTY INPUT
        batches = max(1, -(-total // batchSize))

        hasParams = searchParams is not False and searchParams is not None

        theseBatches = []
        theseBatchParams = []
        for i in range(batches):
            start = i * batchSize
            end = start + batchSize
            theseBatches.append(sources[start:end])
            if hasParams:
                theseBatchParams.append(searchParams[start:end])

        if len(theseBatchParams) == 0:
            theseBatchParams = False

        self.log.debug(
            'completed the ``_split_incoming_queries_into_batches`` method')
        return theseBatches, theseBatchParams

351 

352 # use the tab-trigger below for new method 

353 # xt-class-method