kartothek.io.eager_cube module

Eager IO aka “everything is done locally and immediately”.

kartothek.io.eager_cube.append_to_cube(data: Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame], List[Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame]]]], cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, metadata: Optional[Dict[str, Dict[str, Any]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → Dict[str, kartothek.core.dataset.DatasetMetadata]

Append data to existing cube.

For details on data and metadata, see build_cube().

Important

Physical partitions must be updated as a whole. If only single rows within a physical partition are updated, the old data is treated as “removed”.

Hint

To have better control over the overwrite “mask” (i.e. which partitions are overwritten), you should use remove_partitions() beforehand.

Parameters
  • data – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.

  • cube – Cube specification.

  • store – Store to which the data should be written.

  • metadata – Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of metadata keys is not possible.

  • df_serializer – Optional DataFrame-to-Parquet serializer.

Returns

datasets – DatasetMetadata for every dataset written.

Return type

Dict[str, kartothek.core.dataset.DatasetMetadata]
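
A minimal sketch of a call, with illustrative names only (an in-memory simplekv.memory.DictStore, a cube my_cube with dimension column x and partition column p); the appended rows form a new physical partition p=2, so no existing partition is implicitly removed:

    import pandas as pd
    from simplekv.memory import DictStore

    from kartothek.core.cube.cube import Cube
    from kartothek.io.eager_cube import append_to_cube, build_cube

    store = DictStore()  # in-memory store, purely for illustration
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")

    # build the seed dataset first
    build_cube(
        data=pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v": [42, 45, 20, 10]}),
        cube=cube,
        store=store,
    )

    # append rows for a new partition p=2 to the seed dataset
    datasets = append_to_cube(
        data=pd.DataFrame({"x": [4, 5], "p": [2, 2], "v": [7, 8]}),
        cube=cube,
        store=store,
    )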

kartothek.io.eager_cube.build_cube(data: Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame], List[Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame]]]], cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, metadata: Optional[Dict[str, Dict[str, Any]]] = None, overwrite: bool = False, partition_on: Optional[Dict[str, Iterable[str]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → Dict[str, kartothek.core.dataset.DatasetMetadata]

Store the given DataFrames as a Ktk_cube cube.

data can be formatted in multiple ways:

  • single DataFrame:

    pd.DataFrame({
        'x': [0, 1, 2, 3],
        'p': [0, 0, 1, 1],
        'v': [42, 45, 20, 10],
    })
    

    In that case, the seed dataset will be written.

  • dictionary of DataFrames:

    {
        'seed': pd.DataFrame({
            'x': [0, 1, 2, 3],
            'p': [0, 0, 1, 1],
            'v1': [42, 45, 20, 10],
        }),
        'enrich': pd.DataFrame({
            'x': [0, 1, 2, 3],
            'p': [0, 0, 1, 1],
            'v2': [False, False, True, False],
        }),
    }
    

    In that case, multiple datasets can be written at the same time. Note that the seed dataset MUST be included.

  • list of anything above:

    [
        # seed data only
        pd.DataFrame({
            'x': [0, 1, 2, 3],
            'p': [0, 0, 1, 1],
            'v1': [42, 45, 20, 10],
        }),
        # seed data only, explicit way
        {
            'seed': pd.DataFrame({
                'x': [4, 5, 6, 7],
                'p': [0, 0, 1, 1],
                'v1': [12, 32, 22, 9],
            }),
        },
        # multiple datasets
        {
            'seed': pd.DataFrame({
                'x': [8, 9, 10, 11],
                'p': [0, 0, 1, 1],
                'v1': [9, 2, 4, 11],
            }),
            'enrich': pd.DataFrame({
                'x': [8, 9, 10, 11],
                'p': [0, 0, 1, 1],
                'v2': [True, True, False, False],
            }),
        },
        # non-seed data only
        {
            'enrich': pd.DataFrame({
                'x': [1, 2, 3, 4],
                'p': [0, 0, 1, 1],
                'v2': [False, True, False, False],
            }),
        },
    ]
    

    In that case, multiple datasets may be written. Note that at least a single list element must contain seed data.

Extra metadata may be preserved with every dataset, e.g.:

{
    'seed': {
        'source': 'db',
        'host': 'db1.cluster20.company.net',
        'last_event': '230c6edb-b69a-4d30-b56d-28f5dfe20948',
    },
    'enrich': {
        'source': 'python',
        'commit_hash': '8b5d717518439921e6d17c7495956bdad687bc54',
    },
}

Note that the given metadata must be JSON-serializable.

If the cube already exists, the overwrite flag must be given. In that case, all datasets that are part of the existing cube must be overwritten. Partial overwrites are not allowed.

Parameters
  • data – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.

  • cube – Cube specification.

  • store – Store to which the data should be written.

  • metadata – Metadata for every dataset.

  • overwrite – Whether existing datasets should be overwritten.

  • partition_on – Optional partition-on attributes for datasets (dictionary mapping Dataset ID -> columns).

  • df_serializer – Optional DataFrame-to-Parquet serializer.

Returns

datasets – DatasetMetadata for every dataset written.

Return type

Dict[str, kartothek.core.dataset.DatasetMetadata]
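
A minimal sketch of building a cube with a seed and an enrich dataset plus per-dataset metadata; the store, cube name and column names are illustrative only:

    import pandas as pd
    from simplekv.memory import DictStore

    from kartothek.core.cube.cube import Cube
    from kartothek.io.eager_cube import build_cube

    store = DictStore()  # in-memory store, purely for illustration
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")

    datasets = build_cube(
        data={
            "seed": pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v1": [42, 45, 20, 10]}),
            "enrich": pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v2": [False, False, True, False]}),
        },
        cube=cube,
        store=store,
        metadata={"seed": {"source": "db"}, "enrich": {"source": "python"}},
    )
    # datasets maps Ktk_cube dataset IDs ("seed", "enrich") to their DatasetMetadata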

kartothek.io.eager_cube.cleanup_cube(cube, store)

Remove unused keys from cube datasets.

Important

All untracked keys which start with the cube’s uuid_prefix followed by the KTK_CUBE_UUID_SEPERATOR (e.g. my_cube_uuid++seed…) will be deleted by this routine. These keys may be leftovers from past overwrites or index updates.

Parameters
  • cube – Cube specification.

  • store – KV store.
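
A minimal sketch, with an illustrative in-memory store and cube; cleanup_cube() only touches untracked keys under the cube's uuid_prefix:

    import pandas as pd
    from simplekv.memory import DictStore

    from kartothek.core.cube.cube import Cube
    from kartothek.io.eager_cube import build_cube, cleanup_cube

    store = DictStore()  # in-memory store, purely for illustration
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    build_cube(
        data=pd.DataFrame({"x": [0, 1], "p": [0, 1], "v": [10, 20]}),
        cube=cube,
        store=store,
    )

    # remove untracked keys (e.g. leftovers from earlier overwrites or index updates)
    cleanup_cube(cube=cube, store=store)
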
kartothek.io.eager_cube.collect_stats(cube, store, datasets=None)

Collect statistics for given cube.

Parameters
  • cube – Cube specification.

  • store – KV store that preserves the cube.

  • datasets – Datasets to collect statistics for, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs or None (in which case auto-discovery will be used).

Returns

stats – Statistics per ktk_cube dataset ID.

Return type

Dict[str, Dict[str, int]]
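
A minimal sketch, with an illustrative in-memory store and cube; the exact counter names inside the returned dictionary are not spelled out here:

    import pandas as pd
    from simplekv.memory import DictStore

    from kartothek.core.cube.cube import Cube
    from kartothek.io.eager_cube import build_cube, collect_stats

    store = DictStore()  # in-memory store, purely for illustration
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    build_cube(
        data=pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v": [42, 45, 20, 10]}),
        cube=cube,
        store=store,
    )

    stats = collect_stats(cube=cube, store=store)
    # stats["seed"] is a dict of integer counters for the seed dataset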

kartothek.io.eager_cube.copy_cube(cube: kartothek.core.cube.cube.Cube, src_store: Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], tgt_store: Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], overwrite: bool = False, datasets: Optional[Union[Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]] = None, renamed_cube_prefix: Optional[str] = None, renamed_datasets: Optional[Dict[str, str]] = None)

Copy cube from one store to another.

Warning

A failing copy operation can not be rolled back if the overwrite flag is enabled and might leave the overwritten dataset in an inconsistent state.

Parameters
  • cube (Cube) – Cube specification.

  • src_store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – Source KV store.

  • tgt_store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – Target KV store.

  • overwrite (bool) – Whether existing datasets in the target store should be overwritten.

  • datasets (Union[None, Iterable[str], Dict[str, DatasetMetadata]]) – Datasets to copy, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs or None (in which case the entire cube will be copied).

  • renamed_cube_prefix (Optional[str]) – Optional new cube prefix. If specified, the cube will be renamed while copying.

  • renamed_datasets (Optional[Dict[str, str]]) – Optional dict with {old dataset name: new dataset name} entries. If provided, the datasets will be renamed accordingly during copying. When the parameter datasets is specified, the datasets to rename must be a subset of the datasets to copy.
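
A minimal sketch, using two illustrative in-memory stores and a hypothetical cube my_cube:

    import pandas as pd
    from simplekv.memory import DictStore

    from kartothek.core.cube.cube import Cube
    from kartothek.io.eager_cube import build_cube, copy_cube

    src_store = DictStore()  # in-memory stores, purely for illustration
    tgt_store = DictStore()
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    build_cube(
        data=pd.DataFrame({"x": [0, 1], "p": [0, 1], "v": [10, 20]}),
        cube=cube,
        store=src_store,
    )

    # copy the entire cube; restrict via datasets=[...] or rename via renamed_cube_prefix
    copy_cube(cube=cube, src_store=src_store, tgt_store=tgt_store)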

kartothek.io.eager_cube.delete_cube(cube, store, datasets=None)

Delete cube from store.

Important

This routine only deletes tracked files. Garbage and leftovers from old cubes and failed operations are NOT removed.

Parameters
  • cube – Cube specification.

  • store – KV store.

  • datasets – Datasets to delete, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs or None (in which case the entire cube will be deleted).
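
A minimal sketch, with an illustrative in-memory store and cube:

    import pandas as pd
    from simplekv.memory import DictStore

    from kartothek.core.cube.cube import Cube
    from kartothek.io.eager_cube import build_cube, delete_cube

    store = DictStore()  # in-memory store, purely for illustration
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    build_cube(
        data=pd.DataFrame({"x": [0, 1], "p": [0, 1], "v": [10, 20]}),
        cube=cube,
        store=store,
    )

    # delete all tracked files of the cube; pass datasets=[...] to delete only some datasets
    delete_cube(cube=cube, store=store)
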
kartothek.io.eager_cube.extend_cube(data: Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame], List[Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame]]]], cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, metadata: Optional[Dict[str, Dict[str, Any]]] = None, overwrite: bool = False, partition_on: Optional[Dict[str, Iterable[str]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → Dict[str, kartothek.core.dataset.DatasetMetadata]

Store the given DataFrames into an existing Kartothek cube.

For details on data and metadata, see build_cube().

Parameters
  • data – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.

  • cube – Cube specification.

  • store – Store to which the data should be written.

  • metadata – Metadata for every dataset.

  • overwrite – Whether existing datasets should be overwritten.

  • partition_on – Optional partition-on attributes for datasets (dictionary mapping Dataset ID -> columns).

  • df_serializer – Optional DataFrame-to-Parquet serializer.

Returns

datasets – DatasetMetadata for every dataset written.

Return type

Dict[str, kartothek.core.dataset.DatasetMetadata]
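
A minimal sketch of adding a new non-seed dataset to an existing cube; store, cube and column names are illustrative:

    import pandas as pd
    from simplekv.memory import DictStore

    from kartothek.core.cube.cube import Cube
    from kartothek.io.eager_cube import build_cube, extend_cube

    store = DictStore()  # in-memory store, purely for illustration
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    build_cube(
        data=pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v1": [42, 45, 20, 10]}),
        cube=cube,
        store=store,
    )

    # add an "enrich" dataset that shares the cube's dimension and partition columns
    datasets = extend_cube(
        data={"enrich": pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v2": [False, True, False, False]})},
        cube=cube,
        store=store,
    )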

kartothek.io.eager_cube.query_cube(cube, store, conditions=None, datasets=None, dimension_columns=None, partition_by=None, payload_columns=None)

Query cube.

Note

In case of partition_by=None (the default), only a single logical partition is generated. If that partition is empty (e.g. due to the provided conditions), an empty list is returned; otherwise a single-element list is returned.

Parameters
  • cube (Cube) – Cube specification.

  • store (simplekv.KeyValueStore) – KV store that preserves the cube.

  • conditions (Union[None, Condition, Iterable[Condition], Conjunction]) – Conditions that should be applied, optional.

  • datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to query, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs or None (in which case auto-discovery will be used).

  • dimension_columns (Union[None, str, Iterable[str]]) – Dimension columns of the query, may result in projection. If not provided, dimension columns from cube specification will be used.

  • partition_by (Union[None, str, Iterable[str]]) – By which column logical partitions should be formed. If not provided, a single partition will be generated.

  • payload_columns (Union[None, str, Iterable[str]]) – Which columns apart from dimension_columns and partition_by should be returned.

Returns

dfs – List of non-empty DataFrames, ordered by partition_by. The columns of each DataFrame are ordered alphabetically. Data types are provided on a best-effort basis (they are restored based on the preserved data, but may differ due to Pandas NULL-handling, e.g. integer columns may become floats).

Return type

List[pandas.DataFrame]
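
A minimal sketch, assuming the condition helper C from kartothek.core.cube.conditions and an illustrative in-memory store and cube:

    import pandas as pd
    from simplekv.memory import DictStore

    from kartothek.core.cube.conditions import C
    from kartothek.core.cube.cube import Cube
    from kartothek.io.eager_cube import build_cube, query_cube

    store = DictStore()  # in-memory store, purely for illustration
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    build_cube(
        data=pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v": [42, 45, 20, 10]}),
        cube=cube,
        store=store,
    )

    # default partition_by=None: an empty list or a single-element list is returned
    dfs = query_cube(cube=cube, store=store, conditions=(C("p") == 0))

    # one logical partition per value of "p"
    dfs_by_p = query_cube(cube=cube, store=store, partition_by="p")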

kartothek.io.eager_cube.remove_partitions(cube: kartothek.core.cube.cube.Cube, store: Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], conditions: Union[None, kartothek.core.cube.conditions.Condition, Sequence[kartothek.core.cube.conditions.Condition], kartothek.core.cube.conditions.Conjunction] = None, ktk_cube_dataset_ids: Optional[Union[Sequence[str], str]] = None, metadata: Optional[Dict[str, Dict[str, Any]]] = None)

Remove given partition range from cube using a transaction.

Remove the partitions selected by conditions. If no conditions are given, remove all partitions. For each considered dataset, only the subset of conditions that refers to the partition columns of the respective dataset is used. In particular, a dataset that is not partitioned at all is always considered selected by conditions.

Parameters
  • cube – Cube spec.

  • store – Store.

  • conditions – Select the partitions to be removed. Must be a condition only on partition columns.

  • ktk_cube_dataset_ids – Ktk_cube dataset IDs to apply the remove action to, optional. Defaults to all datasets.

  • metadata – Metadata for the datasets, optional. Only given keys are updated/replaced. Deletion of metadata keys is not possible.

Returns

datasets – Datasets, updated.

Return type

Dict[str, kartothek.core.dataset.DatasetMetadata]
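
A minimal sketch, assuming the condition helper C from kartothek.core.cube.conditions and an illustrative in-memory store and cube; the condition refers only to the partition column p:

    import pandas as pd
    from simplekv.memory import DictStore

    from kartothek.core.cube.conditions import C
    from kartothek.core.cube.cube import Cube
    from kartothek.io.eager_cube import build_cube, remove_partitions

    store = DictStore()  # in-memory store, purely for illustration
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    build_cube(
        data=pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v": [42, 45, 20, 10]}),
        cube=cube,
        store=store,
    )

    # remove all physical partitions with p == 1 in a single transaction
    datasets = remove_partitions(cube=cube, store=store, conditions=(C("p") == 1))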