As mentioned before, there are other solutions for parallel computing in Python. However, Dask offers a crucial feature not present in other libraries: a built-in parallelized implementation of large parts of the popular libraries Numpy and Pandas. In other words, there is no need to systematically use delayed
or to think about how to optimize a function: Dask has already done it for you!
Here we will first explore the possibilities offered by dask arrays, the equivalent of numpy arrays. As usual, we first create our cluster:
from dask.distributed import Client
client = Client("tcp://127.0.0.1:63517")
client
The equivalent of the numpy import is the dask.array import:
import dask.array as da
import numpy as np
A great feature of dask.array
is that it mirrors the Numpy API very closely, so if you are familiar with the latter, you should have no problem with Dask.
For example, let's create an array of random numbers and check that they behave the same way:
nprand = np.random.randint(0,100, (4,5))
darand = da.random.randint(0,100, (4,5))
nprand.shape
darand.shape
Let's look at the arrays directly:
nprand
darand
Here we already see a difference: Numpy just shows the matrix, while Dask shows us a much richer output, including size, type, dimensionality, etc.
But do the darand
values exist anywhere? Let's check by computing the maximum of the array:
darand.max()
Again, we get some info but no values. In fact, as with delayed
before, the values have not been computed yet!
The logic is the same as with delayed. Any time we actually want a result we can call the compute
method:
darand.max().compute()
There could also be intermediate steps:
myval = 10*darand.max()
myval.compute()
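The same lazy-chaining logic can be sketched in a small standalone example (independent of the cluster above), where several operations are stacked before a single compute() call:

```python
import dask.array as da

# build a small lazy computation: nothing runs yet
x = da.arange(10, chunks=5)   # lazy array holding 0..9 in two chunks
total = (x * 2).sum()         # still lazy: a task graph, not a number

# only compute() triggers execution of the whole chain
result = total.compute()
print(result)  # 2 * (0 + 1 + ... + 9) = 90
```

Each intermediate object (x, x * 2, the sum) is itself a dask object that can be further combined before anything is computed.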
Dask re-implements many standard array creation functions, including zeros()
, ones()
and much of the np.random
module.
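A quick sketch of a few of these mirrored creation functions (the chunks argument, covered below, is optional):

```python
import dask.array as da

# lazy arrays: no values are materialized until compute() is called
zeros = da.zeros((4, 4), chunks=(2, 2))
ones = da.ones((4, 4), chunks=(2, 2))
rand = da.random.random((4, 4), chunks=(2, 2))  # uniform values in [0, 1)

print(zeros.sum().compute())  # 0.0
print(ones.sum().compute())   # 16.0
```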
However, one can also create arrays directly from a numpy array:
da_array = da.from_array(np.ones((5,8)))
da_array
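When converting, from_array also accepts a chunks argument that controls how the numpy array is split; a small sketch:

```python
import numpy as np
import dask.array as da

np_data = np.arange(40).reshape(5, 8)

# chunks= controls how the array is split into blocks
da_data = da.from_array(np_data, chunks=(5, 4))  # two blocks of shape (5, 4)

print(da_data.chunks)           # ((5,), (4, 4))
print(da_data.sum().compute())  # same result as np_data.sum() -> 780
```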
Let's create a larger array and see how it is handled by Dask and compare it with Numpy:
large_nparray = np.random.randint(0,100,(10000,1000,100))
myarray = da.random.randint(0,100,(10000,1000,100))
myarray
First, notice how helpful the array visualisation is! Second, note that we have information about "chunks". When handling larger objects, Dask automatically breaks them into chunks that can be generated or operated on by different workers in parallel. We can compute the mean of this array and observe what happens:
mean = myarray.mean()
mean.visualize()
mean.compute()
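Chunk sizes can also be chosen explicitly at creation time; a sketch (with smaller shapes than above, to keep it quick):

```python
import dask.array as da

# chunks= fixes the block layout: here four blocks along the first axis
arr = da.random.randint(0, 100, (1000, 1000), chunks=(250, 1000))
print(arr.numblocks)  # (4, 1)

# the mean is computed per chunk, then aggregated across chunks
print(float(arr.mean().compute()))  # close to 49.5 for uniform ints in [0, 100)
```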
One of the main features of numpy arrays is the possibility to slice and index them. Great news: dask arrays behave exactly the same way for most "regular" cases (e.g. dask does not implement slicing with multiple lists). Let's see how it works:
myarray = da.random.random((5000,5000))
myarray
For example we can slice the array:
sliced_array = myarray[::2,:]
sliced_array
Or we can use logical indexing. First we create a logical array:
logical_array = myarray > 0.5
logical_array
And then use it for logical indexing:
extracted_values = myarray[logical_array]
extracted_values
Of course, here we don't know the length of the result in advance. This is a typical case where downstream parallelization becomes difficult, as chunks of unknown size cannot be distributed. However, we can still get the result:
values = extracted_values.compute()
values
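When the unknown chunk sizes get in the way of further lazy operations, dask arrays provide a compute_chunk_sizes() method that evaluates just enough to fill them in (a sketch, assuming a dask version recent enough to include this method):

```python
import dask.array as da

x = da.random.random((1000,), chunks=250)
selected = x[x > 0.5]

# chunk sizes are unknown (shown as nan) until the selection is evaluated
print(selected.chunks)

# compute_chunk_sizes() runs the selection to determine the sizes,
# after which the array has a concrete shape again
selected = selected.compute_chunk_sizes()
print(selected.shape)
```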
An extremely useful feature of Dask is that whenever you are handling a dask array, you can apply most Numpy functions to it and it remains a dask array, i.e. the operation gets integrated into the task graph. For example:
cos_array = np.cos(myarray)
cos_array
cos_array.visualize()
Dask also re-implements many numpy functions internally, so that they are accessible as methods of the dask arrays:
proj = myarray.sum(axis = 0)
proj
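A small sketch checking that these method-based reductions match their numpy counterparts, along either axis:

```python
import numpy as np
import dask.array as da

np_arr = np.arange(12).reshape(3, 4)
da_arr = da.from_array(np_arr, chunks=(3, 2))

# reductions are available as methods, just like in numpy
col_sum = da_arr.sum(axis=0).compute()
row_mean = da_arr.mean(axis=1).compute()

print(col_sum)   # [12 15 18 21]
print(row_mean)  # [1.5 5.5 9.5]
```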
The great advantage of dask arrays is that these functions have been optimized to make the task graph very efficient. For example, this simple calculation already produces a fairly complex task graph. If handling a large "out-of-RAM" array with numpy, one would have to break up the array manually and be very smart about how to process each piece.
newda = myarray + da.transpose(myarray)
newda.visualize()
This is already quite complicated, but it can become much more complicated very quickly.
newda = da.dot(myarray, myarray + da.transpose(myarray))
newda.visualize()
%%time
computed_array = newda.compute();
myarray2 = np.random.random((5000,5000))
%%time
newnp = np.dot(myarray2, myarray2 + np.transpose(myarray2))
We see here that for a reasonably sized array, the overhead of pushing data between processes makes Dask slower than plain Numpy, so be careful about the context in which you use Dask! But Dask scales nicely:
myarray = da.random.random((10000,10000))
newda = da.dot(myarray, myarray + da.transpose(myarray))
newda.visualize()
Of course, there are limitations to what one can do. For example, most linear algebra functions are not dask-compatible:
myarray = da.random.random((10,10))
eigenval, eigenvect = np.linalg.eig(myarray);
The result is not a dask array:
eigenval
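That said, a few decompositions do have dask implementations in da.linalg, such as qr and svd for "tall-and-skinny" arrays (chunked along rows, single chunk along columns). A sketch, assuming that layout fits your data:

```python
import dask.array as da

# tall-and-skinny layout expected by da.linalg.qr:
# chunked along rows, single chunk along columns
x = da.random.random((100, 10), chunks=(25, 10))

q, r = da.linalg.qr(x)        # q and r are lazy dask arrays
reconstructed = da.dot(q, r)  # still part of the task graph

# the factorization reconstructs the original up to floating point error
err = float(abs(reconstructed - x).max().compute())
print(err)
```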
Also, some operations, such as reshaping arrays, may pose difficulties to Dask, as they require reshuffling the array chunks. For example:
myarray = da.zeros((5000,5000))
myarray
This works because it's easy to reshuffle some chunks:
reshaped = np.reshape(myarray,(5000,1000,5))
reshaped
reshaped.visualize()
But this doesn't:
reshaped = np.reshape(myarray,(1000,5000,5))
even though it works in numpy:
numpy_array = np.zeros((5000,5000))
reshaped = np.reshape(numpy_array,(1000,5000,5))
reshaped.shape
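When a reshape fails because of the chunk layout, one workaround (a sketch, feasible only when the rechunked blocks fit in memory) is to rechunk first so that the reshape no longer has to split chunks:

```python
import dask.array as da

arr = da.zeros((500, 500), chunks=(100, 500))

# rechunking into a single chunk (fine here, the array is small)
# removes any chunk-splitting problem before the reshape
reshaped = arr.rechunk((500, 500)).reshape((100, 500, 5))

print(reshaped.shape)                 # (100, 500, 5)
print(float(reshaped.sum().compute()))  # 0.0
```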
Try to solve this exercise. Regularly check the visual representation of arrays and of the task-graph to understand what is going on.
import dask.array as da
import numpy as np
da_array = da.random.normal(loc=9, scale=1, size=(5000, 5000))
np_array = np.ones((5000,5000))
added = da_array + np_array
added
# output is still a dask array
masked = added[added < 10]
# in numpy, bins and range are chosen automatically if not specified
# here dask requires them to be specified
# (dask has to know what you want in order to know how to distribute things)
myhist, bins = da.histogram(masked, bins=100, range=[-9,11])
myhist.visualize()
myhist.compute()