Indexing

Kartothek uses different types of inverted file indices to enable efficient partition pruning and improve query performance; see also Efficient Querying for more hints on how to optimize performance. This section describes the different types of indices, how to create them, and how to interact with them.

Principal in-memory representation

All currently supported kartothek index types are inverted indices: they map observed values of a given field to a list of partitions where they were observed.

In [1]: index_dct = {1: ["table/12345"], 2: ["table/12345", "table/6789"]}

In this example, the value 1 is found in exactly one partition, labeled table/12345, while the value 2 is found in two partitions, table/12345 and table/6789.
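To make the mapping concrete, the following minimal sketch builds such a dictionary from per-partition observations using plain Python. The helper name build_inverted_index and the input layout are assumptions made for illustration; they are not part of kartothek's API.

```python
from collections import defaultdict


def build_inverted_index(observations):
    """Map each observed value to the sorted list of partitions containing it.

    `observations` maps a partition label to the values seen in that partition.
    (Hypothetical helper for illustration; not a kartothek function.)
    """
    index = defaultdict(set)
    for partition, values in observations.items():
        for value in values:
            index[value].add(partition)
    return {value: sorted(parts) for value, parts in sorted(index.items())}


observations = {"table/12345": [1, 2], "table/6789": [2]}
index_dct = build_inverted_index(observations)
print(index_dct)
```

The result has the same shape as the dictionary shown above: one entry per observed value, each pointing at the partitions that contain it.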

Users typically do not interact with indices directly since querying a dataset will automatically load and interact with the indices. For some applications it is still quite useful to interact with them directly.

All indices implement IndexBase, which lets the user inspect the index data type and the observed values, evaluate predicates against the index, and view the index as a flat series.

In [2]: from kartothek.api.dataset import IndexBase

In [3]: index = IndexBase(column="FieldName", index_dct=index_dct)

In [4]: index.dtype
Out[4]: DataType(int64)

In [5]: index.observed_values()
Out[5]: array([1, 2])

In [6]: index.eval_operator(">=", 2)
Out[6]: {'table/12345', 'table/6789'}

In [7]: index.as_flat_series()
Out[7]: 
FieldName
1    table/12345
2    table/12345
2     table/6789
Name: partition, dtype: object
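The operator evaluation above can be pictured as a scan over the index dictionary. The sketch below is a plain-Python stand-in operating directly on a dictionary; the function and the OPERATORS table are assumptions for illustration, and kartothek's actual implementation may differ.

```python
import operator

# Hypothetical operator table for illustration only.
OPERATORS = {
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}


def eval_operator(index_dct, op, value):
    """Return the set of partitions holding at least one value matching `op value`."""
    compare = OPERATORS[op]
    partitions = set()
    for observed, labels in index_dct.items():
        if compare(observed, value):
            partitions.update(labels)
    return partitions


index_dct = {1: ["table/12345"], 2: ["table/12345", "table/6789"]}
result = eval_operator(index_dct, ">=", 2)
print(sorted(result))
```

Only the partitions in the returned set need to be read to answer the predicate; every other partition can be pruned.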

Partition Indices

The first index type kartothek offers is a partition index. The partition index is created by partitioning a dataset in a hive-like partition scheme.

In [8]: import string
   ...: 
   ...: import pandas as pd
   ...: 
   ...: df = pd.DataFrame(
   ...:     {
   ...:         "PartField": ["A"] * 5 + ["B"] * 5,
   ...:         "IndexedField": list(range(5)) + list(range(3, 8)),
   ...:         "Payload": [string.ascii_letters[i] for i in range(10)],
   ...:     }
   ...: )
   ...: 

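In [9]: from kartothek.api.dataset import store_dataframes_as_dataset
   ...: 
   ...: # `store` is assumed to be an already configured key-value store.
   ...: dm = store_dataframes_as_dataset(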
   ...:     store=store,
   ...:     dataset_uuid="indexing_docs",
   ...:     dfs=[df],
   ...:     partition_on=["PartField"],
   ...:     secondary_indices=["IndexedField"],
   ...: ).load_all_indices(store)
   ...: 

In [10]: sorted(store.keys())
Out[10]: 
['indexing_docs.by-dataset-metadata.json',
 'indexing_docs/indices/IndexedField/2021-07-05T11%3A19%3A35.718684.by-dataset-index.parquet',
 'indexing_docs/table/PartField=A/a6e379ee26624a5cb002f137226b7ed2.parquet',
 'indexing_docs/table/PartField=B/a6e379ee26624a5cb002f137226b7ed2.parquet',
 'indexing_docs/table/_common_metadata']

In [11]: part_index = dm.indices["PartField"]

In [12]: part_index
Out[12]: PartitionIndex(column=PartField, dtype=string, creation_time=2021-07-05 11:19:35.721156, index_dct=['A', 'B'], _index_dct_available=True)

This kind of index is also called a primary index. It guarantees that a given file contains only one unique value of the given field, which can also be observed when investigating the flat structure of the index.

In [13]: part_index.as_flat_series()
Out[13]: 
PartField
A    PartField=A/a6e379ee26624a5cb002f137226b7ed2
B    PartField=B/a6e379ee26624a5cb002f137226b7ed2
Name: partition, dtype: object

This property makes this kind of index very powerful if used correctly since it prunes the partitions exactly to the user query and enables exact removal of data when mutating datasets (see Mutating Datasets).

For data with high cardinality, this kind of index is not well suited since it would result in a highly fragmented dataset with too many files that are too small.
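Because each value is encoded in the storage key, a partition index can in principle be recovered from the keys alone. The following sketch parses hive-like `column=value` path segments from the keys listed earlier; the helper name is hypothetical, and details such as URL-decoding of values are deliberately left out.

```python
from collections import defaultdict


def partition_index_from_keys(keys, column):
    """Derive a value -> partition-label mapping from hive-like storage keys.

    Hypothetical helper for illustration; it only inspects path segments of
    the form `column=value`.
    """
    index = defaultdict(list)
    prefix = column + "="
    for key in keys:
        segments = key.split("/")
        for position, segment in enumerate(segments):
            if segment.startswith(prefix):
                value = segment[len(prefix):]
                # Label the partition by the path from the partition segment
                # onward, without the file extension.
                label = "/".join(segments[position:]).rsplit(".", 1)[0]
                index[value].append(label)
    return dict(index)


keys = [
    "indexing_docs/table/PartField=A/a6e379ee26624a5cb002f137226b7ed2.parquet",
    "indexing_docs/table/PartField=B/a6e379ee26624a5cb002f137226b7ed2.parquet",
    "indexing_docs/table/_common_metadata",
]
part_index = partition_index_from_keys(keys, "PartField")
print(part_index["A"])
```

Keys without a matching `PartField=` segment, such as the common metadata file, are simply ignored.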

Secondary indices

Secondary indices are the most powerful type of index: they allow us to reference files without having to encode any values in the keys. They can be created by supplying the secondary_indices keyword argument, as shown above. The user interaction works similarly to the

Persistence

A secondary index is persisted as a Parquet file with the following (Parquet) schema:

  • The field name corresponds to the name of the column in the persisted DataFrame.

  • The partition is a list of partition identifiers, as used in the keys of the partitions map and the data filename.

Note: the partition identifier is used instead of the data filename, as a single partition can span multiple files containing different column sets using the same row selection.
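As a rough illustration of this layout, the sketch below turns an in-memory index dictionary into two parallel columns, one holding the indexed values and one holding the list of partition identifiers per value. The function name is hypothetical, the `uuid` labels are placeholders, and writing the columns to Parquet is left out.

```python
def index_to_columns(column, index_dct):
    """Lay out an index as two parallel columns: values and partition lists.

    Hypothetical helper; it mirrors the described schema but does not write
    a Parquet file.
    """
    values = sorted(index_dct)
    return {
        column: values,
        "partition": [sorted(index_dct[value]) for value in values],
    }


# Placeholder partition identifiers, not real file names.
index_dct = {
    3: ["PartField=A/uuid", "PartField=B/uuid"],
    0: ["PartField=A/uuid"],
    7: ["PartField=B/uuid"],
}
columns = index_to_columns("IndexedField", index_dct)
for value, partitions in zip(columns["IndexedField"], columns["partition"]):
    print(value, partitions)
```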

Typing

Every index has a well-defined Arrow data type, which is usually inferred automatically and ensured to be consistent with the overall dataset schema.

In [14]: part_index.dtype
Out[14]: DataType(string)

Supported data types for indices include:

  • bool

  • (u)int{8,16,32,64}

  • float{32,64}

  • str

  • bytes

  • pd.Timestamp (with and without timezones)

  • datetime.date

Important

Nullable fields are not properly supported, and the behaviour differs slightly depending on the API used.

In particular, the plain dataset API will usually drop NaN and null values silently, while the Cube API will raise an exception.
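The difference between the two behaviours can be pictured with a small sketch. The function and its `on_null` parameter are hypothetical and only mimic the behaviour described above; they are not part of kartothek's API.

```python
import math


def is_null(value):
    """Treat None and float NaN as null."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def index_values(values, on_null="drop"):
    """Collect the distinct non-null values to index.

    on_null="drop" skips nulls silently; on_null="raise" rejects them.
    (Hypothetical parameter for illustration.)
    """
    result = set()
    for value in values:
        if is_null(value):
            if on_null == "raise":
                raise ValueError("null values cannot be indexed")
            continue
        result.add(value)
    return sorted(result)


print(index_values([1, None, 2, float("nan")]))
```

With silent dropping, rows holding null values never appear in the index, so a query that relies on the index cannot find them.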

See also