(Re-)Store a dask index

Calculating a dask index is usually an expensive operation because it requires shuffling data between partitions. To (re-)store the dask index when reading a dataset back, pass the name of the index column to the dask_index_on keyword of read_dataset_as_ddf.
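A toy range-partitioning shuffle in plain Python shows why computing an index from scratch is costly. This is a minimal sketch, not dask's implementation. It assumes that rows are `(key, payload)` tuples, `range_shuffle` is a hypothetical helper, and the partition boundaries come from the exact distinct keys. It makes two costs visible: every partition has to be scanned to find the boundaries, and rows then move between partitions.

```python
from bisect import bisect_right


def range_shuffle(partitions, npartitions):
    """Toy range-partitioning shuffle (hypothetical helper, not dask's code).

    partitions: list of lists of (key, payload) rows.
    Returns (divisions, output_partitions, rows_moved).
    """
    # Pass 1: scan every partition to learn the key range. Dask estimates the
    # boundaries from the data; this sketch uses the exact distinct keys.
    keys = sorted({key for part in partitions for key, _ in part})
    if not keys:
        return [], [], 0
    n = min(npartitions, len(keys))
    # n lower bounds plus an inclusive upper bound -> n + 1 divisions.
    divisions = [keys[i * len(keys) // n] for i in range(n)] + [keys[-1]]
    lower_bounds = divisions[:-1]

    # Pass 2: route each row to the partition whose key range contains it.
    out = [[] for _ in range(n)]
    moved = 0
    for source, part in enumerate(partitions):
        for row in part:
            target = bisect_right(lower_bounds, row[0]) - 1
            out[target].append(row)
            moved += target != source  # the row changed partition number
    # Sort within each partition so the index is sorted globally.
    for part in out:
        part.sort(key=lambda row: row[0])
    return divisions, out, moved


# Two unsorted input partitions, using dates similar to the example below.
parts = [
    [("2013-01-03", "train"), ("2013-01-02", "test")],
    [("2013-01-02", "train"), ("2013-01-03", "test")],
]
divisions, out, moved = range_shuffle(parts, npartitions=2)
print(divisions)  # ['2013-01-02', '2013-01-03', '2013-01-03']
print(moved)      # 2
```

Even with four rows, half of them end up in a different partition. On a distributed cluster, that movement typically means transferring data between workers.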

In [1]: import dask.dataframe as dd

In [2]: from kartothek.io.dask.dataframe import update_dataset_from_ddf, read_dataset_as_ddf

In [3]: df
Out[3]: 
     A          B    C  D      E    F
0  1.0 2013-01-02  1.0  3   test  foo
1  1.0 2013-01-02  1.0  3  train  foo
2  1.0 2013-01-03  1.0  3   test  foo
3  1.0 2013-01-03  1.0  3  train  foo

In [4]: ddf = dd.from_pandas(df, npartitions=2)

In [5]: ddf_indexed = ddf.set_index("B")

In [6]: dm = update_dataset_from_ddf(
   ...:     ddf_indexed.reset_index(),
   ...:     table="table",
   ...:     dataset_uuid="dataset_ddf_with_index",
   ...:     store=store_factory,
   ...:     partition_on="B",
   ...: ).compute()
   ...: 

In [7]: read_dataset_as_ddf(
   ...:     dataset_uuid=dm.uuid, store=store_factory, dask_index_on="B", table="table"
   ...: )
   ...: 
Out[7]: 
Dask DataFrame Structure:
                     A        C      D       E       F
npartitions=2                                         
2013-01-02     float64  float64  int64  object  object
2013-01-03         ...      ...    ...     ...     ...
2013-01-03         ...      ...    ...     ...     ...
Dask Name: set_index, 8 tasks
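The divisions printed above (2013-01-02, 2013-01-03, 2013-01-03) can be reproduced without reading or moving any rows, provided the partition values are already known. The sketch below is minimal and rests on two assumptions: each partition holds exactly one distinct value of the index column, and partitions are ordered by that value. That is consistent with the structure shown in Out[7]. The function name is hypothetical and this is not kartothek's code.

```python
def divisions_from_partition_values(values):
    """Dask-style divisions for partitions that each hold one index value.

    Hypothetical helper. Assumes one partition per distinct value, ordered
    by that value. Only the known values are sorted; no rows are touched.
    """
    ordered = sorted(set(values))
    # One lower bound per partition, plus the last value repeated as the
    # inclusive upper bound of the final partition (n partitions -> n + 1).
    return ordered + [ordered[-1]]


print(divisions_from_partition_values(["2013-01-03", "2013-01-02"]))
# ['2013-01-02', '2013-01-03', '2013-01-03']
```

Compared with a full shuffle, only one small list of known values is sorted. This is why restoring an index on a column whose values are already known per partition is much cheaper than recomputing it.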