Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/local/bin/python
2# encoding: utf-8
3"""
4*Given a list of dictionaries this function will insert each dictionary as a row into the given database table*
6:Author:
7 David Young
8"""
9from __future__ import print_function
10from __future__ import division
11from builtins import str
12from builtins import range
13from past.utils import old_div
14import sys
15import os
16os.environ['TERM'] = 'vt100'
17from fundamentals import tools
18from fundamentals.mysql import convert_dictionary_to_mysql_table, writequery
19from fundamentals.fmultiprocess import fmultiprocess
20import time
21import re
22from fundamentals.mysql.database import database
23from datetime import datetime
# MODULE-LEVEL STATE SHARED WITH THE WORKER FUNCTIONS BELOW.
# The entry point rebinds these before calling `fmultiprocess`; the workers
# then read them by name instead of receiving them as arguments.
# NOTE(review): this presumably relies on worker processes inheriting the
# module state from the parent (fork start method) -- confirm.

# Not assigned by any function visible in this file
count = 0
# Rows in the current insert job (set by the entry point)
totalCount = 0
# Either a live database connection or a dictionary of database settings
globalDbConn = False
# List of (batchOfDictionaries, cumulativeRowCount) tuples, one per batch
sharedList = []
def insert_list_of_dictionaries_into_database_tables(
        dbConn,
        log,
        dictList,
        dbTableName,
        uniqueKeyList=[],
        dateModified=False,
        dateCreated=True,
        batchSize=2500,
        replace=False,
        dbSettings=False):
    """*Insert each dictionary in a list as a row of the given database table*

    The first dictionary is inserted on its own via
    ``convert_dictionary_to_mysql_table``; the remaining dictionaries are
    split into batches of ``batchSize`` rows which are handed to
    ``fmultiprocess``.

    **Key Arguments**

    - ``dbConn`` -- mysql database connection
    - ``log`` -- logger
    - ``dictList`` -- list of python dictionaries to add to the database table
    - ``dbTableName`` -- name of the database table
    - ``uniqueKeyList`` -- a list of column names to append as a unique constraint on the database (never mutated here, only passed on)
    - ``dateModified`` -- add the modification date as a column in the database
    - ``dateCreated`` -- add the created date as a column in the database
    - ``batchSize`` -- batch the insert commands into *batchSize* batches
    - ``replace`` -- replace row if a duplicate is found
    - ``dbSettings`` -- pass in the database settings so multiprocessing can establish one connection per process (might not be faster). If given, batches are loaded via ``_add_dictlist_to_database_via_load_in_file``.

    **Return**

    - None

    **Usage**

    ```python
    from fundamentals.mysql import insert_list_of_dictionaries_into_database_tables
    insert_list_of_dictionaries_into_database_tables(
        dbConn=dbConn,
        log=log,
        dictList=dictList,
        dbTableName="test_insert_many",
        uniqueKeyList=["col1", "col3"],
        dateModified=False,
        batchSize=2500
    )
    ```
    """
    log.debug(
        'starting the ````insert_list_of_dictionaries_into_database_tables`` function')

    global totalCount
    global globalDbConn
    global sharedList

    reDate = re.compile('^[0-9]{4}-[0-9]{2}-[0-9]{2}T')

    # WORKERS READ THE CONNECTION (OR THE SETTINGS TO BUILD ONE) FROM THIS
    # MODULE-LEVEL NAME
    globalDbConn = dbSettings if dbSettings else dbConn

    if len(dictList) == 0:
        log.warning('the dictionary to be added to the database is empty')
        return None

    # INSERT THE FIRST ROW ON ITS OWN BEFORE THE BULK INSERTS
    # NOTE(review): presumably this lets the helper create/extend the table
    # schema up front -- confirm against `convert_dictionary_to_mysql_table`
    convert_dictionary_to_mysql_table(
        dbConn=dbConn,
        log=log,
        dictionary=dictList[0],
        dbTableName=dbTableName,
        uniqueKeyList=uniqueKeyList,
        dateModified=dateModified,
        reDatetime=reDate,
        replace=replace,
        dateCreated=dateCreated)
    dictList = dictList[1:]

    dbConn.autocommit(False)

    if len(dictList):

        total = len(dictList)

        # SLICE THE REMAINING ROWS INTO BATCHES. STEPPING BY `batchSize` NEVER
        # YIELDS AN EMPTY TRAILING BATCH (THE PREVIOUS `range(batches + 1)`
        # LOOP DID WHENEVER `total` WAS AN EXACT MULTIPLE OF `batchSize`, AND
        # THE WORKERS FAILED ON IT). THE SECOND TUPLE ITEM IS THE CUMULATIVE
        # ROW COUNT AT THE END OF THE BATCH.
        sharedList = [
            (dictList[start:start + batchSize], start + batchSize)
            for start in range(0, total, batchSize)
        ]

        # +1 ACCOUNTS FOR THE ROW ALREADY INSERTED ABOVE
        totalCount = total + 1
        ltotalCount = totalCount

        print("Starting to insert %(ltotalCount)s rows into %(dbTableName)s" % locals())

        # TRUTHINESS TEST MATCHES THE `globalDbConn` SELECTION ABOVE, SO
        # `None` OR AN EMPTY SETTINGS DICT USES THE SHARED CONNECTION
        if not dbSettings:
            fmultiprocess(
                log=log,
                function=_insert_single_batch_into_database,
                inputArray=list(range(len(sharedList))),
                dbTableName=dbTableName,
                uniqueKeyList=uniqueKeyList,
                dateModified=dateModified,
                replace=replace,
                batchSize=batchSize,
                reDatetime=reDate,
                dateCreated=dateCreated
            )

        else:
            fmultiprocess(log=log, function=_add_dictlist_to_database_via_load_in_file,
                          inputArray=list(range(len(sharedList))), dbTablename=dbTableName,
                          dbSettings=dbSettings, dateModified=dateModified)

        # CURSOR UP ONE LINE AND CLEAR LINE, THEN PRINT THE FINAL TALLY
        sys.stdout.write("\x1b[1A\x1b[2K")
        print("%(ltotalCount)s / %(ltotalCount)s rows inserted into %(dbTableName)s" % locals())

    log.debug(
        'completed the ``insert_list_of_dictionaries_into_database_tables`` function')
    return None
