kartothek.io.eager_cube module
Eager IO aka “everything is done locally and immediately”.
-
kartothek.io.eager_cube.append_to_cube(data: Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame], List[Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame]]]], cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, metadata: Optional[Dict[str, Dict[str, Any]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → Dict[str, kartothek.core.dataset.DatasetMetadata]
Append data to existing cube.
For details on data and metadata, see build_cube().
Important
Physical partitions must be updated as a whole. If only single rows within a physical partition are updated, the old data is treated as “removed”.
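The rule above can be illustrated with a toy model that does not use kartothek at all. Physical partitions are modeled as a dictionary from partition key to a list of rows, and an append replaces every partition it touches. The helper `append_partitions` and the row layout are hypothetical and only mirror the documented behaviour.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, int]  # (x, v): a hypothetical row layout for illustration


def append_partitions(
    existing: Dict[int, List[Row]], new: Dict[int, List[Row]]
) -> Dict[int, List[Row]]:
    """Toy model: every partition present in `new` replaces the old one entirely."""
    result = {key: list(rows) for key, rows in existing.items()}
    for key, rows in new.items():
        # The whole physical partition is swapped; old rows are not merged in.
        result[key] = list(rows)
    return result


# Partition p=0 and partition p=1 hold two rows each.
cube = {0: [(0, 42), (1, 45)], 1: [(2, 20), (3, 10)]}

# Appending only the row x=0 for partition p=0 drops the old row x=1.
updated = append_partitions(cube, {0: [(0, 99)]})
print(updated)
```

To keep the row x=1 in this model, the appended partition would have to contain both rows.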
Hint
To have better control over the overwrite “mask” (i.e. which partitions are overwritten), you should use remove_partitions() beforehand.
- Parameters
data – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.
cube – Cube specification.
store – Store to which the data should be written.
metadata – Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of metadata keys is not possible.
df_serializer – Optional Dataframe to Parquet serializer
- Returns
datasets – DatasetMetadata for every dataset written.
- Return type
Dict[str, kartothek.core.dataset.DatasetMetadata]
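The metadata semantics described for append_to_cube (given keys are updated or replaced, nothing is deleted) behave like a per-dataset dictionary update. A minimal sketch, assuming metadata is a plain nested dictionary; `merge_metadata` is a hypothetical helper, not part of kartothek.

```python
from typing import Any, Dict

Metadata = Dict[str, Dict[str, Any]]


def merge_metadata(existing: Metadata, update: Metadata) -> Metadata:
    """Toy model: given keys are added or replaced, all other keys are kept."""
    merged = {dataset: dict(values) for dataset, values in existing.items()}
    for dataset, values in update.items():
        merged.setdefault(dataset, {}).update(values)
    return merged


old = {"seed": {"source": "db", "host": "db1"}}
new = merge_metadata(old, {"seed": {"host": "db2"}, "enrich": {"source": "python"}})
print(new)
```

In this model there is no way to express "remove the key source", which matches the note that metadata keys cannot be deleted.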
-
kartothek.io.eager_cube.build_cube(data: Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame], List[Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame]]]], cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, metadata: Optional[Dict[str, Dict[str, Any]]] = None, overwrite: bool = False, partition_on: Optional[Dict[str, Iterable[str]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → Dict[str, kartothek.core.dataset.DatasetMetadata]
Store given dataframes as Ktk_cube cube.
data can be formatted in multiple ways:
single DataFrame:
    pd.DataFrame({
        'x': [0, 1, 2, 3],
        'p': [0, 0, 1, 1],
        'v': [42, 45, 20, 10],
    })
In that case, the seed dataset will be written.
dictionary of DataFrames:
    {
        'seed': pd.DataFrame({
            'x': [0, 1, 2, 3],
            'p': [0, 0, 1, 1],
            'v1': [42, 45, 20, 10],
        }),
        'enrich': pd.DataFrame({
            'x': [0, 1, 2, 3],
            'p': [0, 0, 1, 1],
            'v2': [False, False, True, False],
        }),
    }
In that case, multiple datasets can be written at the same time. Note that the seed dataset MUST be included.
list of anything above:
    [
        # seed data only
        pd.DataFrame({
            'x': [0, 1, 2, 3],
            'p': [0, 0, 1, 1],
            'v1': [42, 45, 20, 10],
        }),
        # seed data only, explicit way
        {
            'seed': pd.DataFrame({
                'x': [4, 5, 6, 7],
                'p': [0, 0, 1, 1],
                'v1': [12, 32, 22, 9],
            }),
        },
        # multiple datasets
        {
            'seed': pd.DataFrame({
                'x': [8, 9, 10, 11],
                'p': [0, 0, 1, 1],
                'v1': [9, 2, 4, 11],
            }),
            'enrich': pd.DataFrame({
                'x': [8, 9, 10, 11],
                'p': [0, 0, 1, 1],
                'v2': [True, True, False, False],
            }),
        },
        # non-seed data only
        {
            'enrich': pd.DataFrame({
                'x': [1, 2, 3, 4],
                'p': [0, 0, 1, 1],
                'v2': [False, True, False, False],
            }),
        },
    ]
In that case, multiple datasets may be written. Note that at least a single list element must contain seed data.
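The three accepted shapes of data can be reduced to one canonical form, a list of {dataset ID: frame} dictionaries. The sketch below shows one way to do that. It is not kartothek's own code: `normalize_data` is a hypothetical helper, the seed dataset ID is assumed to be 'seed' as in the examples above, and plain strings stand in for DataFrames so the snippet runs without pandas.

```python
from typing import Any, Dict, List, Union

Frame = Any  # stands in for pandas.DataFrame in this sketch
DataInput = Union[Frame, Dict[str, Frame], List[Union[Frame, Dict[str, Frame]]]]


def normalize_data(data: DataInput, seed_dataset: str = "seed") -> List[Dict[str, Frame]]:
    """Turn any accepted shape into a list of {dataset ID: frame} dictionaries."""
    items = data if isinstance(data, list) else [data]
    normalized = [
        # A bare frame is interpreted as data for the seed dataset.
        item if isinstance(item, dict) else {seed_dataset: item}
        for item in items
    ]
    if not any(seed_dataset in item for item in normalized):
        raise ValueError("at least one element must contain seed data")
    return normalized


print(normalize_data("df_a"))
print(normalize_data(["df_a", {"seed": "df_b", "enrich": "df_c"}, {"enrich": "df_d"}]))
```

Passing only non-seed data, for example `[{"enrich": "df_d"}]`, raises a ValueError in this sketch, mirroring the requirement that seed data must be present.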
Extra metadata may be preserved with every dataset, e.g.:
    {
        'seed': {
            'source': 'db',
            'host': 'db1.cluster20.company.net',
            'last_event': '230c6edb-b69a-4d30-b56d-28f5dfe20948',
        },
        'enrich': {
            'source': 'python',
            'commit_hash': '8b5d717518439921e6d17c7495956bdad687bc54',
        },
    }
Note that the given data must be JSON-serializable.
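Whether a metadata dictionary is JSON-serializable can be checked up front with the standard library. The helper name `is_json_serializable` and the sample values are illustrative assumptions.

```python
import datetime
import json
from typing import Any, Dict


def is_json_serializable(metadata: Dict[str, Dict[str, Any]]) -> bool:
    """Return True if json.dumps accepts the metadata dictionary."""
    try:
        json.dumps(metadata)
    except (TypeError, ValueError):
        return False
    return True


# Strings, numbers, booleans, None, lists and dicts are fine.
ok = {"seed": {"source": "db", "rows": 4, "tags": ["daily", "raw"]}}

# A date object is rejected by json.dumps unless it is converted first.
bad = {"seed": {"created": datetime.date(2020, 1, 1)}}
fixed = {"seed": {"created": datetime.date(2020, 1, 1).isoformat()}}

print(is_json_serializable(ok), is_json_serializable(bad), is_json_serializable(fixed))
```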
If the cube already exists, the overwrite flag must be given. In that case, all datasets that are part of the existing cube must be overwritten. Partial overwrites are not allowed.
- Parameters
data – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.
cube – Cube specification.
store – Store to which the data should be written.
metadata – Metadata for every dataset.
overwrite – Whether existing datasets should be overwritten.
partition_on – Optional partition-on attributes for datasets (dictionary mapping Dataset ID -> columns).
df_serializer – Optional Dataframe to Parquet serializer
- Returns
datasets – DatasetMetadata for every dataset written.
- Return type
Dict[str, kartothek.core.dataset.DatasetMetadata]
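The overwrite rule described for build_cube (an existing cube requires the overwrite flag, and then every existing dataset must be rewritten) can be written as a small validation step. This is only a sketch of the rule as stated here: `check_overwrite` is a hypothetical function, and kartothek's real checks and error types may differ.

```python
from typing import Iterable


def check_overwrite(
    existing_datasets: Iterable[str],
    written_datasets: Iterable[str],
    overwrite: bool,
) -> None:
    """Toy validation of the overwrite rule; raises ValueError on violation."""
    existing = set(existing_datasets)
    if not existing:
        return  # no cube yet, nothing can be overwritten
    if not overwrite:
        raise ValueError("cube already exists, pass overwrite=True")
    missing = existing - set(written_datasets)
    if missing:
        raise ValueError(f"partial overwrite, missing datasets: {sorted(missing)}")


# A fresh cube and a full overwrite both pass silently.
check_overwrite([], ["seed"], overwrite=False)
check_overwrite(["seed", "enrich"], ["seed", "enrich"], overwrite=True)

# Rewriting only the seed dataset of a two-dataset cube is rejected.
try:
    check_overwrite(["seed", "enrich"], ["seed"], overwrite=True)
except ValueError as exc:
    print(exc)
```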
-
kartothek.io.eager_cube.cleanup_cube(cube, store)
Remove unused keys from cube datasets.
Important
All untracked keys which start with the cube’s uuid_prefix followed by the KTK_CUBE_UUID_SEPERATOR (e.g. my_cube_uuid++seed…) will be deleted by this routine. These keys may be leftovers from past overwrites or index updates.
- Parameters
cube (Cube) – Cube specification.
store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – KV store.
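The selection rule in the note for cleanup_cube can be pictured as a prefix filter over the keys in the store. The sketch below is an illustration only: the separator value `++` is inferred from the example key above, and the key names and the helper `untracked_cube_keys` are hypothetical.

```python
from typing import Iterable, List, Set

SEPARATOR = "++"  # assumed value, taken from the example key in the note above


def untracked_cube_keys(
    all_keys: Iterable[str], tracked_keys: Set[str], uuid_prefix: str
) -> List[str]:
    """Return keys that belong to the cube by prefix but are not tracked."""
    prefix = uuid_prefix + SEPARATOR
    return sorted(
        key for key in all_keys if key.startswith(prefix) and key not in tracked_keys
    )


store_keys = [
    "my_cube++seed/table/p=0/current.parquet",
    "my_cube++seed/table/p=0/leftover.parquet",
    "my_cube_v2++seed/table/p=0/current.parquet",
    "unrelated/key",
]
tracked = {"my_cube++seed/table/p=0/current.parquet"}

print(untracked_cube_keys(store_keys, tracked, "my_cube"))
```

Keys of a cube with a different prefix (here `my_cube_v2`) are not touched, because the separator is part of the matched prefix.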
-
kartothek.io.eager_cube.collect_stats(cube, store, datasets=None)
Collect statistics for given cube.
- Parameters
cube (Cube) – Cube specification.
store (simplekv.KeyValueStore) – KV store that preserves the cube.
datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to query, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs, or None (in which case auto-discovery will be used).
- Returns
stats – Statistics per ktk_cube dataset ID.
- Return type
-
kartothek.io.eager_cube.copy_cube(cube: kartothek.core.cube.cube.Cube, src_store: Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], tgt_store: Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], overwrite: bool = False, datasets: Optional[Union[Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]] = None, renamed_cube_prefix: Optional[str] = None, renamed_datasets: Optional[Dict[str, str]] = None)
Copy cube from one store to another.
Warning
A failing copy operation cannot be rolled back if the overwrite flag is enabled and might leave the overwritten dataset in an inconsistent state.
- Parameters
cube (Cube) – Cube specification.
src_store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – Source KV store.
tgt_store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – Target KV store.
overwrite (bool) – Whether existing datasets in the target store should be overwritten.
datasets (Union[None, Iterable[str], Dict[str, DatasetMetadata]]) – Datasets to copy, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs, or None (in which case the entire cube will be copied).
renamed_cube_prefix (Optional[str]) – Optional new cube prefix. If specified, the cube will be renamed while copying.
renamed_datasets (Optional[Dict[str, str]]) – Optional dict with {old dataset name: new dataset name} entries. If provided, the datasets will be renamed accordingly during copying. When the parameter datasets is specified, the datasets to rename must be a subset of the datasets to copy.
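The subset requirement between datasets and renamed_datasets can be checked before calling copy_cube. A minimal sketch of such a check follows. `check_renames` is a hypothetical helper, and the case where datasets is None is treated here as "nothing to check", which is an assumption.

```python
from typing import Dict, Iterable, Optional


def check_renames(
    datasets_to_copy: Optional[Iterable[str]],
    renamed_datasets: Dict[str, str],
) -> None:
    """Toy check: renamed datasets must be among the datasets being copied."""
    if datasets_to_copy is None:
        return  # entire cube is copied, so there is no subset to check against
    unknown = set(renamed_datasets) - set(datasets_to_copy)
    if unknown:
        raise ValueError(
            f"cannot rename datasets that are not copied: {sorted(unknown)}"
        )


# Renaming a dataset that is part of the copy is fine.
check_renames(["seed", "enrich"], {"enrich": "enrich_v2"})

# Renaming a dataset that is not copied is rejected.
try:
    check_renames(["seed"], {"enrich": "enrich_v2"})
except ValueError as exc:
    print(exc)
```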
-
kartothek.io.eager_cube.delete_cube(cube, store, datasets=None)
Delete cube from store.
Important
This routine only deletes tracked files. Garbage and leftovers from old cubes and failed operations are NOT removed.
- Parameters
cube (Cube) – Cube specification.
store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – KV store.
datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to delete, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs, or None (in which case the entire cube will be deleted).
-
kartothek.io.eager_cube.extend_cube(data: Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame], List[Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame]]]], cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, metadata: Optional[Dict[str, Dict[str, Any]]] = None, overwrite: bool = False, partition_on: Optional[Dict[str, Iterable[str]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → Dict[str, kartothek.core.dataset.DatasetMetadata]
Store given dataframes into an existing Kartothek cube.
For details on data and metadata, see build_cube().
- Parameters
data – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.
cube – Cube specification.
store – Store to which the data should be written.
metadata – Metadata for every dataset.
overwrite – Whether existing datasets should be overwritten.
partition_on – Optional partition-on attributes for datasets (dictionary mapping Dataset ID -> columns).
df_serializer – Optional Dataframe to Parquet serializer
- Returns
datasets – DatasetMetadata for every dataset written.
- Return type
Dict[str, kartothek.core.dataset.DatasetMetadata]
-
kartothek.io.eager_cube.query_cube(cube, store, conditions=None, datasets=None, dimension_columns=None, partition_by=None, payload_columns=None)
Query cube.
Note
In case of partition_by=None (default case), only a single partition is generated. If this partition is empty (e.g. due to the provided conditions), an empty list is returned; otherwise, a single-element list is returned.
- Parameters
cube (Cube) – Cube specification.
store (simplekv.KeyValueStore) – KV store that preserves the cube.
conditions (Union[None, Condition, Iterable[Condition], Conjunction]) – Conditions that should be applied, optional.
datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to query, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs, or None (in which case auto-discovery will be used).
dimension_columns (Union[None, str, Iterable[str]]) – Dimension columns of the query, may result in projection. If not provided, dimension columns from cube specification will be used.
partition_by (Union[None, str, Iterable[str]]) – By which column logical partitions should be formed. If not provided, a single partition will be generated.
payload_columns (Union[None, str, Iterable[str]]) – Which columns apart from dimension_columns and partition_by should be returned.
- Returns
dfs – List of non-empty DataFrames, ordered by partition_by. Columns of the DataFrames are ordered alphabetically. Data types are provided on a best-effort basis (they are restored based on the preserved data, but may differ due to Pandas NULL handling, e.g. integer columns may become floats).
- Return type
List[pandas.DataFrame]
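The shape of the query_cube result (an empty list, a single-element list, or one entry per logical partition) can be sketched with plain Python rows instead of DataFrames. `partition_rows` is a hypothetical stand-in that handles a single partition_by column only. It models the list structure of the result, not the actual query logic.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def partition_rows(rows: List[Row], partition_by: Optional[str] = None) -> List[List[Row]]:
    """Group rows into non-empty partitions, ordered by the partition value."""
    if partition_by is None:
        # Single partition: returned only if it is non-empty.
        return [rows] if rows else []
    groups: Dict[Any, List[Row]] = {}
    for row in rows:
        groups.setdefault(row[partition_by], []).append(row)
    return [groups[key] for key in sorted(groups)]


rows = [
    {"x": 2, "p": 1, "v": 20},
    {"x": 0, "p": 0, "v": 42},
    {"x": 1, "p": 0, "v": 45},
]

print(len(partition_rows([])))         # no matching rows: empty list
print(len(partition_rows(rows)))       # partition_by=None: one partition
print(len(partition_rows(rows, "p")))  # one partition per value of p
```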
-
kartothek.io.eager_cube.remove_partitions(cube: kartothek.core.cube.cube.Cube, store: Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], conditions: Union[None, kartothek.core.cube.conditions.Condition, Sequence[kartothek.core.cube.conditions.Condition], kartothek.core.cube.conditions.Conjunction] = None, ktk_cube_dataset_ids: Optional[Union[Sequence[str], str]] = None, metadata: Optional[Dict[str, Dict[str, Any]]] = None)
Remove given partition range from cube using a transaction.
Remove the partitions selected by conditions. If no conditions are given, remove all partitions. For each considered dataset, only the subset of conditions that refers to the partition columns of the respective dataset is used. In particular, a dataset that is not partitioned at all is always considered selected by conditions.
- Parameters
cube – Cube spec.
store – Store.
conditions – Select the partitions to be removed. Must be a condition only on partition columns.
ktk_cube_dataset_ids – Ktk_cube dataset IDs to apply the remove action to, optional. Defaults to “all”.
metadata – Metadata for every dataset, optional. Only given keys are updated/replaced. Deletion of metadata keys is not possible.
- Returns
datasets – Datasets, updated.
- Return type
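The per-dataset selection rule of remove_partitions can be sketched with equality conditions on plain dictionaries. This is a simplified model, not kartothek's condition system: conditions are reduced to {column: value} pairs, each partition is described by its partition-column values, and `select_partitions` is a hypothetical helper.

```python
from typing import Dict, List

Partition = Dict[str, int]  # partition column -> value


def select_partitions(
    partitions: List[Partition],
    partition_columns: List[str],
    conditions: Dict[str, int],
) -> List[Partition]:
    """Return the partitions of one dataset that the conditions select."""
    # Only conditions on this dataset's own partition columns are considered.
    relevant = {
        column: value
        for column, value in conditions.items()
        if column in partition_columns
    }
    return [
        partition
        for partition in partitions
        if all(partition[column] == value for column, value in relevant.items())
    ]


conditions = {"p": 0, "q": 1}

# Dataset partitioned by p and q: both conditions apply.
seed = [{"p": 0, "q": 0}, {"p": 0, "q": 1}, {"p": 1, "q": 0}]
print(select_partitions(seed, ["p", "q"], conditions))

# Dataset partitioned by p only: the condition on q is ignored.
enrich = [{"p": 0}, {"p": 1}]
print(select_partitions(enrich, ["p"], conditions))

# Unpartitioned dataset: no condition applies, so it is always selected.
print(select_partitions([{}], [], conditions))
```

With an empty conditions dictionary every partition of every dataset is selected, which corresponds to "remove all partitions".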