Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/local/bin/python 

2# encoding: utf-8 

3""" 

4*Given a list of dictionaries this function will insert each dictionary as a row into the given database table* 

5 

6:Author: 

7 David Young 

8""" 

9from __future__ import print_function 

10from __future__ import division 

11from builtins import str 

12from builtins import range 

13from past.utils import old_div 

14import sys 

15import os 

16os.environ['TERM'] = 'vt100' 

17from fundamentals import tools 

18from fundamentals.mysql import convert_dictionary_to_mysql_table, writequery 

19from fundamentals.fmultiprocess import fmultiprocess 

20import time 

21import re 

22from fundamentals.mysql.database import database 

23from datetime import datetime 

24 

25 

# MODULE-LEVEL STATE SHARED WITH THE BATCH WORKER FUNCTIONS BELOW.
# ``totalCount``, ``globalDbConn`` AND ``sharedList`` ARE RE-ASSIGNED BY
# ``insert_list_of_dictionaries_into_database_tables`` BEFORE THE WORKERS RUN
count = totalCount = 0

# EITHER A LIVE DATABASE CONNECTION OR A DICTIONARY OF DATABASE SETTINGS
globalDbConn = False

# LIST OF ``(batch, end)`` TUPLES; WORKERS PICK THEIR BATCH BY INDEX
sharedList = list()

30 

31 

def insert_list_of_dictionaries_into_database_tables(
        dbConn,
        log,
        dictList,
        dbTableName,
        uniqueKeyList=None,
        dateModified=False,
        dateCreated=True,
        batchSize=2500,
        replace=False,
        dbSettings=False):
    """insert list of dictionaries into database tables

    The first dictionary is passed on its own to
    ``convert_dictionary_to_mysql_table``; the remaining dictionaries are cut
    into batches of at most *batchSize* rows, stored in the module-level
    ``sharedList`` and handed to ``fmultiprocess`` by index.

    **Key Arguments**

    - ``dbConn`` -- mysql database connection
    - ``log`` -- logger
    - ``dictList`` -- list of python dictionaries to add to the database table
    - ``dbTableName`` -- name of the database table
    - ``uniqueKeyList`` -- a list of column names to append as a unique constraint on the database. Default *None* (treated as an empty list)
    - ``dateModified`` -- add the modification date as a column in the database
    - ``dateCreated`` -- add the created date as a column in the database
    - ``batchSize`` -- batch the insert commands into *batchSize* batches
    - ``replace`` -- replace row if a duplicate is found
    - ``dbSettings`` -- pass in the database settings so multiprocessing can establish one connection per process (might not be faster). When given, batches are loaded with ``_add_dictlist_to_database_via_load_in_file`` instead of ``_insert_single_batch_into_database``


    **Return**

    - None


    **Usage**

    ```python
    from fundamentals.mysql import insert_list_of_dictionaries_into_database_tables
    insert_list_of_dictionaries_into_database_tables(
        dbConn=dbConn,
        log=log,
        dictList=dictList,
        dbTableName="test_insert_many",
        uniqueKeyList=["col1", "col3"],
        dateModified=False,
        batchSize=2500
    )
    ```

    """

    log.debug(
        'starting the ````insert_list_of_dictionaries_into_database_tables`` function')

    global totalCount
    global globalDbConn
    global sharedList

    # A FRESH LIST PER CALL - A LIST LITERAL AS THE DEFAULT WOULD BE SHARED
    # BETWEEN ALL CALLS
    if uniqueKeyList is None:
        uniqueKeyList = []

    reDate = re.compile('^[0-9]{4}-[0-9]{2}-[0-9]{2}T')

    # THE WORKERS READ THIS GLOBAL: SETTINGS DICTIONARY IF GIVEN, OTHERWISE
    # THE LIVE CONNECTION
    if dbSettings:
        globalDbConn = dbSettings
    else:
        globalDbConn = dbConn

    if len(dictList) == 0:
        log.warning(
            'the dictionary to be added to the database is empty')
        return None

    # FIND BUG IN MYSQL QUERY BY UNCOMMENTING
    # tot = len(dictList)
    # for index, d in enumerate(dictList):
    #     if index > 1:
    #         # Cursor up one line and clear line
    #         sys.stdout.write("\x1b[1A\x1b[2K")

    #     percent = (float(index) / float(tot)) * 100.
    #     print('%(index)s/%(tot)s (%(percent)1.1f%% done)' % locals())

    #     convert_dictionary_to_mysql_table(
    #         dbConn=dbConn,
    #         log=log,
    #         dictionary=d,
    #         dbTableName=dbTableName,
    #         uniqueKeyList=uniqueKeyList,
    #         dateModified=dateModified,
    #         reDatetime=reDate,
    #         replace=replace,
    #         dateCreated=dateCreated)

    # THE FIRST DICTIONARY IS HANDLED INDIVIDUALLY BEFORE THE BULK INSERT
    # (presumably so the table and its columns exist -- TODO confirm against
    # ``convert_dictionary_to_mysql_table``)
    convert_dictionary_to_mysql_table(
        dbConn=dbConn,
        log=log,
        dictionary=dictList[0],
        dbTableName=dbTableName,
        uniqueKeyList=uniqueKeyList,
        dateModified=dateModified,
        reDatetime=reDate,
        replace=replace,
        dateCreated=dateCreated)
    dictList = dictList[1:]

    dbConn.autocommit(False)

    if len(dictList):

        total = len(dictList)

        # SLICE THE REMAINING ROWS INTO BATCHES. STEPPING THROUGH THE START
        # INDEXES NEVER YIELDS AN EMPTY TRAILING BATCH, EVEN WHEN ``total`` IS
        # AN EXACT MULTIPLE OF ``batchSize``
        sharedList = [
            (dictList[start:start + batchSize], start + batchSize)
            for start in range(0, total, batchSize)
        ]

        # +1 ACCOUNTS FOR THE FIRST DICTIONARY INSERTED ABOVE
        totalCount = total + 1
        ltotalCount = totalCount

        print("Starting to insert %(ltotalCount)s rows into %(dbTableName)s" % locals())

        if dbSettings == False:
            fmultiprocess(
                log=log,
                function=_insert_single_batch_into_database,
                inputArray=list(range(len(sharedList))),
                dbTableName=dbTableName,
                uniqueKeyList=uniqueKeyList,
                dateModified=dateModified,
                replace=replace,
                batchSize=batchSize,
                reDatetime=reDate,
                dateCreated=dateCreated
            )

        else:
            fmultiprocess(log=log, function=_add_dictlist_to_database_via_load_in_file,
                          inputArray=list(range(len(sharedList))), dbTablename=dbTableName,
                          dbSettings=dbSettings, dateModified=dateModified)

        # CURSOR UP ONE LINE AND CLEAR IT BEFORE PRINTING THE FINAL TALLY
        sys.stdout.write("\x1b[1A\x1b[2K")
        print("%(ltotalCount)s / %(ltotalCount)s rows inserted into %(dbTableName)s" % locals())

    log.debug(
        'completed the ``insert_list_of_dictionaries_into_database_tables`` function')
    return None

