kartothek.io.eager_cube module

Eager IO aka “everything is done locally and immediately”.

kartothek.io.eager_cube.append_to_cube(data, cube, store, metadata=None)[source]

Append data to an existing cube.

For details on data and metadata, see build_cube().

Important

Physical partitions must be updated as a whole. If only single rows within a physical partition are updated, the old data is treated as “removed”.

Hint

To have better control over the overwrite “mask” (i.e. which partitions are overwritten), you should use remove_partitions() beforehand.

Parameters:
Returns:

datasets – DatasetMetadata for every dataset written.

Return type:

Dict[str, kartothek.core.dataset.DatasetMetadata]
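
A minimal sketch of an append. The cube and store objects are assumed to be set up as in the build_cube() example below; the condition column and the appended values are illustrative:

import pandas as pd

from kartothek.core.cube.conditions import C
from kartothek.io.eager_cube import append_to_cube, remove_partitions

# Optionally control the overwrite "mask" first: remove the physical
# partitions that the new data is meant to replace (here: p == 1).
remove_partitions(cube=cube, store=store, conditions=C('p') == 1)

datasets = append_to_cube(
    data=pd.DataFrame({
        'x': [4, 5],
        'p': [1, 1],
        'v': [1, 2],
    }),
    cube=cube,
    store=store,
)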

kartothek.io.eager_cube.build_cube(data, cube, store, metadata=None, overwrite=False, partition_on=None)[source]

Store the given dataframes as a Ktk_cube cube.

data can be formatted in multiple ways:

  • single DataFrame:

    pd.DataFrame({
        'x': [0, 1, 2, 3],
        'p': [0, 0, 1, 1],
        'v': [42, 45, 20, 10],
    })
    

    In that case, the seed dataset will be written.

  • dictionary of DataFrames:

    {
        'seed': pd.DataFrame({
            'x': [0, 1, 2, 3],
            'p': [0, 0, 1, 1],
            'v1': [42, 45, 20, 10],
        }),
        'enrich': pd.DataFrame({
            'x': [0, 1, 2, 3],
            'p': [0, 0, 1, 1],
            'v2': [False, False, True, False],
        }),
    }
    

    In that case, multiple datasets can be written at the same time. Note that the seed dataset MUST be included.

  • list of anything above:

    [
        # seed data only
        pd.DataFrame({
            'x': [0, 1, 2, 3],
            'p': [0, 0, 1, 1],
            'v1': [42, 45, 20, 10],
        }),
        # seed data only, explicit way
        {
            'seed': pd.DataFrame({
                'x': [4, 5, 6, 7],
                'p': [0, 0, 1, 1],
                'v1': [12, 32, 22, 9],
            }),
        },
        # multiple datasets
        {
            'seed': pd.DataFrame({
                'x': [8, 9, 10, 11],
                'p': [0, 0, 1, 1],
                'v1': [9, 2, 4, 11],
            }),
            'enrich': pd.DataFrame({
                'x': [8, 9, 10, 11],
                'p': [0, 0, 1, 1],
                'v2': [True, True, False, False],
            }),
        },
        # non-seed data only
        {
            'enrich': pd.DataFrame({
                'x': [1, 2, 3, 4],
                'p': [0, 0, 1, 1],
                'v2': [False, True, False, False],
            }),
        },
    ]
    

    In that case, multiple datasets may be written. Note that at least one list element must contain seed data.

Extra metadata may be stored with every dataset, e.g.:

{
    'seed': {
        'source': 'db',
        'host': 'db1.cluster20.company.net',
        'last_event': '230c6edb-b69a-4d30-b56d-28f5dfe20948',
    },
    'enrich': {
        'source': 'python',
        'commit_hash': '8b5d717518439921e6d17c7495956bdad687bc54',
    },
}

Note that the given metadata must be JSON-serializable.

If the cube already exists, the overwrite flag must be given. In that case, all datasets that are part of the existing cube must be overwritten. Partial overwrites are not allowed.

Parameters:
Returns:

datasets – DatasetMetadata for every dataset written.

Return type:

Dict[str, kartothek.core.dataset.DatasetMetadata]
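
A minimal end-to-end sketch, assuming storefact’s in-memory store; the column names, uuid_prefix, and values are illustrative:

import pandas as pd
from storefact import get_store_from_url

from kartothek.core.cube.cube import Cube
from kartothek.io.eager_cube import build_cube

# In-memory store for illustration; any simplekv-style store works.
store = get_store_from_url('hmemory://')

# Cube specification: 'x' is the dimension column, 'p' the partition column.
cube = Cube(
    dimension_columns=['x'],
    partition_columns=['p'],
    uuid_prefix='my_cube',
)

# A single DataFrame, so it is written as the seed dataset.
datasets = build_cube(
    data=pd.DataFrame({
        'x': [0, 1, 2, 3],
        'p': [0, 0, 1, 1],
        'v': [42, 45, 20, 10],
    }),
    cube=cube,
    store=store,
)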

kartothek.io.eager_cube.cleanup_cube(cube, store)[source]

Remove unused keys from cube datasets.

Important

All untracked keys which start with the cube’s uuid_prefix followed by the KTK_CUBE_UUID_SEPERATOR (e.g. my_cube_uuid++seed…) will be deleted by this routine. These keys may be leftovers from past overwrites or index updates.

Parameters:
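
A minimal sketch, with cube and store set up as in the build_cube() example above:

from kartothek.io.eager_cube import cleanup_cube

# Delete untracked keys left over from earlier overwrites or failed writes.
cleanup_cube(cube=cube, store=store)
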
kartothek.io.eager_cube.collect_stats(cube, store, datasets=None)[source]

Collect statistics for given cube.

Parameters:
Returns:

stats – Statistics per ktk_cube dataset ID.

Return type:

Dict[str, Dict[str, int]]
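
A minimal sketch, with cube and store as above. The exact counter names in the inner dictionaries depend on the kartothek version:

from kartothek.io.eager_cube import collect_stats

stats = collect_stats(cube=cube, store=store)

# One dictionary of integer counters (e.g. file counts, sizes) per dataset ID.
for ktk_cube_dataset_id, counters in stats.items():
    print(ktk_cube_dataset_id, counters)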

kartothek.io.eager_cube.copy_cube(cube, src_store, tgt_store, overwrite=False, datasets=None)[source]

Copy cube from one store to another.

Parameters:
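
A minimal sketch, assuming two storefact in-memory stores; depending on the kartothek version, store factories may be expected instead of store instances:

from storefact import get_store_from_url

from kartothek.io.eager_cube import copy_cube

src_store = get_store_from_url('hmemory://')
tgt_store = get_store_from_url('hmemory://')

# Build the cube in src_store first (see build_cube() above), then:
copy_cube(cube=cube, src_store=src_store, tgt_store=tgt_store)
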
kartothek.io.eager_cube.delete_cube(cube, store, datasets=None)[source]

Delete cube from store.

Important

This routine only deletes tracked files. Garbage and leftovers from old cubes and failed operations are NOT removed.

Parameters:
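
A minimal sketch, with cube and store as above. The dataset ID 'enrich' is illustrative; omitting datasets deletes the entire cube:

from kartothek.io.eager_cube import delete_cube

# Delete only the 'enrich' dataset, keeping the rest of the cube.
delete_cube(cube=cube, store=store, datasets=['enrich'])
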
kartothek.io.eager_cube.extend_cube(data, cube, store, metadata=None, overwrite=False, partition_on=None)[source]

Store the given dataframes in an existing Kartothek cube.

For details on data and metadata, see build_cube().

Parameters:
Returns:

datasets – DatasetMetadata for every dataset written.

Return type:

Dict[str, kartothek.core.dataset.DatasetMetadata]
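
A minimal sketch that adds a new, non-seed dataset to an existing cube; the dataset name 'enrich2' and its columns are illustrative:

import pandas as pd

from kartothek.io.eager_cube import extend_cube

datasets = extend_cube(
    data={
        'enrich2': pd.DataFrame({
            'x': [0, 1, 2, 3],
            'p': [0, 0, 1, 1],
            'v3': [0.1, 0.2, 0.3, 0.4],
        }),
    },
    cube=cube,
    store=store,
)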

kartothek.io.eager_cube.query_cube(cube, store, conditions=None, datasets=None, dimension_columns=None, partition_by=None, payload_columns=None)[source]

Query cube.

Note

With partition_by=None (the default), only a single partition is generated. If that partition is empty (e.g. due to the provided conditions), an empty list is returned; otherwise, a single-element list is returned.

Parameters:
Returns:

dfs – List of non-empty DataFrames, ordered by partition_by. The columns of each DataFrame are ordered alphabetically. Data types are provided on a best-effort basis (they are restored from the preserved data, but may differ due to Pandas NULL-handling, e.g. integer columns may come back as floats).

Return type:

List[pandas.DataFrame]
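
A minimal query sketch using the condition builder from kartothek.core.cube.conditions, with cube and store as above; the column names are illustrative:

from kartothek.core.cube.conditions import C
from kartothek.io.eager_cube import query_cube

dfs = query_cube(
    cube=cube,
    store=store,
    conditions=C('p') == 1,
    payload_columns=['v'],
)

# With partition_by=None, dfs is either empty or a single-element list.
if dfs:
    df = dfs[0]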

kartothek.io.eager_cube.remove_partitions(cube, store, conditions=None, ktk_cube_dataset_ids=None, metadata=None)[source]

Remove the given partition range from the cube using a transaction.

Remove the partitions selected by conditions. If no conditions are given, remove all partitions. For each considered dataset, only the subset of conditions that refers to the partition columns of the respective dataset is used. In particular, a dataset that is not partitioned at all is always considered selected by conditions.

Parameters:
Returns:

datasets – The updated datasets.

Return type:

Dict[str, kartothek.core.dataset.DatasetMetadata]
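
A minimal sketch, with cube and store as above; the condition column is illustrative:

from kartothek.core.cube.conditions import C
from kartothek.io.eager_cube import remove_partitions

# Remove all physical partitions where the partition column 'p' equals 0.
datasets = remove_partitions(
    cube=cube,
    store=store,
    conditions=C('p') == 0,
)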