Getting Started

Kartothek manages datasets that consist of files that contain tables. It does so by offering a metadata definition to handle these datasets efficiently.

Datasets in Kartothek are made up of one or more tables, each with a unique schema. When working with Kartothek tables as a Python user, we will use the pandas DataFrame as the user-facing type.

We typically expect the contents of a dataset to be large, often too large to be held in memory by a single machine, but for demonstration purposes we will use a small DataFrame with a mixed set of types.

In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: df = pd.DataFrame(
   ...:     {
   ...:         "A": 1.0,
   ...:         "B": pd.Timestamp("20130102"),
   ...:         "C": pd.Series(1, index=list(range(4)), dtype="float32"),
   ...:         "D": np.array([3] * 4, dtype="int32"),
   ...:         "E": pd.Categorical(["test", "train", "test", "prod"]),
   ...:         "F": "foo",
   ...:     }
   ...: )
   ...: 

In [4]: another_df = pd.DataFrame(
   ...:     {
   ...:         "A": 5.0,
   ...:         "B": pd.Timestamp("20110102"),
   ...:         "C": pd.Series(1, index=list(range(4)), dtype="float32"),
   ...:         "D": np.array([12] * 4, dtype="int32"),
   ...:         "E": pd.Categorical(["prod", "train", "test", "train"]),
   ...:         "F": "bar",
   ...:     }
   ...: )
   ...: 

Defining the storage location

We now want to store these DataFrames as a dataset. To do so, we first need to connect to a storage location.

We define a store factory as a callable that contains the storage information. In this example, we use storefact to construct such a store factory for the local filesystem (the hfs:// prefix selects the local filesystem, and what follows is the file path).

In [5]: from functools import partial

In [6]: from tempfile import TemporaryDirectory

In [7]: from storefact import get_store_from_url

In [8]: dataset_dir = TemporaryDirectory()

In [9]: store_factory = partial(get_store_from_url, f"hfs://{dataset_dir.name}")

Storage locations

storefact supports several stores for use with Kartothek. They can be created with the function storefact.get_store_from_url() using one of the following prefixes:

  • hfs: Local filesystem
  • hazure: AzureBlockBlobStorage
  • hs3: BotoStore (Amazon S3)
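The prefix before :// selects the store type, and the rest of the URL describes the location. As a rough illustration only (this is not storefact's parsing code), the standard library's urllib.parse.urlsplit() separates the two parts; the STORE_TYPES mapping and the describe_store_url() helper are hypothetical and simply restate the list above.

```python
from urllib.parse import urlsplit

# Hypothetical restatement of the prefixes listed above; storefact's own
# URL handling may differ.
STORE_TYPES = {
    "hfs": "Local filesystem",
    "hazure": "AzureBlockBlobStorage",
    "hs3": "BotoStore (Amazon S3)",
}


def describe_store_url(url):
    """Return (store type, location) for a storefact-style URL."""
    parts = urlsplit(url)
    if parts.scheme not in STORE_TYPES:
        raise ValueError(f"unsupported store prefix: {parts.scheme!r}")
    # For hfs:///some/dir the netloc is empty and the path holds the directory.
    return STORE_TYPES[parts.scheme], parts.netloc + parts.path


print(describe_store_url("hfs:///tmp/my_dataset"))
```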

Store factories

store_factory is defined as a partial callable holding the store information as its arguments because, on distributed computing backends, store connections cannot be safely transferred between processes. Kartothek therefore passes the storage information to workers as a factory function instead of an open connection.
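The standard library is enough to illustrate the idea behind this design. In the sketch below, an open file handle stands in for a live store connection and pathlib.Path stands in for get_store_from_url(); both substitutions are assumptions for illustration, not Kartothek internals. The handle cannot be pickled (and so cannot be shipped to another process), whereas a functools.partial wrapping an importable callable and a plain string survives a pickle round trip, and the receiving side calls it to build its own object.

```python
import pickle
import tempfile
from functools import partial
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    # An open file handle plays the role of a live store connection.
    with open(Path(tmp) / "data.txt", "w") as handle:
        try:
            pickle.dumps(handle)
            handle_picklable = True
        except TypeError:
            # File objects refuse to be pickled, so they cannot be sent to workers.
            handle_picklable = False

    # The factory only carries the location; pathlib.Path stands in for
    # get_store_from_url() in this sketch.
    factory = partial(Path, tmp)
    # This round trip is what sending the factory to a worker process involves.
    restored = pickle.loads(pickle.dumps(factory))
    # The worker calls the factory to create its own object from the location.
    print(handle_picklable, restored() == Path(tmp))
```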

Interface

Kartothek can write to any location that fulfills the simplekv.KeyValueStore interface, as long as the store also supports ExtendedKeyspaceMixin (this is necessary so that / can be used in storage key names).

For more information, see the storefact documentation.
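To make the role of / concrete, the sketch below defines a hypothetical dict-backed InMemoryStore with put(), get() and keys(prefix) methods in the spirit of a key-value store interface. It is not simplekv's implementation, and its key-validation rule is an assumption; it only shows how keys containing / let a flat key space be listed like a directory tree. The example keys are illustrative and do not claim to match Kartothek's actual storage layout.

```python
import re

# Assumed key rule for this sketch: letters, digits, "_", ".", "-" and "/".
# The "/" is the extension that Kartothek relies on.
_VALID_KEY = re.compile(r"[A-Za-z0-9_.\-/]+")


class InMemoryStore:
    """Hypothetical dict-backed key-value store, for illustration only."""

    def __init__(self):
        self._data = {}

    def put(self, key, data):
        if not _VALID_KEY.fullmatch(key):
            raise ValueError(f"invalid key: {key!r}")
        self._data[key] = bytes(data)
        return key

    def get(self, key):
        return self._data[key]

    def keys(self, prefix=""):
        # With "/" in keys, a prefix query behaves like listing a directory.
        return sorted(k for k in self._data if k.startswith(prefix))


store = InMemoryStore()
store.put("my_dataset/table/part-0.parquet", b"...")
store.put("my_dataset/table/part-1.parquet", b"...")
store.put("other_dataset/table/part-0.parquet", b"...")
print(store.keys(prefix="my_dataset/"))
```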

Writing data to storage

Now that we have some data and a location to store it in, we can persist it as a dataset. To do so, we use store_dataframes_as_dataset() to store the DataFrames df and another_df that we already have in memory.

In [10]: from kartothek.io.eager import store_dataframes_as_dataset

In [11]: df.dtypes.equals(another_df.dtypes)  # both have the same schema
Out[11]: True

In [12]: dm = store_dataframes_as_dataset(
   ....:     store_factory, "a_unique_dataset_identifier", [df, another_df]
   ....: )
   ....: 

Scheduling backends

The import path of this function already hints at the general structure of the Kartothek modules. kartothek.io contains all the building blocks for data pipelines that read from and write to storage. The next module level (e.g. eager) selects the scheduling backend.

The scheduling backends currently supported by Kartothek are:

  • eager executes all operations immediately on the local machine.
  • iter executes operations on the dataset using a generator/iterator interface. The standard format for reading and storing dataframes with iter is a generator of dataframes.
  • dask is suitable for larger datasets. It can be used to work on datasets in parallel, or even on a cluster by using dask.distributed as the backend. There are also dask.bag and dask.dataframe modules, which support I/O operations for the respective dask collections.
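The difference between the eager and iter styles mirrors a familiar Python distinction: building a complete list versus yielding items lazily from a generator. The pure-Python sketch below uses hypothetical function names and does not call Kartothek; the loaded list records when each stand-in partition is actually read.

```python
loaded = []  # records which stand-in partitions were read, in order


def load_partition(i):
    """Hypothetical stand-in for reading partition i from storage."""
    loaded.append(i)
    return list(range(i * 3, i * 3 + 3))


def read_eager(n):
    # Every partition is loaded before the caller receives anything.
    return [load_partition(i) for i in range(n)]


def read_iter(n):
    # Each partition is loaded only when the caller asks for the next one.
    for i in range(n):
        yield load_partition(i)


eager = read_eager(3)  # loaded is now [0, 1, 2]
lazy = read_iter(3)    # nothing loaded yet: generators start lazily
first = next(lazy)     # loads partition 0 only; loaded is [0, 1, 2, 0]
print(first, loaded)
```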

After calling store_dataframes_as_dataset(), a DatasetMetadata object is returned. This class holds information about the structure and schema of the dataset.

In [13]: dm.tables
Out[13]: ['table']

In [14]: sorted(dm.partitions.keys())
Out[14]: ['4c8cf7fffe0546efaaba9928ea59e90d', 'cd89989890214bb5a0c35aecf72e175d']

In [15]: dm.table_meta["table"].remove_metadata()  # Arrow schema
Out[15]: 
A: double
B: timestamp[ns]
C: double
D: int64
E: string
F: string

For this guide, two attributes that are noteworthy are tables and partitions:

  • Each dataset has one or more tables, where each table is a logical collection of data, bound together by a common schema.
  • partitions are the physical “pieces” of data which together constitute the contents of a dataset. Data is written to storage on a per-partition basis. See the section on partitioning for further details: Partitioning.

The attribute table_meta can be accessed to see the underlying schema of the dataset. See Table type system for more information.

To store multiple dataframes in a dataset, pass a collection of dataframes; the exact format depends on the I/O backend used.

Additionally, Kartothek supports several input formats; the data does not always need to be a plain pd.DataFrame. See parse_input_to_metapartition() for further details.

If table names are not specified when passing an iterator of dataframes, Kartothek assumes that these dataframes are different chunks of the same table and expects their schemas to be identical; otherwise, a ValueError is raised. For example:

In [16]: df2 = pd.DataFrame(
   ....:     {
   ....:         "G": "foo",
   ....:         "H": pd.Categorical(["test", "train", "test", "train"]),
   ....:         "I": np.array([9] * 4, dtype="int32"),
   ....:         "J": pd.Series(3, index=list(range(4)), dtype="float32"),
   ....:         "K": pd.Timestamp("20190604"),
   ....:         "L": 2.0,
   ....:     }
   ....: )
   ....: 

In [17]: df.dtypes.equals(df2.dtypes)  # schemas are different!
Out[17]: False

In [18]: store_dataframes_as_dataset(
   ....:     store_factory,
   ....:     "will_not_work",
   ....:     [df, df2],
   ....: )
   ....: 
---------------------------------------------------------------------------
ValueError: Schema violation
Origin schema: {table/9e7d9217c82b4fda9c4e720dc987c60d}
Origin reference: {table/80feb4d84ac34a9c9d08ba48c8170647}
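Because the mismatch only surfaces as a ValueError at write time, it can help to compare schemas up front. The schema_diff() helper below is a hypothetical pandas-only sketch, not a Kartothek API. It compares pandas dtypes, whereas Kartothek works with Arrow schemas (as shown in Out[15]), so treat it as an approximate pre-check.

```python
import numpy as np
import pandas as pd


def schema_diff(left, right):
    """Describe how the column names and dtypes of two DataFrames differ."""
    left_types, right_types = left.dtypes.to_dict(), right.dtypes.to_dict()
    shared = set(left_types) & set(right_types)
    return {
        "only_left": sorted(set(left_types) - set(right_types)),
        "only_right": sorted(set(right_types) - set(left_types)),
        # Columns present on both sides whose dtypes disagree.
        "dtype_mismatch": sorted(c for c in shared if left_types[c] != right_types[c]),
    }


a = pd.DataFrame({"x": [1.0, 2.0], "y": ["u", "v"]})
b = pd.DataFrame({"x": np.array([1, 2], dtype="int32"), "z": [True, False]})
print(schema_diff(a, b))
```

Applied to df and df2 from above, every column would be reported under only_left or only_right, since the two frames share no column names.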

Note

Read these sections for more details: Table type system, Specification, input_output.

When we do not explicitly define the names of the table and partition, Kartothek uses the default table name "table" and generates a UUID for the partition name.
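The defaults can be pictured with a small hypothetical helper. The partition labels in Out[14] are 32-character hexadecimal strings, the same shape that uuid.uuid4().hex produces; whether Kartothek generates its labels exactly this way is an assumption of this sketch.

```python
import uuid

DEFAULT_TABLE = "table"  # the default table name described above


def resolve_names(table=None, partition_label=None):
    """Hypothetical helper that fills in the defaults described above."""
    if table is None:
        table = DEFAULT_TABLE
    if partition_label is None:
        # 32 lowercase hex characters, like the labels in Out[14].
        partition_label = uuid.uuid4().hex
    return table, partition_label


print(resolve_names())
print(resolve_names("core-table", "p0"))  # ('core-table', 'p0')
```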

A more complex example: multiple named tables

Sometimes it may be useful to write multiple dataframes with different schemas into a single dataset. This can be achieved by creating a dataset with multiple tables.

In this example, we create a dataset with two tables: core-table and aux-table. The schemas of the tables are identical across partitions (each dictionary in the dfs list argument represents a partition).

In [19]: dfs = [
   ....:     {
   ....:         "data": {
   ....:             "core-table": pd.DataFrame({"id": [22, 23], "f": [1.1, 2.4]}),
   ....:             "aux-table": pd.DataFrame({"id": [22], "col1": ["x"]}),
   ....:         }
   ....:     },
   ....:     {
   ....:         "data": {
   ....:             "core-table": pd.DataFrame({"id": [29, 31], "f": [3.2, 0.6]}),
   ....:             "aux-table": pd.DataFrame({"id": [31], "col1": ["y"]}),
   ....:         }
   ....:     },
   ....: ]
   ....: 

In [20]: dm = store_dataframes_as_dataset(store_factory, dataset_uuid="two-tables", dfs=dfs)

In [21]: dm.tables
Out[21]: ['aux-table', 'core-table']
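Since each table must keep the same schema in every partition, a quick check over the dfs structure can catch mistakes before writing. The inconsistent_tables() function below is a hypothetical pandas sketch, not part of Kartothek: it walks the list-of-dicts layout used above and reports tables whose dtypes change between partitions.

```python
import pandas as pd


def inconsistent_tables(partitions):
    """Return the names of tables whose dtypes differ between partitions."""
    first_seen = {}
    bad = set()
    for partition in partitions:
        for table_name, frame in partition["data"].items():
            dtypes = frame.dtypes
            if table_name not in first_seen:
                # Remember the schema of the first partition containing this table.
                first_seen[table_name] = dtypes
            elif not first_seen[table_name].equals(dtypes):
                bad.add(table_name)
    return sorted(bad)


dfs = [
    {"data": {"core-table": pd.DataFrame({"id": [22, 23], "f": [1.1, 2.4]})}},
    # Here "f" holds integers, so core-table's schema changes.
    {"data": {"core-table": pd.DataFrame({"id": [29, 31], "f": [3, 1]})}},
]
print(inconsistent_tables(dfs))  # ['core-table']
```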

Reading data from storage

After we have written the data, we may want to read it back in again. For this we can use read_table(). This method returns the complete table of the dataset as a pandas DataFrame.

In [22]: from kartothek.io.eager import read_table

In [23]: read_table("a_unique_dataset_identifier", store_factory, table="table")
Out[23]: 
     A          B    C   D      E    F
0  1.0 2013-01-02  1.0   3   test  foo
1  1.0 2013-01-02  1.0   3  train  foo
2  1.0 2013-01-02  1.0   3   test  foo
3  1.0 2013-01-02  1.0   3   prod  foo
4  5.0 2011-01-02  1.0  12   prod  bar
5  5.0 2011-01-02  1.0  12  train  bar
6  5.0 2011-01-02  1.0  12   test  bar
7  5.0 2011-01-02  1.0  12  train  bar
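Conceptually, this result is the stored partitions stacked on top of each other with a fresh, continuous index. The pandas-only sketch below reproduces that shape with pd.concat(..., ignore_index=True) using two small stand-in frames; it illustrates the result only and makes no claim about how read_table() is implemented or in which order it reads partitions.

```python
import pandas as pd

# Two small stand-ins for the partitions written earlier.
part_a = pd.DataFrame({"A": [1.0, 1.0], "F": ["foo", "foo"]})
part_b = pd.DataFrame({"A": [5.0, 5.0], "F": ["bar", "bar"]})

# ignore_index=True discards each partition's own 0..n-1 index and
# numbers the combined rows continuously, as in Out[23].
combined = pd.concat([part_a, part_b], ignore_index=True)
print(combined)
```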

We can also read a dataset iteratively, using read_dataset_as_dataframes__iterator(). This returns a generator of dictionaries (one dictionary per partition), where the keys of each dictionary are the table names of the dataset. For example:

In [24]: from kartothek.io.iter import read_dataset_as_dataframes__iterator

In [25]: for partition_index, df_dict in enumerate(
   ....:     read_dataset_as_dataframes__iterator(dataset_uuid="two-tables", store=store_factory)
   ....: ):
   ....:     print(f"Partition #{partition_index}")
   ....:     for table_name, table_df in df_dict.items():
   ....:         print(f"Table: {table_name}. Data: \n{table_df}")
   ....: 
Partition #0
Table: core-table. Data: 
   id    f
0  22  1.1
1  23  2.4
Table: aux-table. Data: 
   id col1
0  22    x
Partition #1
Table: core-table. Data: 
   id    f
0  29  3.2
1  31  0.6
Table: aux-table. Data: 
   id col1
0  31    y
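Iterating pays off when a result can be accumulated partition by partition, so that only one partition needs to be in memory at a time. In the hedged sketch below, the hypothetical partitions() generator stands in for read_dataset_as_dataframes__iterator(): it yields the same one-dict-per-partition shape, filled with the core-table values written earlier, and the loop computes the mean of f from running totals.

```python
import pandas as pd


def partitions():
    """Hypothetical stand-in yielding one {table_name: DataFrame} dict per partition."""
    yield {"core-table": pd.DataFrame({"id": [22, 23], "f": [1.1, 2.4]})}
    yield {"core-table": pd.DataFrame({"id": [29, 31], "f": [3.2, 0.6]})}


total, count = 0.0, 0
for df_dict in partitions():
    core = df_dict["core-table"]
    # Only the running totals are kept between iterations, not the partitions.
    total += core["f"].sum()
    count += len(core)

mean_f = total / count
print(round(mean_f, 3))
```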

Similarly, the dask.delayed backend provides the function read_dataset_as_delayed(), which has an interface very similar to that of read_dataset_as_dataframes__iterator() but returns a collection of dask.delayed objects.

Filtering using predicates

Data can be filtered during reads by passing simple predicates via the predicates argument. Technically speaking, Kartothek supports predicates in disjunctive normal form.

When this argument is defined, Kartothek uses the Apache Parquet metadata as well as indices and partition information to speed up queries when possible. How this works is a complex topic; see Efficient Querying.

# Read only the values of table `core-table` where `f` < 2.5
In [26]: read_table(
   ....:     "two-tables", store_factory, table="core-table", predicates=[[("f", "<", 2.5)]]
   ....: )
   ....: 
Out[26]: 
     f  id
0  1.1  22
1  2.4  23
2  0.6  31
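In disjunctive normal form, each tuple is a single (column, operator, value) comparison, the tuples inside an inner list are combined with AND, and the inner lists are combined with OR. The filter_dnf() function below is a hypothetical pandas-only evaluator of that structure for a few operators; it mimics the filtering semantics only and does none of the Parquet-metadata, index, or partition pruning that Kartothek uses to speed up reads.

```python
import operator

import pandas as pd

# Comparison operators supported by this sketch (plus "in", handled separately).
_OPS = {
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}


def filter_dnf(df, predicates):
    """Keep rows that satisfy at least one inner list (an AND of tuples)."""
    keep = pd.Series(False, index=df.index)
    for conjunction in predicates:
        match = pd.Series(True, index=df.index)
        for column, op, value in conjunction:
            if op == "in":
                match &= df[column].isin(value)
            else:
                match &= _OPS[op](df[column], value)
        keep |= match  # OR across the inner lists
    return df[keep]


core = pd.DataFrame({"id": [22, 23, 29, 31], "f": [1.1, 2.4, 3.2, 0.6]})
# (f < 2.5 AND id > 22) OR (id == 29)
print(filter_dnf(core, [[("f", "<", 2.5), ("id", ">", 22)], [("id", "==", 29)]]))
```

With predicates=[[("f", "<", 2.5)]] it keeps the rows with id 22, 23 and 31, matching Out[26].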

For a deeper dive into Kartothek, take a look at further_useful_features and Efficient Querying.