181 

182 

def _insert_single_batch_into_database(
        batchIndex,
        log,
        dbTableName,
        uniqueKeyList,
        dateModified,
        replace,
        batchSize,
        reDatetime,
        dateCreated):
    """*insert one batch of dictionaries from the module-level ``sharedList`` into a database table*

    The batch is written with a single many-row ``INSERT IGNORE`` via
    ``writequery``. If that raises, the insert is rebuilt row-by-row with
    ``convert_dictionary_to_mysql_table``. If ``writequery`` reports
    ``"unknown column"`` every dictionary is passed through
    ``convert_dictionary_to_mysql_table`` and the insert is retried.

    The connection is taken from the module-level ``globalDbConn``; if that is
    a dictionary of settings a new connection is opened (and closed again
    before returning).

    **Key Arguments**

    - ``batchIndex`` -- the index of the batch (within ``sharedList``) to insert
    - ``log`` -- logger
    - ``dbTableName`` -- name of the database table
    - ``uniqueKeyList`` -- list of column names forming the unique constraint (passed on to ``convert_dictionary_to_mysql_table``)
    - ``dateModified`` -- passed on to ``convert_dictionary_to_mysql_table``
    - ``replace`` -- append an ``ON DUPLICATE KEY UPDATE`` clause so duplicate rows are updated
    - ``batchSize`` -- size of the batches (not used by this function)
    - ``reDatetime`` -- compiled regex passed on to ``convert_dictionary_to_mysql_table``
    - ``dateCreated`` -- include a ``dateCreated`` column set to ``NOW()`` in the insert


    **Return**

    - the string ``"None"``

    """
    log.debug('starting the ``_insert_single_batch_into_database`` function')

    global globalDbConn
    global sharedList

    # EACH ``sharedList`` ENTRY IS A ``(rows, end)`` TUPLE
    rows = sharedList[batchIndex][0]
    reDate = reDatetime

    # NOTHING TO INSERT - AN EMPTY BATCH WOULD OTHERWISE FAIL BELOW
    if not rows:
        log.debug(
            'completed the ``_insert_single_batch_into_database`` function')
        return "None"

    openedHere = isinstance(globalDbConn, dict)
    if openedHere:
        # SETUP ALL DATABASE CONNECTIONS
        dbConn = database(
            log=log,
            dbSettings=globalDbConn,
            autocommit=False
        ).connect()
    else:
        dbConn = globalDbConn

    try:
        # BOTH THE REPLACE AND NON-REPLACE CASES USE THE SAME VERB; REPLACING
        # IS DONE BY THE ``ON DUPLICATE KEY UPDATE`` CLAUSE ADDED BELOW
        insertVerb = "INSERT IGNORE"

        # UNION OF THE KEYS OF EVERY DICTIONARY IN THE BATCH. THE ORIGINAL
        # KEYS ARE KEPT FOR VALUE LOOKUP; THE SANITISED NAMES ARE ONLY USED
        # AS COLUMN NAMES
        rawKeys = list(set().union(*(list(d.keys()) for d in rows)))
        columns = [k.replace(" ", "_").replace("-", "_") for k in rawKeys]

        myKeys = '`,`'.join(columns)
        # A KEY MISSING FROM A DICTIONARY IS INSERTED AS NULL
        vals = [tuple([None if d.get(k) in ["None", None] else d[k]
                       for k in rawKeys]) for d in rows]
        valueString = ", ".join(["%s"] * len(columns))
        insertCommand = insertVerb + """ INTO `""" + dbTableName + \
            """` (`""" + myKeys + """`, dateCreated) VALUES (""" + \
            valueString + """, NOW())"""

        if not dateCreated:
            insertCommand = insertCommand.replace(
                ", dateCreated)", ")").replace(", NOW())", ")")

        dup = ""
        if replace:
            dup = " ON DUPLICATE KEY UPDATE " + \
                "".join(" %s=values(%s)," % (k, k) for k in columns) + \
                " updated=1, dateLastModified=NOW()"

        insertCommand = insertCommand + dup

        insertCommand = insertCommand.replace('\\""', '\\" "')
        insertCommand = insertCommand.replace('""', "null")
        insertCommand = insertCommand.replace('"None"', 'null')

        inserted = False
        while not inserted:

            message = ""
            try:
                message = writequery(
                    log=log,
                    sqlQuery=insertCommand,
                    dbConn=dbConn,
                    Force=True,
                    manyValueList=vals
                )
            except Exception:
                # THE BULK COMMAND FAILED - REBUILD THE INSERT ONE DICTIONARY
                # AT A TIME AND TRY AGAIN
                theseInserts = []
                for aDict in rows:
                    fallbackCommand, valueTuple = convert_dictionary_to_mysql_table(
                        dbConn=dbConn,
                        log=log,
                        dictionary=aDict,
                        dbTableName=dbTableName,
                        uniqueKeyList=uniqueKeyList,
                        dateModified=dateModified,
                        returnInsertOnly=True,
                        replace=replace,
                        reDatetime=reDate,
                        skipChecks=True
                    )
                    theseInserts.append(valueTuple)

                message = writequery(
                    log=log,
                    sqlQuery=fallbackCommand,
                    dbConn=dbConn,
                    Force=True,
                    manyValueList=theseInserts
                )

            if message == "unknown column":
                # PASS EVERY ROW THROUGH THE SINGLE-ROW CONVERTER, THEN LOOP
                # ROUND AND RETRY THE BULK INSERT
                for aDict in rows:
                    convert_dictionary_to_mysql_table(
                        dbConn=dbConn,
                        log=log,
                        dictionary=aDict,
                        dbTableName=dbTableName,
                        uniqueKeyList=uniqueKeyList,
                        dateModified=dateModified,
                        reDatetime=reDate,
                        replace=replace
                    )
            else:
                inserted = True

        dbConn.commit()
    finally:
        # ONLY CLOSE A CONNECTION THIS FUNCTION OPENED ITSELF; A SHARED
        # CONNECTION BELONGS TO THE CALLER
        if openedHere:
            dbConn.close()

    log.debug('completed the ``_insert_single_batch_into_database`` function')
    return "None"

