Partitioning

As we have already seen, writing data in Kartothek amounts to writing partitions, which in the underlying key-value store translates to writing files to the storage layer in a structured manner.

From the perspective of efficient access, it would be helpful if accessing a subset of written data didn’t require reading through an entire dataset to be able to identify and access the required subset. This is where explicitly partitioning by table columns helps.

Kartothek is designed primarily for storing large datasets consistently. One way to achieve this is to structure the data well, which can be done by explicitly partitioning the dataset by selected columns.

One benefit of doing so is that it allows for selective operations on data. Reading as well as mutating (replacing or deleting) subsets of data becomes much more efficient, because only a select number of files needs to be read.

To see explicit partitioning in action, let’s set up some data and a storage location first and store the data there with Kartothek:

In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: from functools import partial

In [4]: from tempfile import TemporaryDirectory

In [5]: from storefact import get_store_from_url

In [6]: from kartothek.io.eager import store_dataframes_as_dataset

In [7]: dataset_dir = TemporaryDirectory()

In [8]: store_url = f"hfs://{dataset_dir.name}"

In [9]: df = pd.DataFrame(
   ...:     {
   ...:         "A": 1.0,
   ...:         "B": [
   ...:             pd.Timestamp("20130102"),
   ...:             pd.Timestamp("20130102"),
   ...:             pd.Timestamp("20130103"),
   ...:             pd.Timestamp("20130103"),
   ...:         ],
   ...:         "C": pd.Series(1, index=list(range(4)), dtype="float32"),
   ...:         "D": np.array([3] * 4, dtype="int32"),
   ...:         "E": pd.Categorical(["test", "train", "test", "train"]),
   ...:         "F": "foo",
   ...:     }
   ...: )
   ...: 

In [10]: df
Out[10]: 
     A          B    C  D      E    F
0  1.0 2013-01-02  1.0  3   test  foo
1  1.0 2013-01-02  1.0  3  train  foo
2  1.0 2013-01-03  1.0  3   test  foo
3  1.0 2013-01-03  1.0  3  train  foo

Kartothek allows users to explicitly partition their data by the values of table columns such that, for a given input partition, all rows with the same value of the column are written to the same partition. To do this, we use the partition_on keyword argument:

In [11]: dm = store_dataframes_as_dataset(
   ....:     store_url, "partitioned_dataset", [df], partition_on="B"
   ....: )
   ....: 

Partitioning based on a date column usually makes sense for time series data.

Of interest here is dm.partitions:

In [12]: sorted(dm.partitions.keys())
Out[12]: 
['B=2013-01-02%2000%3A00%3A00/a6841abb242e4ca9b96808f5b83b3925',
 'B=2013-01-03%2000%3A00%3A00/a6841abb242e4ca9b96808f5b83b3925']

We can see that partitions have been stored in a way which indicates the specific value for the column on which partitioning has been performed.
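
The key layout can be decoded with the standard library alone. The following is a minimal sketch, assuming that keys follow the column=value/.../label pattern shown in the output above and that values are percent-encoded. parse_partition_key is a hypothetical helper written for illustration, not part of Kartothek's API.

```python
from urllib.parse import unquote


def parse_partition_key(key):
    """Split a partition key into its column values and trailing label.

    Hypothetical helper: assumes the ``col=value/.../label`` layout shown
    above, with percent-encoded values.
    """
    *segments, label = key.split("/")
    values = {}
    for segment in segments:
        column, _, encoded = segment.partition("=")
        values[column] = unquote(encoded)
    return values, label


values, label = parse_partition_key(
    "B=2013-01-02%2000%3A00%3A00/a6841abb242e4ca9b96808f5b83b3925"
)
print(values)  # {'B': '2013-01-02 00:00:00'}
print(label)   # a6841abb242e4ca9b96808f5b83b3925
```

The decoded value is a plain string; converting it back to a timestamp is left to the caller.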

Partitioning can also be performed on multiple columns; in this case, columns should be specified as a list:

In [13]: duplicate_df = df.copy()

In [14]: duplicate_df.F = "bar"

In [15]: dm = store_dataframes_as_dataset(
   ....:     store_url,
   ....:     "another_partitioned_dataset",
   ....:     [df, duplicate_df],
   ....:     partition_on=["E", "F"],
   ....: )
   ....: 

In [16]: sorted(dm.partitions.keys())
Out[16]: 
['E=test/F=bar/47519ab8b9954c9ca0d1d460cdbed611',
 'E=test/F=foo/fe8795116d654d2bba0ba9c819010977',
 'E=train/F=bar/47519ab8b9954c9ca0d1d460cdbed611',
 'E=train/F=foo/fe8795116d654d2bba0ba9c819010977']

Note that partitioning is applied to each input dataframe separately. Here, the 2 input dataframes yield 4 files, one for each of the 4 combinations of values of E and F, because F differs between the inputs (foo in df, bar in duplicate_df). If both inputs shared the same combinations, for example E=test/F=foo and E=train/F=foo, 4 files would still be created for only 2 logical partitions. Such physical partitions can be read as logical partitions by using the argument concat_partitions_on_primary_index=True at reading time.
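
The distinction between physical files and logical partitions can be sketched by grouping keys by their column=value prefix. The keys below are made up for illustration (two inputs that both contain the same two combinations), and group_by_logical_partition is a hypothetical helper, not a Kartothek function.

```python
from collections import defaultdict


def group_by_logical_partition(keys):
    """Map each ``col=value/...`` prefix to the labels stored beneath it."""
    groups = defaultdict(list)
    for key in keys:
        prefix, _, label = key.rpartition("/")
        groups[prefix].append(label)
    return dict(groups)


# Hypothetical keys: two inputs, each containing E=test/F=foo and E=train/F=foo.
keys = [
    "E=test/F=foo/label-1",
    "E=test/F=foo/label-2",
    "E=train/F=foo/label-1",
    "E=train/F=foo/label-2",
]
groups = group_by_logical_partition(keys)
print(len(keys), "files in", len(groups), "logical partitions")
# 4 files in 2 logical partitions
```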

For datasets consisting of multiple tables, explicit partitioning on columns can only be performed if the column exists in all tables and has the same data type in each. Guaranteeing that the types match is part of schema validation in Kartothek.

For example:

In [17]: df.dtypes
Out[17]: 
A           float64
B    datetime64[ns]
C           float32
D             int32
E          category
F            object
dtype: object

In [18]: different_df = pd.DataFrame(
   ....:     {"B": pd.to_datetime(["20130102", "20190101"]), "L": [1, 4], "Q": [True, False]}
   ....: )
   ....: 

In [19]: different_df.dtypes
Out[19]: 
B    datetime64[ns]
L             int64
Q              bool
dtype: object

In [20]: dm = store_dataframes_as_dataset(
   ....:     store_url,
   ....:     "multiple_partitioned_tables",
   ....:     [{"data": {"table1": df, "table2": different_df}}],
   ....:     partition_on="B",
   ....: )
   ....: 

In [21]: sorted(dm.partitions.keys())
Out[21]: 
['B=2013-01-02%2000%3A00%3A00/b6f90b520bea4d7c8b6405c7414e91b0',
 'B=2013-01-03%2000%3A00%3A00/b6f90b520bea4d7c8b6405c7414e91b0',
 'B=2019-01-01%2000%3A00%3A00/b6f90b520bea4d7c8b6405c7414e91b0']

As noted above, when data is appended to a dataset, Kartothek guarantees it has the proper schema and partitioning.

The order of the columns provided in partition_on is important: a different column order results in a different partition structure.
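
The effect of column order on the key structure can be sketched as follows. This is a simplified model, assuming the column=value/... layout and the percent-encoding seen in the keys shown earlier; partition_prefix is a hypothetical helper, not Kartothek's implementation.

```python
from urllib.parse import quote


def partition_prefix(row, partition_on):
    """Build the ``col=value/...`` prefix for one row (hypothetical helper)."""
    return "/".join(f"{col}={quote(str(row[col]))}" for col in partition_on)


row = {"E": "test", "F": "foo"}
print(partition_prefix(row, ["E", "F"]))  # E=test/F=foo
print(partition_prefix(row, ["F", "E"]))  # F=foo/E=test
```

The same row ends up under a different prefix depending on the order, so the first column in partition_on forms the outermost level of the structure.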

Note

Every partition must have data for every table. An empty dataframe in this context is also considered as data.

Force partitioning by shuffling using Dask

By default, the partitioning logic is applied per physical input partition when writing. In particular, this means that using partition_on on a column with a total of N unique values may create up to M x N files, where M is the number of physical input partitions. In the following example, M = 10 and N = 2, which results in 20 files:

In [22]: import dask.dataframe as dd

In [23]: import numpy as np

In [24]: from kartothek.io.dask.dataframe import update_dataset_from_ddf

In [25]: df = pd.DataFrame(
   ....:     {
   ....:         "A": [0, 1] * 100,
   ....:         "B": np.repeat(range(20), 10),
   ....:         "C": "some_payload",
   ....:     }
   ....: )
   ....: 

In [26]: ddf = dd.from_pandas(df, npartitions=10)

In [27]: dm = update_dataset_from_ddf(
   ....:     ddf, dataset_uuid="no_shuffle", store=store_url, partition_on="A", table="table"
   ....: ).compute()
   ....: 

In [28]: sorted(dm.partitions.keys())
Out[28]: 
['A=0/0418314beaf341d8a99617fc3dc2f676',
 'A=0/2b86f655a257432ab3d7a6dcee88713a',
 'A=0/4630d3590de64d0e9ac567a96257dcc1',
 'A=0/7b249d7a1bb444c9aef1b2a12e4c48f4',
 'A=0/8ee9748b1f2e4f9e816a18c58b670653',
 'A=0/99626ef3366f46d79ccedb10193a75c2',
 'A=0/ae1d736788ad43a0b2fb5f46f5573c42',
 'A=0/b6dbb9da22cc4e13aeb934148ad3e62e',
 'A=0/daca1ea93b5e430aaa4512becc3632a8',
 'A=0/fef558dc9c4e408c897ca668e373fc1f',
 'A=1/0418314beaf341d8a99617fc3dc2f676',
 'A=1/2b86f655a257432ab3d7a6dcee88713a',
 'A=1/4630d3590de64d0e9ac567a96257dcc1',
 'A=1/7b249d7a1bb444c9aef1b2a12e4c48f4',
 'A=1/8ee9748b1f2e4f9e816a18c58b670653',
 'A=1/99626ef3366f46d79ccedb10193a75c2',
 'A=1/ae1d736788ad43a0b2fb5f46f5573c42',
 'A=1/b6dbb9da22cc4e13aeb934148ad3e62e',
 'A=1/daca1ea93b5e430aaa4512becc3632a8',
 'A=1/fef558dc9c4e408c897ca668e373fc1f']
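
The file count above can be reproduced with a simplified model that needs neither Dask nor Kartothek. The sketch assumes that the input is split into equally sized, contiguous chunks and that every chunk writes one file per distinct value it contains; count_files_per_chunk is a hypothetical helper.

```python
def count_files_per_chunk(rows, npartitions, column):
    """Count files when every input chunk is partitioned on its own.

    Simplified model: rows are split into contiguous chunks of equal size,
    and each chunk writes one file per distinct value of ``column``.
    """
    chunk_size = -(-len(rows) // npartitions)  # ceiling division
    chunks = [rows[i : i + chunk_size] for i in range(0, len(rows), chunk_size)]
    return sum(len({row[column] for row in chunk}) for chunk in chunks)


# Same shape as the example data: 200 rows, column A alternating between 0 and 1.
rows = [{"A": i % 2, "B": i // 10} for i in range(200)]

files = count_files_per_chunk(rows, npartitions=10, column="A")
distinct_values = len({row["A"] for row in rows})
print(files)            # 20
print(distinct_values)  # 2
```

Every chunk contains both values of A, so the count reaches the upper bound of M x N = 10 x 2 files, although only 2 distinct values exist.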

Shuffling

To circumvent the heavy file fragmentation, we offer a shuffle implementation for dask dataframes which fuses the fragmented files for each partitioning value of A into a single file.

In [29]: dm = update_dataset_from_ddf(
   ....:     ddf,
   ....:     dataset_uuid="with_shuffle",
   ....:     store=store_url,
   ....:     partition_on="A",
   ....:     shuffle=True,
   ....:     table="table",
   ....: ).compute()
   ....: 

In [30]: sorted(dm.partitions.keys())
Out[30]: 
['A=0/71c6ae0ad29841ae9183d32ce640262e',
 'A=1/cca87f6bed474d5e9184e4f95f6526d1']

Warning

This may require a lot of memory since we need to shuffle the data. Most of this increased memory usage can be compensated for by using dask spill-to-disk. If peak memory usage is an issue and needs to be controlled, it may help to reduce the final file sizes, because serialization into the Apache Parquet file format usually requires a bit more memory than the shuffling tasks themselves. See also Bucketing.

Bucketing

If you need more control over the size of files and the distribution of values within the files, you can also ask for explicit bucketing of values.

Note

There are many reasons for wanting smaller files. One reason could be reduced peak memory usage during dataset creation; another might be memory or performance requirements in later steps. If you intend to optimize your pipelines by reducing file sizes, we also recommend looking into predicate pushdown, which might offer similar, synergetic effects. See also Efficient Querying.

Bucketing uses the values of the requested columns and assigns every unique tuple to one of num_buckets files. This not only helps to control output file sizes but also allows for very efficient querying in combination with secondary indices, see also Efficient Querying.

In the example below, the same data is used as above, but this time we bucket by column B. This no longer creates a single file per value in B but rather num_buckets files for each value of A. When investigating the index, we can also see that a query for a given value in B returns exactly one file per partition key.

In [31]: dm = update_dataset_from_ddf(
   ....:     ddf,
   ....:     dataset_uuid="with_bucketing",
   ....:     store=store_url,
   ....:     partition_on="A",
   ....:     shuffle=True,
   ....:     table="table",
   ....:     bucket_by="B",
   ....:     num_buckets=4,
   ....:     secondary_indices="B",
   ....: ).compute()
   ....: 

In [32]: sorted(dm.partitions.keys())
Out[32]: 
['A=0/0d8b0594950a40d5b39de4e3a801a2a7',
 'A=0/23fb2d6bf53f41eeb30c4033c7534edc',
 'A=0/3d34267fbf0a4fb4a99a636f64151b51',
 'A=0/a0084533c5464721b9eb1dc89630d1f5',
 'A=1/31e3bdb94935450994b47c03b4779b5a',
 'A=1/6b57b0989bd1494a92c47049bb5b7e02',
 'A=1/8f21fc33fe5c4f4d836a697c552db0db',
 'A=1/97f0958ca6a546aaa85c73576d37af70']

In [33]: dm = dm.load_index("B", store_url)

In [34]: sorted(dm.indices["B"].eval_operator("==", 1))
Out[34]: 
['A=0/0d8b0594950a40d5b39de4e3a801a2a7',
 'A=1/8f21fc33fe5c4f4d836a697c552db0db']
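
The idea behind bucketing can be sketched in a few lines. This is a minimal illustration, assuming a hash-based assignment; the CRC32 hash and the helper assign_bucket are assumptions made for this sketch and do not necessarily match what Kartothek does internally.

```python
import zlib


def assign_bucket(values, num_buckets):
    """Map a tuple of column values to a bucket in ``range(num_buckets)``.

    Illustrative only: CRC32 is an assumption made for this sketch, not
    necessarily the hash Kartothek uses.
    """
    digest = zlib.crc32(repr(tuple(values)).encode("utf-8"))
    return digest % num_buckets


num_buckets = 4
buckets = {}
for b in range(20):  # the 20 distinct values of column B in the example
    buckets.setdefault(assign_bucket((b,), num_buckets), []).append(b)

# Show which values of B share a bucket.
for bucket, members in sorted(buckets.items()):
    print(bucket, members)
```

Because the assignment is deterministic, a given value of B always lands in the same bucket. This is why a query for a single value of B needs to touch at most one file per partition key.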