Parallel Execution with Dask
Before reading this section, you should understand Kartothek’s Querying Process.
Kartothek executes a query with one task for every Parquet file it has to read. It does not read every file, though: it first filters the files by the predicate’s restrictions on partitions and indexed columns.
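The partition part of this filtering can be sketched in plain Python. This is only an illustration of the idea, not Kartothek's implementation: the file names and the helpers `partition_values` and `prune_by_partition` are hypothetical, and the sketch ignores indexed columns.

```python
def partition_values(path):
    """Parse 'key=value' directory segments from a file path."""
    values = {}
    for segment in path.split("/")[:-1]:
        key, _, value = segment.partition("=")
        values[key] = value
    return values


def prune_by_partition(paths, column, wanted):
    """Keep only files whose partition value for `column` equals `wanted`."""
    return [p for p in paths if partition_values(p).get(column) == str(wanted)]


# Hypothetical file layout, partitioned on column "A".
files = [
    "A=1/label1.parquet",
    "A=1/label2.parquet",
    "A=2/label1.parquet",
    "A=2/label2.parquet",
]

# Only files below "A=2/" remain candidates for a read task.
candidates = prune_by_partition(files, "A", 2)
```

The point of the sketch is that this step needs only the file paths, so no Parquet file has to be opened to discard the `A=1` files.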
The example dataset in the following pictures is partitioned on column
The bottom half shows the existing parquet files; the upper half shows the
querying process with the (possibly) created tasks. We filter the data for
A=2 AND B="b". Kartothek only processes the files
A=2/label2.parquet because of the restriction on the partitioned column
Kartothek matches every file’s range of values (stated in the parquet footer) against the query.
Kartothek rules out the existence of a row with
as the Parquet statistics state both the minimum and maximum of column
"a". Thus, Kartothek doesn’t read this file and only reads
If a file’s range of values doesn’t match, the task returns an empty DataFrame. Otherwise, the task loads the file and filters its data.
The loaded dataframes then make up the result.
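The per-file decision described above can be modelled with a few lines of Python. This is a simplified sketch under stated assumptions: files are plain dictionaries with hypothetical `stats` (minimum and maximum per column, as a Parquet footer would state them) and `rows` entries, and lists of dictionaries stand in for DataFrames. It does not use Kartothek's or Dask's actual APIs.

```python
def may_contain(stats, column, value):
    """Check the footer statistics: can the file hold a row with this value?"""
    low, high = stats[column]
    return low <= value <= high


def read_task(file, column, value):
    """One task per file: skip the file or load and filter it."""
    if not may_contain(file["stats"], column, value):
        return []  # stands in for an empty DataFrame
    return [row for row in file["rows"] if row[column] == value]


# Hypothetical files; the first one only contains B == "a".
files = [
    {
        "path": "A=2/label1.parquet",
        "stats": {"B": ("a", "a")},
        "rows": [{"A": 2, "B": "a"}],
    },
    {
        "path": "A=2/label2.parquet",
        "stats": {"B": ("a", "c")},
        "rows": [{"A": 2, "B": "a"}, {"A": 2, "B": "b"}, {"A": 2, "B": "c"}],
    },
]

# Each call is independent, so each could run as its own task.
results = [read_task(f, "B", "b") for f in files]

# The non-empty partial results make up the final result.
result = [row for part in results for row in part]
```

Because the calls to `read_task` do not depend on each other, they map naturally onto independent tasks in a Dask graph.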
With shuffle == False (default), the graph of tasks stays the same as the one
that created the data we supplied for storage. Each task has to split
its chunk of data according to the partitioning scheme. So every task
writes multiple distinct files, one into the folder of each partition
its chunk contains.
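The following sketch illustrates why the unshuffled write produces several files per task. It is a plain-Python model, not Kartothek code: the helper `split_chunk`, the file naming scheme, and the sample rows are all hypothetical.

```python
from collections import defaultdict


def split_chunk(task_id, rows, partition_on):
    """Split one task's chunk into one file per partition value."""
    files = defaultdict(list)
    for row in rows:
        path = f"{partition_on}={row[partition_on]}/task{task_id}.parquet"
        files[path].append(row)
    return dict(files)


# Two hypothetical chunks, each processed by its own task.
chunks = [
    [{"A": 1, "B": "a"}, {"A": 2, "B": "b"}],
    [{"A": 1, "B": "c"}, {"A": 2, "B": "a"}],
]

written = {}
for task_id, rows in enumerate(chunks):
    written.update(split_chunk(task_id, rows, "A"))

# Two tasks times two partitions gives four files.
file_count = len(written)
```

In this model the number of files grows with the number of tasks multiplied by the number of partitions each chunk touches.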
With shuffle == True, the data has to be grouped according to the
num_buckets parameters. Dask handles the
distribution of data. If we’re running on a cluster, it sends the data over the
network between the workers. More information can be found in Dask’s
At the end of either case, the results of the writes are collected and finally atomically committed to Kartothek’s dataset.
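The grouping step of the shuffled write can be sketched as follows. This is an illustration under assumptions, not Kartothek's or Dask's implementation: the helper names are hypothetical, the bucketing column is passed in as a `bucket_by` argument for the sake of the example, and `zlib.crc32` is a stand-in for whatever hash function is really used.

```python
import zlib
from collections import defaultdict


def bucket_of(value, num_buckets):
    """Map a value to a bucket with a deterministic hash (assumed scheme)."""
    return zlib.crc32(str(value).encode("utf-8")) % num_buckets


def shuffle_rows(chunks, partition_on, bucket_by, num_buckets):
    """Regroup rows from all chunks by (partition value, bucket)."""
    groups = defaultdict(list)
    for rows in chunks:
        for row in rows:
            key = (row[partition_on], bucket_of(row[bucket_by], num_buckets))
            groups[key].append(row)
    return dict(groups)


# Two hypothetical input chunks that both contain rows for A=1 and A=2.
chunks = [
    [{"A": 1, "B": "a"}, {"A": 2, "B": "b"}],
    [{"A": 1, "B": "c"}, {"A": 2, "B": "a"}],
]

# After grouping, each (partition, bucket) pair can be written as one file.
groups = shuffle_rows(chunks, "A", "B", num_buckets=2)
single_bucket = shuffle_rows(chunks, "A", "B", num_buckets=1)
```

Unlike the unshuffled case, rows from different input chunks end up in the same group, which is why the data has to move between workers on a cluster.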