def _insert_single_batch_into_database(
        batchIndex,
        log,
        dbTableName,
        uniqueKeyList,
        dateModified,
        replace,
        batchSize,
        reDatetime,
        dateCreated):
    """*Insert one batch of dictionaries from the module-level ``sharedList`` into a database table*

    A single multi-row ``INSERT IGNORE`` is attempted first. If that raises,
    per-row insert statements are generated with
    ``convert_dictionary_to_mysql_table`` and executed together. If the
    database reports an unknown column, every row is passed through
    ``convert_dictionary_to_mysql_table`` and the bulk insert is retried.

    **Key Arguments**

    - ``batchIndex`` -- the index of the batch to insert (index into ``sharedList``)
    - ``log`` -- logger
    - ``dbTableName`` -- name of the database table
    - ``uniqueKeyList`` -- a list of column names forming the unique constraint
    - ``dateModified`` -- add the modification date as a column in the database
    - ``replace`` -- update the existing row if a duplicate is found
    - ``batchSize`` -- size of the batches (unused here, kept for the caller's keyword interface)
    - ``reDatetime`` -- compiled regex passed on to ``convert_dictionary_to_mysql_table``
    - ``dateCreated`` -- add the created date as a column in the database

    **Return**

    - the string ``"None"``
    """
    log.debug('starting the ``_insert_single_batch_into_database`` function')

    global globalDbConn
    global sharedList

    # EACH ENTRY OF sharedList IS A (rows, cumulativeCount) TUPLE
    rows = sharedList[batchIndex][0]
    if not rows:
        # NOTHING TO INSERT -- AVOIDS INDEXING INTO AN EMPTY VALUE LIST
        log.debug('completed the ``_insert_single_batch_into_database`` function')
        return "None"

    # SETUP THE DATABASE CONNECTION; ONLY CLOSE IT LATER IF WE OPENED IT
    ownsConnection = isinstance(globalDbConn, dict)
    if ownsConnection:
        dbConn = database(
            log=log,
            dbSettings=globalDbConn,
            autocommit=False
        ).connect()
    else:
        dbConn = globalDbConn

    # UNION OF ALL KEYS IN THE BATCH. VALUES ARE LOOKED UP WITH THE RAW KEYS;
    # THE SANITISED FORM IS ONLY USED FOR THE COLUMN NAMES.
    rawKeys = list(set().union(*(d.keys() for d in rows)))
    columnNames = [k.replace(" ", "_").replace("-", "_") for k in rawKeys]

    # MISSING KEYS AND "None" STRINGS BOTH BECOME SQL NULL
    vals = [
        tuple(None if d.get(k) in ["None", None] else d.get(k)
              for k in rawKeys)
        for d in rows
    ]

    myKeys = '`,`'.join(columnNames)
    valueString = ("%s, " * len(rawKeys))[:-2]
    # DUPLICATES ARE ALWAYS IGNORED BY THE VERB; `replace` IS HANDLED BY THE
    # `ON DUPLICATE KEY UPDATE` CLAUSE BELOW
    bulkCommand = """INSERT IGNORE INTO `""" + dbTableName + \
        """` (`""" + myKeys + """`, dateCreated) VALUES (""" + \
        valueString + """, NOW())"""

    if not dateCreated:
        bulkCommand = bulkCommand.replace(
            ", dateCreated)", ")").replace(", NOW())", ")")

    dup = ""
    if replace:
        dup = " ON DUPLICATE KEY UPDATE " + "".join(
            " %s=values(%s)," % (k, k) for k in columnNames
        ) + " updated=1, dateLastModified=NOW()"

    bulkCommand = bulkCommand + dup

    # LEGACY CLEAN-UP OF QUOTED EMPTY/None LITERALS IN THE COMMAND TEXT
    bulkCommand = bulkCommand.replace('\\""', '\\" "')
    bulkCommand = bulkCommand.replace('""', "null")
    bulkCommand = bulkCommand.replace('"None"', 'null')

    try:
        inserted = False
        while not inserted:
            try:
                message = writequery(
                    log=log,
                    sqlQuery=bulkCommand,
                    dbConn=dbConn,
                    Force=True,
                    manyValueList=vals
                )
            except Exception:
                # BULK INSERT FAILED -- BUILD THE INSERT ROW-BY-ROW INSTEAD.
                # THE COMMAND RETURNED FOR THE LAST ROW IS USED FOR ALL ROWS.
                log.debug(
                    'bulk insert failed, falling back to per-row insert generation')
                theseInserts = []
                for aDict in rows:
                    rowCommand, valueTuple = convert_dictionary_to_mysql_table(
                        dbConn=dbConn,
                        log=log,
                        dictionary=aDict,
                        dbTableName=dbTableName,
                        uniqueKeyList=uniqueKeyList,
                        dateModified=dateModified,
                        returnInsertOnly=True,
                        replace=replace,
                        reDatetime=reDatetime,
                        skipChecks=True,
                        dateCreated=dateCreated
                    )
                    theseInserts.append(valueTuple)

                message = writequery(
                    log=log,
                    sqlQuery=rowCommand,
                    dbConn=dbConn,
                    Force=True,
                    manyValueList=theseInserts
                )

            if message == "unknown column":
                # PASS EVERY ROW THROUGH THE SINGLE-ROW HELPER, THEN RETRY
                # NOTE(review): presumably the helper adds the missing
                # columns -- confirm
                for aDict in rows:
                    convert_dictionary_to_mysql_table(
                        dbConn=dbConn,
                        log=log,
                        dictionary=aDict,
                        dbTableName=dbTableName,
                        uniqueKeyList=uniqueKeyList,
                        dateModified=dateModified,
                        reDatetime=reDatetime,
                        replace=replace,
                        dateCreated=dateCreated
                    )
            else:
                inserted = True

            dbConn.commit()
    finally:
        if ownsConnection:
            dbConn.close()

    log.debug('completed the ``_insert_single_batch_into_database`` function')
    return "None"
