This notebook is heavily based on this official Dask tutorial.
We first "reserve" some CPU. We will see in the next chapter exactly how this is done.
from time import sleep
from dask.distributed import Client
client = Client()
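As a quick check, here is a minimal sketch of how to see what the client has reserved (the exact numbers depend on your machine):
# Each worker address is mapped to the number of threads it provides.
client.nthreads()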
There are two simple situations where we can benefit from parallelization: we can have a series of independent function calls, e.g. in a data processing pipeline, or we can have multiple independent calls to the same function in a for loop. Let's look at the first case and learn how Dask deals with it.
For the purpose of illustration, imagine that we have two functions that are slow to execute. To simulate the slowness, we simply pause execution inside each function using time.sleep, here for 1s.
def inc(x):
sleep(1)
return x + 1
def add(x, y):
sleep(1)
return x + y
If we use the %%time magic command we can check that the following "script" takes 3s:
%%time
x = inc(1)
y = inc(2)
z = add(x, y)
In principle we could execute the first two lines in parallel, as the variables are independent. The solution implemented by Dask is the following: instead of executing the functions right away, Dask only "takes notes" of the tasks to be done and runs them later, in parallel where possible, when the result is actually requested.
So how do we tell Dask which tasks it should take into account when "taking notes"? The solution is to use the function delayed(), which takes another function as input. Every function "decorated" with delayed() will be included in the Dask task graph. Let's try it:
from dask import delayed
%%time
x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)
We see that almost no time was consumed. But has z really been calculated?
z
No! z is now itself a delayed object. Dask knows how to calculate it but hasn't done so yet. To actually get the result of z, we have to use the compute() function or method:
%%time
z.compute()
The computation time is now only 2s instead of 3s. Indeed, the two calls to inc could be executed in parallel, saving us 1s in the process.
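Note that several delayed objects can also be computed in a single call with the dask.compute() function; they then share one task graph, so common intermediate results are evaluated only once. A minimal sketch (the result variable names are just for illustration):
from dask import compute
# Compute x, y and z together; since they share a single task graph,
# the two inc() calls and the add() call are scheduled in one go.
x_val, y_val, z_val = compute(x, y, z)
x_val, y_val, z_val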
Dask offers a very useful way to visualize how the tasks are split, through the visualize() method that can be called on any delayed variable. Let's check e.g. the graph of z:
z.visualize()
Each "leaf" of that tree starts and indepenent calculation that can be sent to an independent process if available. Here the two inc()
calls start separately and are then combined in the add()
call.
Generating this Task graph and handling the flow of information beetween processes is the main task of Dask.
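If you want to keep the graph as a file, or inspect it programmatically, here is a minimal sketch (the filename is arbitrary):
# Save the task graph to a file instead of displaying it inline.
z.visualize(filename="z_graph.png")
# The underlying task graph is a mapping from task keys to the
# operations Dask has "noted down".
dict(z.__dask_graph__())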
We can make our calculation slightly more complex and see what happens:
%%time
x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)
x2 = delayed(inc)(1)
y2 = delayed(inc)(2)
z2 = delayed(add)(x2, y2)
total = delayed(add)(z, z2)
total.visualize()
Let's however look at alternative ways of calculating this last sum. First, what happens if we just use a regular + sign?
total_plus = z + z2
total_plus.visualize()
We obtain exactly the same graph. Dask knows about standard Python operators and automatically includes them in the task graph. Let's now see what happens if we use the standard Python sum() function:
total_sum = sum([z, z2])
total_sum.visualize()
We now see an important difference! z and z2 are combined in sequence and not in parallel (except for their "internal definitions"). What happens here is that sum() itself is not part of the task graph: it adds the elements one after the other, so the graph chains z and then z2 instead of combining them in a single parallel step.
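A minimal sketch of how to keep the whole reduction inside the task graph (the variable name total_sum_par is just for illustration) is to wrap sum() itself in delayed(), as we will do with np.prod() in the exercise below:
# Wrapping the reduction in delayed() turns it into a single graph node,
# so z and z2 remain independent, parallel branches.
total_sum_par = delayed(sum)([z, z2])
total_sum_par.visualize()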
All this shows that one should be careful when using delayed() for parallelization. Other recommendations are: don't call delayed on an already delayed function, break long pieces of code into multiple delayed functions, etc. (see here for more details).
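To illustrate the first recommendation, here is a minimal sketch (the function name inc_dec is just for illustration): delayed() can also be used as a decorator, and the decorated function should then not be wrapped a second time.
# delayed() can also be used as a decorator ...
@delayed
def inc_dec(x):
    sleep(1)
    return x + 1
# ... the decorated function is then called directly, NOT wrapped again:
good = inc_dec(1)            # a single task in the graph
# bad = delayed(inc_dec)(1)  # delaying a delayed function adds a useless extra layer
good.compute()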
If you haven't executed the notebook until now, import the packages and start the client.
from dask.distributed import Client
from dask import delayed
client = Client()
Based on what we just learned, apply the inc() function to the list data (using a for loop or a list comprehension) and then multiply all elements using the numpy function np.prod(). Check the task graph to make sure everything happens as expected:
data = [3,8,1,4,2,9,4,6]
newdata = [delayed(inc)(x) for x in data]
newdata
import numpy as np
prod = np.prod(newdata)
prod
prod.visualize()
# prod is a delayed object
# -> bad: np.prod combines the delayed elements sequentially, not in parallel
prod = delayed(np.prod)(newdata)
prod.visualize()
# -> ok, now the inc() calls run in parallel and np.prod is a single graph node
# NB: parallelization only makes sense for large amounts of data or slow functions;
# for small problems the scheduling overhead can even make things slower ...
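To actually run the computation we call compute() on the delayed result; for the data above, the product of the incremented values is 378000. A minimal sketch (the exact timing depends on the number of workers):
%%time
# Trigger the actual computation: the eight inc() calls can run in
# parallel, so this takes far less than the ~8s of a sequential run.
prod.compute()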