342 

343 

def _add_dictlist_to_database_via_load_in_file(
        masterListIndex,
        dbTablename,
        dbSettings,
        dateModified=False):
    """*load a list of dictionaries into a database table with load data infile*

    The batch is written to a pipe-separated file under ``/tmp``, loaded into
    a temporary table cloned from ``dbTablename`` and then copied into
    ``dbTablename`` with ``INSERT IGNORE ... ON DUPLICATE KEY UPDATE``. The
    file is removed and the connection closed even if one of the queries
    raises.

    **Key Arguments**

    - ``masterListIndex`` -- the index of the sharedList of dictionary lists to process
    - ``dbTablename`` -- the name of the database table to add the list to
    - ``dbSettings`` -- the dictionary of database settings
    - ``dateModified`` -- add a dateModified stamp with an updated flag to rows?

    No logger is accepted; an ``emptyLogger`` is created internally.


    **Return**

    - None

    """
    from fundamentals.logs import emptyLogger
    import pandas as pd
    import numpy as np
    log = emptyLogger()
    log.debug('starting the ``_add_dictlist_to_database_via_load_in_file`` function')

    global sharedList

    # EACH ``sharedList`` ENTRY IS A ``(rows, end)`` TUPLE
    dictList = sharedList[masterListIndex][0]

    # NOTHING TO LOAD - AN EMPTY BATCH WOULD PRODUCE AN EMPTY COLUMN LIST IN
    # THE ``LOAD DATA`` STATEMENT
    if not dictList:
        log.debug(
            'completed the ``_add_dictlist_to_database_via_load_in_file`` function')
        return None

    # SETUP ALL DATABASE CONNECTIONS
    dbConn = database(
        log=log,
        dbSettings=dbSettings
    ).connect()

    # THE SAME TIMESTAMPED NAME IS USED FOR THE TEMPORARY TABLE AND THE FILE
    now = datetime.now()
    tmpTable = now.strftime("tmp_%Y%m%dt%H%M%S%f")
    tmpFile = '/tmp/%(tmpTable)s' % locals()

    try:
        # CREATE A TEMPORY TABLE TO ADD DATA TO
        sqlQuery = """CREATE TEMPORARY TABLE %(tmpTable)s SELECT * FROM %(dbTablename)s WHERE 1=0;""" % locals()
        writequery(
            log=log,
            sqlQuery=sqlQuery,
            dbConn=dbConn
        )

        # UNIQUE SET OF KEYS FOUND ACROSS ALL DICTIONARIES IN THE BATCH
        csvColumns = list({k for d in dictList for k in d})
        csvColumnsString = (', ').join(csvColumns)
        csvColumnsString = csvColumnsString.replace(u" dec,", u" decl,")

        # ``\N`` IS WRITTEN IN PLACE OF MISSING VALUES
        # (MySQL's LOAD DATA null marker -- presumably the intent; verify)
        df = pd.DataFrame(dictList)
        df.replace(['nan', 'None', '', 'NaN', np.nan], '\\N', inplace=True)
        df.to_csv(tmpFile, sep="|",
                  index=False, escapechar="\\", quotechar='"', columns=csvColumns, encoding='utf-8')

        sqlQuery = """LOAD DATA LOCAL INFILE '/tmp/%(tmpTable)s'
INTO TABLE %(tmpTable)s
FIELDS TERMINATED BY '|' OPTIONALLY ENCLOSED BY '"'
IGNORE 1 LINES
(%(csvColumnsString)s);""" % locals()

        writequery(
            log=log,
            sqlQuery=sqlQuery,
            dbConn=dbConn
        )

        updateStatement = "".join(
            "`%s` = VALUES(`%s`), " % (col, col) for col in csvColumns)
        if dateModified:
            updateStatement += "dateLastModified = NOW(), updated = 1"
        else:
            # STRIP THE TRAILING ", "
            updateStatement = updateStatement[0:-2]

        sqlQuery = """
INSERT IGNORE INTO %(dbTablename)s
SELECT * FROM %(tmpTable)s
ON DUPLICATE KEY UPDATE %(updateStatement)s;""" % locals()
        writequery(
            log=log,
            sqlQuery=sqlQuery,
            dbConn=dbConn
        )

        sqlQuery = """DROP TEMPORARY TABLE %(tmpTable)s;""" % locals()
        writequery(
            log=log,
            sqlQuery=sqlQuery,
            dbConn=dbConn
        )
    finally:
        # BEST-EFFORT CLEANUP: THE FILE MAY NOT EXIST IF AN EARLIER STEP FAILED
        try:
            os.remove(tmpFile)
        except OSError:
            pass

        dbConn.close()

    log.debug(
        'completed the ``_add_dictlist_to_database_via_load_in_file`` function')
    return None

466 

467# use the tab-trigger below for new function 

468# xt-def-function