def _add_dictlist_to_database_via_load_in_file(
        masterListIndex,
        dbTablename,
        dbSettings,
        dateModified=False):
    """*load a list of dictionaries into a database table with load data infile*

    The batch is written to a pipe-separated file in ``/tmp``, loaded into a
    temporary table cloned from the target table and then merged into the
    target with ``INSERT IGNORE ... ON DUPLICATE KEY UPDATE``. The temporary
    file and the database connection are always cleaned up, even if a query
    fails.

    **Key Arguments**

    - ``masterListIndex`` -- the index of the sharedList of dictionary lists to process
    - ``dbTablename`` -- the name of the database table to add the list to
    - ``dbSettings`` -- the dictionary of database settings
    - ``dateModified`` -- add a dateModified stamp with an updated flag to rows?

    **Return**

    - None
    """
    from fundamentals.logs import emptyLogger
    import pandas as pd
    import numpy as np
    log = emptyLogger()
    log.debug('starting the ``_add_dictlist_to_database_via_load_in_file`` function')

    global sharedList

    # EACH ENTRY OF sharedList IS A (rows, cumulativeCount) TUPLE
    dictList = sharedList[masterListIndex][0]
    if not dictList:
        # AN EMPTY BATCH WOULD GENERATE AN INVALID `ON DUPLICATE KEY UPDATE ;`
        log.debug(
            'completed the ``_add_dictlist_to_database_via_load_in_file`` function')
        return None

    # SETUP ALL DATABASE CONNECTIONS
    dbConn = database(
        log=log,
        dbSettings=dbSettings
    ).connect()

    # THE PROCESS ID KEEPS THE NAME UNIQUE ACROSS PARALLEL WORKERS
    now = datetime.now()
    tmpTable = now.strftime("tmp_%Y%m%dt%H%M%S%f") + "_%d" % os.getpid()
    tmpFile = '/tmp/%(tmpTable)s' % locals()

    try:
        # CREATE A TEMPORARY TABLE TO ADD DATA TO
        sqlQuery = """CREATE TEMPORARY TABLE %(tmpTable)s SELECT * FROM %(dbTablename)s WHERE 1=0;""" % locals()
        writequery(
            log=log,
            sqlQuery=sqlQuery,
            dbConn=dbConn
        )

        # UNIQUE SET OF COLUMNS ACROSS EVERY DICTIONARY IN THE BATCH
        csvColumns = list(set(k for d in dictList for k in d))
        csvColumnsString = (', ').join(csvColumns)
        # NOTE(review): renames a `dec` column to `decl` in the LOAD DATA
        # column list only (and only when it is neither first nor last);
        # presumably a schema-specific workaround -- confirm
        csvColumnsString = csvColumnsString.replace(u" dec,", u" decl,")

        # NULL-LIKE VALUES ARE WRITTEN AS \N, WHICH LOAD DATA READS AS NULL
        df = pd.DataFrame(dictList)
        df.replace(['nan', 'None', '', 'NaN', np.nan], '\\N', inplace=True)
        df.to_csv(tmpFile, sep="|",
                  index=False, escapechar="\\", quotechar='"', columns=csvColumns, encoding='utf-8')

        sqlQuery = """LOAD DATA LOCAL INFILE '/tmp/%(tmpTable)s'
INTO TABLE %(tmpTable)s
FIELDS TERMINATED BY '|' OPTIONALLY ENCLOSED BY '"'
IGNORE 1 LINES
(%(csvColumnsString)s);""" % locals()

        writequery(
            log=log,
            sqlQuery=sqlQuery,
            dbConn=dbConn
        )

        updateStatement = ""
        for i in csvColumns:
            updateStatement += "`%(i)s` = VALUES(`%(i)s`), " % locals()
        if dateModified:
            updateStatement += "dateLastModified = NOW(), updated = 1"
        else:
            # STRIP THE TRAILING ", "
            updateStatement = updateStatement[0:-2]

        sqlQuery = """
INSERT IGNORE INTO %(dbTablename)s
SELECT * FROM %(tmpTable)s
ON DUPLICATE KEY UPDATE %(updateStatement)s;""" % locals()
        writequery(
            log=log,
            sqlQuery=sqlQuery,
            dbConn=dbConn
        )

        sqlQuery = """DROP TEMPORARY TABLE %(tmpTable)s;""" % locals()
        writequery(
            log=log,
            sqlQuery=sqlQuery,
            dbConn=dbConn
        )
    finally:
        # BEST-EFFORT REMOVAL OF THE CSV FILE; IT MAY NEVER HAVE BEEN WRITTEN
        try:
            os.remove(tmpFile)
        except OSError:
            pass

        dbConn.close()

    log.debug(
        'completed the ``_add_dictlist_to_database_via_load_in_file`` function')
    return None
467# use the tab-trigger below for new function
468# xt-def-function