kartothek.io.eager_cube module¶
Eager IO aka “everything is done locally and immediately”.
kartothek.io.eager_cube.append_to_cube(data, cube, store, metadata=None)[source]¶
Append data to existing cube.
For details on data and metadata, see build_cube().

Important
Physical partitions must be updated as a whole. If only single rows within a physical partition are updated, the old data is treated as “removed”.
Hint
To have better control over the overwrite “mask” (i.e. which partitions are overwritten), you should use remove_partitions() beforehand.

Parameters: - data (Union[pd.DataFrame, Dict[str, pd.DataFrame], List[Union[pd.DataFrame, Dict[str, pd.DataFrame]]]]) – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.
- cube (kartothek.core.cube.cube.Cube) – Cube specification.
- store (simplekv.KeyValueStore) – Store to which the data should be written.
- metadata (Optional[Dict[str, Dict[str, Any]]]) – Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of metadata keys is not possible.
Returns: datasets – DatasetMetadata for every dataset written.
Return type: Dict[str, kartothek.core.dataset.DatasetMetadata]
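A minimal usage sketch (not part of the upstream reference; the cube spec, the in-memory DictStore, and the column names are illustrative assumptions):

import pandas as pd
from simplekv.memory import DictStore
from kartothek.core.cube.cube import Cube
from kartothek.io.eager_cube import append_to_cube, build_cube

# Illustrative cube spec; any simplekv.KeyValueStore works as the store.
cube = Cube(
    dimension_columns=['x'],
    partition_columns=['p'],
    uuid_prefix='my_cube',
)
store = DictStore()

# Build an initial cube (seed dataset only) ...
build_cube(
    data=pd.DataFrame({'x': [0, 1], 'p': [0, 0], 'v': [42, 45]}),
    cube=cube,
    store=store,
)

# ... then append a new physical partition (p=1) to the seed dataset.
datasets = append_to_cube(
    data=pd.DataFrame({'x': [2, 3], 'p': [1, 1], 'v': [20, 10]}),
    cube=cube,
    store=store,
)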
kartothek.io.eager_cube.build_cube(data, cube, store, metadata=None, overwrite=False, partition_on=None)[source]¶
Store given dataframes as Ktk_cube cube.
data can be formatted in multiple ways:

single DataFrame:
pd.DataFrame({
    'x': [0, 1, 2, 3],
    'p': [0, 0, 1, 1],
    'v': [42, 45, 20, 10],
})
In that case, the seed dataset will be written.
dictionary of DataFrames:
{
    'seed': pd.DataFrame({
        'x': [0, 1, 2, 3],
        'p': [0, 0, 1, 1],
        'v1': [42, 45, 20, 10],
    }),
    'enrich': pd.DataFrame({
        'x': [0, 1, 2, 3],
        'p': [0, 0, 1, 1],
        'v2': [False, False, True, False],
    }),
}
In that case, multiple datasets can be written at the same time. Note that the seed dataset MUST be included.
list of anything above:
[
    # seed data only
    pd.DataFrame({
        'x': [0, 1, 2, 3],
        'p': [0, 0, 1, 1],
        'v1': [42, 45, 20, 10],
    }),
    # seed data only, explicit way
    {
        'seed': pd.DataFrame({
            'x': [4, 5, 6, 7],
            'p': [0, 0, 1, 1],
            'v1': [12, 32, 22, 9],
        }),
    },
    # multiple datasets
    {
        'seed': pd.DataFrame({
            'x': [8, 9, 10, 11],
            'p': [0, 0, 1, 1],
            'v1': [9, 2, 4, 11],
        }),
        'enrich': pd.DataFrame({
            'x': [8, 9, 10, 11],
            'p': [0, 0, 1, 1],
            'v2': [True, True, False, False],
        }),
    },
    # non-seed data only
    {
        'enrich': pd.DataFrame({
            'x': [1, 2, 3, 4],
            'p': [0, 0, 1, 1],
            'v2': [False, True, False, False],
        }),
    },
]
In that case, multiple datasets may be written. Note that at least one list element must contain seed data.
Extra metadata may be preserved with every dataset, e.g.:
{
    'seed': {
        'source': 'db',
        'host': 'db1.cluster20.company.net',
        'last_event': '230c6edb-b69a-4d30-b56d-28f5dfe20948',
    },
    'enrich': {
        'source': 'python',
        'commit_hash': '8b5d717518439921e6d17c7495956bdad687bc54',
    },
}
Note that the given metadata must be JSON-serializable.
If the cube already exists, the overwrite flag must be given. In that case, all datasets that are part of the existing cube must be overwritten. Partial overwrites are not allowed.

Parameters: - data (Union[pd.DataFrame, Dict[str, pd.DataFrame], List[Union[pd.DataFrame, Dict[str, pd.DataFrame]]]]) – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.
- cube (kartothek.core.cube.cube.Cube) – Cube specification.
- store (simplekv.KeyValueStore) – Store to which the data should be written.
- metadata (Optional[Dict[str, Dict[str, Any]]]) – Metadata for every dataset.
- overwrite (bool) – Whether existing datasets should be overwritten.
- partition_on (Optional[Dict[str, Iterable[str]]]) – Optional partition-on attributes for datasets (dictionary mapping Dataset ID -> columns). See Dimensionality and Partitioning Details for details.
Returns: datasets – DatasetMetadata for every dataset written.
Return type: Dict[str, kartothek.core.dataset.DatasetMetadata]
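A minimal usage sketch for the dictionary form (not part of the upstream reference; the cube spec, in-memory store, and metadata values are illustrative assumptions):

import pandas as pd
from simplekv.memory import DictStore
from kartothek.core.cube.cube import Cube
from kartothek.io.eager_cube import build_cube

cube = Cube(
    dimension_columns=['x'],
    partition_columns=['p'],
    uuid_prefix='my_cube',
)
store = DictStore()

# Write the seed and enrich datasets in one go; the seed dataset is mandatory.
datasets = build_cube(
    data={
        'seed': pd.DataFrame({'x': [0, 1, 2, 3], 'p': [0, 0, 1, 1], 'v1': [42, 45, 20, 10]}),
        'enrich': pd.DataFrame({'x': [0, 1, 2, 3], 'p': [0, 0, 1, 1], 'v2': [False, False, True, False]}),
    },
    cube=cube,
    store=store,
    metadata={'seed': {'source': 'db'}, 'enrich': {'source': 'python'}},
)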
kartothek.io.eager_cube.cleanup_cube(cube, store)[source]¶
Remove unused keys from cube datasets.
Important
All untracked keys which start with the cube’s uuid_prefix followed by the KTK_CUBE_UUID_SEPERATOR (e.g. my_cube_uuid++seed…) will be deleted by this routine. These keys may be leftovers from past overwrites or index updates.
Parameters: - cube (Cube) – Cube specification.
- store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – KV store.
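A minimal sketch, assuming cube and store from the build_cube() example above:

from kartothek.io.eager_cube import cleanup_cube

# Delete untracked keys (leftovers from past overwrites or index updates).
cleanup_cube(cube=cube, store=store)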
kartothek.io.eager_cube.collect_stats(cube, store, datasets=None)[source]¶
Collect statistics for given cube.
Parameters: - cube (Cube) – Cube specification.
- store (simplekv.KeyValueStore) – KV store that preserves the cube.
- datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to query, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs, or None (in which case auto-discovery will be used).
Returns: stats – Statistics per ktk_cube dataset ID.
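A minimal sketch, assuming cube and store from the build_cube() example above:

from kartothek.io.eager_cube import collect_stats

# Auto-discover all cube datasets and collect their statistics.
stats = collect_stats(cube=cube, store=store)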
kartothek.io.eager_cube.copy_cube(cube, src_store, tgt_store, overwrite=False, datasets=None)[source]¶
Copy cube from one store to another.
Parameters: - cube (Cube) – Cube specification.
- src_store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – Source KV store.
- tgt_store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – Target KV store.
- overwrite (bool) – Whether existing datasets in the target store should be overwritten.
- datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to copy, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs, or None (in which case the entire cube will be copied).
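A minimal sketch, assuming cube and store from the build_cube() example above; the in-memory target store is an illustrative assumption:

from simplekv.memory import DictStore
from kartothek.io.eager_cube import copy_cube

tgt_store = DictStore()

# Copy the entire cube; pass e.g. datasets=['seed'] to copy a subset instead.
copy_cube(cube=cube, src_store=store, tgt_store=tgt_store)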
kartothek.io.eager_cube.delete_cube(cube, store, datasets=None)[source]¶
Delete cube from store.
Important
This routine only deletes tracked files. Garbage and leftovers from old cubes and failed operations are NOT removed.
Parameters: - cube (Cube) – Cube specification.
- store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – KV store.
- datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to delete, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs, or None (in which case the entire cube will be deleted).
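A minimal sketch, assuming cube and store from the build_cube() example above:

from kartothek.io.eager_cube import delete_cube

# Delete all tracked files of the cube; untracked garbage stays behind
# (use cleanup_cube() for that).
delete_cube(cube=cube, store=store)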
kartothek.io.eager_cube.extend_cube(data, cube, store, metadata=None, overwrite=False, partition_on=None)[source]¶
Store given dataframes into an existing Kartothek cube.
For details on data and metadata, see build_cube().

Parameters: - data (Union[pd.DataFrame, Dict[str, pd.DataFrame], List[Union[pd.DataFrame, Dict[str, pd.DataFrame]]]]) – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.
- cube (kartothek.core.cube.cube.Cube) – Cube specification.
- store (simplekv.KeyValueStore) – Store to which the data should be written.
- metadata (Optional[Dict[str, Dict[str, Any]]]) – Metadata for every dataset.
- overwrite (bool) – Whether existing datasets should be overwritten.
- partition_on (Optional[Dict[str, Iterable[str]]]) – Optional partition-on attributes for datasets (dictionary mapping Dataset ID -> columns). See Dimensionality and Partitioning Details for details.
Returns: datasets – DatasetMetadata for every dataset written.
Return type: Dict[str, kartothek.core.dataset.DatasetMetadata]
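A minimal sketch, assuming cube and store from the build_cube() example above; the 'weather' dataset name and its columns are illustrative assumptions:

import pandas as pd
from kartothek.io.eager_cube import extend_cube

# Add a new non-seed dataset to the existing cube.
datasets = extend_cube(
    data={
        'weather': pd.DataFrame({
            'x': [0, 1, 2, 3],
            'p': [0, 0, 1, 1],
            'temp': [10.2, 11.1, 9.8, 12.0],
        }),
    },
    cube=cube,
    store=store,
)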
kartothek.io.eager_cube.query_cube(cube, store, conditions=None, datasets=None, dimension_columns=None, partition_by=None, payload_columns=None)[source]¶
Query cube.
Note
In case of partition_by=None (the default), only a single partition is generated. If that partition is empty (e.g. due to the provided conditions), an empty list is returned; otherwise a single-element list is returned.

Parameters: - cube (Cube) – Cube specification.
- store (simplekv.KeyValueStore) – KV store that preserves the cube.
- conditions (Union[None, Condition, Iterable[Condition], Conjunction]) – Conditions that should be applied, optional.
- datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to query, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs, or None (in which case auto-discovery will be used).
- dimension_columns (Union[None, str, Iterable[str]]) – Dimension columns of the query, may result in projection. If not provided, dimension columns from the cube specification will be used.
- partition_by (Union[None, str, Iterable[str]]) – By which column logical partitions should be formed. If not provided, a single partition will be generated.
- payload_columns (Union[None, str, Iterable[str]]) – Which columns apart from dimension_columns and partition_by should be returned.
Returns: dfs – List of non-empty DataFrames, ordered by partition_by. Columns of the DataFrames are alphabetically ordered. Data types are provided on a best-effort basis (they are restored based on the preserved data, but may differ due to Pandas NULL-handling, e.g. integer columns may be floats).
Return type: List[pandas.DataFrame]
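A minimal sketch, assuming cube and store from the build_cube() example above and the condition shortcut C from kartothek.core.cube.conditions:

from kartothek.core.cube.conditions import C
from kartothek.io.eager_cube import query_cube

# With partition_by=None, the result is an empty or single-element list.
dfs = query_cube(
    cube=cube,
    store=store,
    conditions=C('p') == 0,
    payload_columns=['v1'],
)
if dfs:
    df = dfs[0]  # the single logical partition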
kartothek.io.eager_cube.remove_partitions(cube, store, conditions=None, ktk_cube_dataset_ids=None, metadata=None)[source]¶
Remove given partition range from cube using a transaction.
Remove the partitions selected by conditions. If no conditions are given, remove all partitions. For each considered dataset, only the subset of conditions that refers to the partition columns of the respective dataset is used. In particular, a dataset that is not partitioned at all is always considered selected by conditions.

Parameters: - cube (kartothek.core.cube.cube.Cube) – Cube specification.
- store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – Store.
- conditions (Union[None, Condition, Iterable[Condition], Conjunction]) – Select the partitions to be removed. Must be a condition only on partition columns.
- ktk_cube_dataset_ids (Optional[Union[Iterable[Union[Str, Bytes]], Union[Str, Bytes]]]) – Ktk_cube dataset IDs to apply the remove action to, optional. Defaults to “all”.
- metadata (Optional[Dict[str, Dict[str, Any]]]) – Metadata for every dataset, optional. Only given keys are updated/replaced. Deletion of metadata keys is not possible.
Returns: datasets – Datasets, updated.
Return type: Dict[str, kartothek.core.dataset.DatasetMetadata]
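A minimal sketch, assuming cube and store from the build_cube() example above; p is the partition column of that illustrative cube:

from kartothek.core.cube.conditions import C
from kartothek.io.eager_cube import remove_partitions

# Remove all physical partitions with p == 1 from every cube dataset.
datasets = remove_partitions(
    cube=cube,
    store=store,
    conditions=C('p') == 1,
)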