from dask.distributed import Client
client = Client("tcp://127.0.0.1:56643")
client
Just as it does for NumPy arrays, Dask implements an equivalent of the Pandas dataframe. Let's briefly recall what a dataframe is by loading some tabular data:
import pandas as pd
births = pd.read_csv('../Data/Birthdays.csv')
births
A dataframe is a table in which each row represents an observation and columns can contain numerical or categorical values (they can also hold lists or other complex objects, but ideally shouldn't). Each row has an index that can be used to retrieve it:
births.loc[0]
One can access a given variable either for a specific index:
births.loc[0].state
or for the entire dataframe:
births.state
Dataframes also support numpy-like indexing:
births.state == 'AK'
births[births.state == 'AK']
births.mean()
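Note that with recent Pandas versions, calling mean() on a dataframe that also contains text columns may raise an error; passing numeric_only=True (a standard Pandas option) restricts the computation to the numerical columns:
births.mean(numeric_only=True)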
Pandas is a huge library that offers all the tools necessary for advanced data science and is used by many other packages, such as the plotting library seaborn or the machine learning package scikit-learn (you can learn a bit more about Pandas e.g. here).
We now import the same CSV file, but as a Dask dataframe instead of a Pandas dataframe:
from dask import dataframe as dd
births_da = dd.read_csv('../Data/Birthdays.csv')
len(births_da.compute())
births_da
Again, we see that there are no actual data there. All Dask did was read the first lines to figure out the column names and types. If we want a clearer idea of the file content, we can use head():
births_da.head()
Now we can do the same fancy indexing that we did before:
subtable = births_da[births_da.state == 'AK']
and see that there are still no data there. Let's look at the task graph:
subtable.visualize()
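To actually materialize the selection, we can call compute(), which executes the task graph and returns a regular Pandas dataframe (here we just peek at the first rows):
subtable.compute().head()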
As the file is small, no tasks are parallelized. But we can force Dask to do so artificially by breaking the file into smaller chunks:
births_da = dd.read_csv('../Data/Birthdays.csv',
blocksize=5e6)
len(births_da.compute())
births_da
subtable = births_da[births_da.state == 'AK']
subtable.visualize()
One of the main uses of dataframes is the computation of statistics, in particular for specific sub-parts of the dataframe through the groupby() function. These operations are supported by Dask as well:
births_grouped = births_da.groupby('state')
births_group_mean = births_grouped.mean()
births_group_mean.visualize()
births_group_mean.compute()
births_group_mean.births.nlargest(10).compute()
births_da.state.unique().compute()
The birth dataset is not very large, and Dask doesn't really help here because the data fit in RAM and the overhead of communication between processes is too high.
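If in doubt, a quick way to check whether a dataset comfortably fits in memory is to look at its in-memory footprint with Pandas (a rough estimate, here in megabytes):
births.memory_usage(deep=True).sum() / 1e6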
Let's look at a case where files are larger and/or our dataset is split between multiple files. This dataset is taken from Zenodo and represents an analysis of all edits made to Wikipedia pages from its beginning to 2016.
Data are split among multiple zip files, each containing multiple "largish" (500 MB) CSV files. Let's look at one of them:
filepath = '../Data/wikipedia/20161101-current_content-part1-12-1728.csv'
wikipedia_changes = dd.read_csv(filepath)
wikipedia_changes
We see that here Dask decided by default to split the file into 9 partitions because of its size. Let's look at a few lines:
wikipedia_changes.head()
The page_id column corresponds to a specific Wikipedia topic, and the str column contains a given word that has been added or modified. The in and out arrays represent sequences of events (referenced by an ID) of additions and removals, i.e. the longer the list, the more often this word has been edited.
A word of caution: Dask imports each partition as a separate dataframe, meaning that if the index is a default numeric index, it restarts for each partition. In other words, when querying index = 0, we here get 9 items:
wikipedia_changes.loc[0].compute()
Hence there is no simple way to "get the first 10 elements of the dataframe". Instead, it is much simpler to ask, for example, for the first elements belonging to page_id = 593:
first_words = wikipedia_changes[wikipedia_changes.page_id==593].loc[0:20].compute()
Let's see what strings we have here:
' '.join(list(first_words.str.values))
Seems to be this page.
Let's see how Pandas and Dask compare on this single "largish" (500 MB) file. We can, for example, count occurrences of single words using the same function as in Pandas (value_counts), since Dask implements a very close API:
count_str = wikipedia_changes.str.value_counts()
count_str.visualize()
real_count = count_str.compute()
Let's look at the few most used words or "tokens":
real_count.head(n = 30)
Now we compare the performance of Pandas and Dask:
%%time
wikipedia_changes = dd.read_csv(filepath)
count_str = wikipedia_changes.str.value_counts()
real_count = count_str.compute()
%%time
wiki_pd = pd.read_csv(filepath)
count_str = wiki_pd.str.value_counts()
We see that Dask doesn't help much in this case.
We only looked at a tiny part of the dataset. We will now look at a much larger portion of it, although still not the complete dataset.
Dask offers the very useful feature of opening multiple files as a single dask-dataframe, either by using the wildcard * or by providing a list of files. For example, since we have multiple CSV files in the folder, we can simply write:
wiki_large = dd.read_csv('../Data/wikipedia/2016*.csv')
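We can check how many partitions were created across all the files (npartitions is a standard attribute of Dask dataframes):
wiki_large.npartitions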
We see many more partitions, meaning that Dask indeed considered all the files. If we wanted to import the files with Pandas, we would have more trouble:
import glob
all_files = glob.glob('../Data/wikipedia/2016*.csv')
#wiki_large_pd = pd.concat((pd.read_csv(f) for f in all_files))
all_files
Let's time the same task as before:
%%time
wiki_large = dd.read_csv('../Data/wikipedia/2016*.csv')
count_str = wiki_large.str.value_counts()
real_count = count_str.compute()
%%time
all_files = glob.glob('../Data/wikipedia/2016*.csv')
wiki_large_pd = pd.concat([pd.read_csv(f) for f in all_files])
count_str = wiki_large_pd.str.value_counts()
Finally, as an exercise, we apply the same approach to the taxi dataset stored in the ../Data/Chicago_taxi folder:
import dask.dataframe as dd
taxi = dd.read_csv('../Chicago_taxi/chicago.csv')
taxi
taxi = dd.read_csv('../Chicago_taxi/chicago.csv', dtype={'taxi_id': 'float64'})
# if the dtype is not specified, mean().compute() raises an error
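To see which types Dask inferred from the first block of the file (and why the taxi_id column can be problematic), we can inspect the dtypes attribute:
taxi.dtypes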
taxi_group = taxi.groupby('payment_type')
mean_val = taxi_group.mean().compute()
mean_val.tips
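Since mean_val is now an ordinary Pandas dataframe, we can, for example, sort the average tip by payment type (a small follow-up using plain Pandas):
mean_val.tips.sort_values(ascending=False)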