Coverage for fundamentals/fmultiprocess.py : 0%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/local/bin/python
2# encoding: utf-8
3"""
4*A function to quickly add multiprocessing to any program*
6:Author:
7 David Young
8"""
9from __future__ import division
10from past.utils import old_div
11import sys
12import os
13os.environ['TERM'] = 'vt100'
14from fundamentals import tools
15from functools import partial
16import inspect
def _available_cpu_count(fallbackCounter):
    """Return the number of logical CPUs, never less than 1.

    ``psutil`` is preferred (as in the original implementation); if it is not
    installed, or cannot determine the count, ``fallbackCounter`` (a
    ``cpu_count``-style callable) is used instead.
    """
    try:
        import psutil
        cpuCount = psutil.cpu_count()
    except ImportError:
        cpuCount = None
    if not cpuCount:
        # psutil.cpu_count() returns None when the count is undetermined
        try:
            cpuCount = fallbackCounter()
        except NotImplementedError:
            cpuCount = None
    return cpuCount or 1


def _function_accepts_log(function):
    """Return True if ``function`` declares a parameter named ``log``."""
    # PYTHON 3 VS 2: getfullargspec is the Python 3 API, getargspec the legacy one
    getSpec = getattr(inspect, "getfullargspec", None) or inspect.getargspec
    try:
        spec = getSpec(function)
    except (TypeError, ValueError):
        # Some builtins / C-implemented callables cannot be introspected;
        # treat them as not accepting a logger rather than crashing.
        return False
    keywordOnly = getattr(spec, "kwonlyargs", None) or []
    return "log" in spec[0] or "log" in keywordOnly


def fmultiprocess(
        log,
        function,
        inputArray,
        poolSize=False,
        timeout=3600,
        **kwargs):
    """*Map a function over an array using a pool of worker processes*

    **Key Arguments**

    - ``log`` -- logger. Also forwarded to ``function`` as ``log=log`` if ``function`` declares a ``log`` parameter.
    - ``function`` -- the function to multiprocess. Called once per item, with the item as first positional argument.
    - ``inputArray`` -- the array to be iterated over (any iterable; iterables without a length are converted to a list first)
    - ``poolSize`` -- limit the number of CPU that are used in multiprocess job. Default *False* (use every available CPU).
    - ``timeout`` -- time in sec after which to raise a timeout error if the processes have not completed
    - ``**kwargs`` -- any other keyword arguments are passed unchanged to every call of ``function``

    **Return**

    - ``resultArray`` -- the array of results, in the same order as ``inputArray``

    **Raises**

    - a ``TimeoutError`` from the multiprocessing library if the work has not completed within ``timeout`` seconds
    - any exception raised by ``function`` in a worker process

    In both cases the worker processes are terminated before the exception propagates.

    **Usage**

    ```python
    from fundamentals import fmultiprocess
    # DEFINE AN INPUT ARRAY
    inputArray = range(10000)
    results = fmultiprocess(log=log, function=functionName, poolSize=10, timeout=300,
                            inputArray=inputArray, otherFunctionKeyword="cheese")
    ```
    """
    log.debug('starting the ``multiprocess`` function')

    # `multiprocess` is the package this module was written against; fall back
    # to the standard library if it is not installed.
    try:
        from multiprocess import cpu_count, Pool
    except ImportError:
        from multiprocessing import cpu_count, Pool
        log.warning(
            'the `multiprocess` package is not installed, falling back to the standard library `multiprocessing`')

    # len() IS NEEDED TO SIZE THE CHUNKS - MATERIALISE GENERATORS/ITERATORS
    if not hasattr(inputArray, "__len__"):
        inputArray = list(inputArray)

    # NOTHING TO DO - DON'T PAY THE COST OF SPAWNING A POOL
    if len(inputArray) == 0:
        log.debug('completed the ``multiprocess`` function')
        return []

    # DEFINE POOL SIZE - NUMBER OF CPU CORES TO USE (DEFAULT = ALL)
    if not poolSize:
        poolSize = _available_cpu_count(cpu_count)

    # AIM FOR ~3 CHUNKS PER WORKER SO THAT UNEVEN TASKS STILL BALANCE OUT
    chunksize = max(1, (len(inputArray) + 1) // (poolSize * 3))

    # ONLY PASS THE LOGGER ON IF THE FUNCTION CAN ACCEPT IT
    if _function_accepts_log(function):
        mapfunc = partial(function, log=log, **kwargs)
    else:
        mapfunc = partial(function, **kwargs)

    # MAP-REDUCE THE WORK OVER MULTIPLE CPU CORES
    pool = Pool(processes=poolSize)
    try:
        asyncResult = pool.map_async(
            mapfunc, inputArray, chunksize=chunksize)
        resultArray = asyncResult.get(timeout=timeout)
    except BaseException:
        # TIMEOUT, WORKER ERROR OR INTERRUPT - KILL THE WORKERS SO THEY ARE
        # NOT LEFT RUNNING, THEN LET THE CALLER SEE THE ORIGINAL ERROR
        pool.terminate()
        raise
    else:
        pool.close()
    finally:
        pool.join()

    log.debug('completed the ``multiprocess`` function')
    return resultArray