kartothek.io.eager_cube module
Eager IO aka “everything is done locally and immediately”.
kartothek.io.eager_cube.append_to_cube(data: Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame], List[Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame]]]], cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, metadata: Optional[Dict[str, Dict[str, Any]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → Dict[str, kartothek.core.dataset.DatasetMetadata]

Append data to existing cube.
For details on data and metadata, see build_cube().

Important
Physical partitions must be updated as a whole. If only single rows within a physical partition are updated, the old data is treated as “removed”.
Hint
To have better control over the overwrite "mask" (i.e. which partitions are overwritten), you should use remove_partitions() beforehand.

- Parameters
data – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.
cube – Cube specification.
store – Store to which the data should be written.
metadata – Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of metadata keys is not possible.
df_serializer – Optional DataFrame-to-Parquet serializer.
- Returns
datasets – DatasetMetadata for every dataset written.
- Return type
Dict[str, kartothek.core.dataset.DatasetMetadata]
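A minimal usage sketch (the cube specification, the local store URL, and the column names are illustrative assumptions, not part of this API reference):

import pandas as pd
from storefact import get_store_from_url
from kartothek.core.cube.cube import Cube
from kartothek.io.eager_cube import append_to_cube

# illustrative cube spec and local store; the cube is assumed to already exist in the store
cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
store = get_store_from_url("hfs:///tmp/cube_store")

# a single DataFrame is interpreted as the seed dataset
append_to_cube(
    data=pd.DataFrame({"x": [4, 5], "p": [2, 2], "v": [7, 8]}),
    cube=cube,
    store=store,
)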
kartothek.io.eager_cube.build_cube(data: Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame], List[Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame]]]], cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, metadata: Optional[Dict[str, Dict[str, Any]]] = None, overwrite: bool = False, partition_on: Optional[Dict[str, Iterable[str]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → Dict[str, kartothek.core.dataset.DatasetMetadata]

Store given dataframes as Ktk_cube cube.
data can be formatted in multiple ways:

single DataFrame:
pd.DataFrame({
    'x': [0, 1, 2, 3],
    'p': [0, 0, 1, 1],
    'v': [42, 45, 20, 10],
})
In that case, the seed dataset will be written.
dictionary of DataFrames:
{
    'seed': pd.DataFrame({
        'x': [0, 1, 2, 3],
        'p': [0, 0, 1, 1],
        'v1': [42, 45, 20, 10],
    }),
    'enrich': pd.DataFrame({
        'x': [0, 1, 2, 3],
        'p': [0, 0, 1, 1],
        'v2': [False, False, True, False],
    }),
}
In that case, multiple datasets can be written at the same time. Note that the seed dataset MUST be included.
list of anything above:
[
    # seed data only
    pd.DataFrame({
        'x': [0, 1, 2, 3],
        'p': [0, 0, 1, 1],
        'v1': [42, 45, 20, 10],
    }),
    # seed data only, explicit way
    {
        'seed': pd.DataFrame({
            'x': [4, 5, 6, 7],
            'p': [0, 0, 1, 1],
            'v1': [12, 32, 22, 9],
        }),
    },
    # multiple datasets
    {
        'seed': pd.DataFrame({
            'x': [8, 9, 10, 11],
            'p': [0, 0, 1, 1],
            'v1': [9, 2, 4, 11],
        }),
        'enrich': pd.DataFrame({
            'x': [8, 9, 10, 11],
            'p': [0, 0, 1, 1],
            'v2': [True, True, False, False],
        }),
    },
    # non-seed data only
    {
        'enrich': pd.DataFrame({
            'x': [1, 2, 3, 4],
            'p': [0, 0, 1, 1],
            'v2': [False, True, False, False],
        }),
    },
]
In that case, multiple datasets may be written. Note that at least a single list element must contain seed data.
Extra metadata may be preserved with every dataset, e.g.:
{
    'seed': {
        'source': 'db',
        'host': 'db1.cluster20.company.net',
        'last_event': '230c6edb-b69a-4d30-b56d-28f5dfe20948',
    },
    'enrich': {
        'source': 'python',
        'commit_hash': '8b5d717518439921e6d17c7495956bdad687bc54',
    },
}
Note that the given metadata must be JSON-serializable.
If the cube already exists, the overwrite flag must be given. In that case, all datasets that are part of the existing cube must be overwritten. Partial overwrites are not allowed.

- Parameters
data – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.
cube – Cube specification.
store – Store to which the data should be written.
metadata – Metadata for every dataset.
overwrite – Whether possibly existing datasets should be overwritten.
partition_on – Optional partition-on attributes for datasets (dictionary mapping dataset ID -> columns).
df_serializer – Optional DataFrame-to-Parquet serializer.
- Returns
datasets – DatasetMetadata for every dataset written.
- Return type
Dict[str, kartothek.core.dataset.DatasetMetadata]
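A minimal sketch of building a new cube; the store URL, cube prefix, and column names are illustrative assumptions:

import pandas as pd
from storefact import get_store_from_url
from kartothek.core.cube.cube import Cube
from kartothek.io.eager_cube import build_cube

cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
store = get_store_from_url("hfs:///tmp/cube_store")  # illustrative local store

datasets = build_cube(
    data={
        "seed": pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v1": [42, 45, 20, 10]}),
        "enrich": pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v2": [False, False, True, False]}),
    },
    cube=cube,
    store=store,
)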
kartothek.io.eager_cube.cleanup_cube(cube, store)

Remove unused keys from cube datasets.
Important
All untracked keys which start with the cube’s uuid_prefix followed by the KTK_CUBE_UUID_SEPERATOR (e.g. my_cube_uuid++seed…) will be deleted by this routine. These keys may be leftovers from past overwrites or index updates.
- Parameters
cube (Cube) – Cube specification.
store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – KV store.
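A short sketch, reusing the cube and store objects from the build_cube example above:

from kartothek.io.eager_cube import cleanup_cube

# remove untracked keys left behind by earlier overwrites or index rebuilds
cleanup_cube(cube=cube, store=store)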
kartothek.io.eager_cube.collect_stats(cube, store, datasets=None)

Collect statistics for given cube.
- Parameters
cube (Cube) – Cube specification.
store (simplekv.KeyValueStore) – KV store that preserves the cube.
datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to query, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs, or None (in which case auto-discovery will be used).
- Returns
stats – Statistics per ktk_cube dataset ID.
- Return type
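A short sketch, reusing the cube and store objects from the build_cube example; the "seed" key assumes the dataset IDs used there:

from kartothek.io.eager_cube import collect_stats

stats = collect_stats(cube=cube, store=store)
# stats is keyed by ktk_cube dataset ID, e.g. "seed" and "enrich"
print(stats["seed"])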
kartothek.io.eager_cube.copy_cube(cube: kartothek.core.cube.cube.Cube, src_store: Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], tgt_store: Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], overwrite: bool = False, datasets: Optional[Union[Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]] = None, renamed_cube_prefix: Optional[str] = None, renamed_datasets: Optional[Dict[str, str]] = None)

Copy cube from one store to another.
Warning
A failing copy operation cannot be rolled back if the overwrite flag is enabled and might leave the overwritten dataset in an inconsistent state.
- Parameters
cube (Cube) – Cube specification.
src_store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – Source KV store.
tgt_store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – Target KV store.
overwrite (bool) – If possibly existing datasets in the target store should be overwritten.
datasets (Union[None, Iterable[str], Dict[str, DatasetMetadata]]) – Datasets to copy, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs, or None (in which case the entire cube will be copied).
renamed_cube_prefix (Optional[str]) – Optional new cube prefix. If specified, the cube will be renamed while copying.
renamed_datasets (Optional[Dict[str, str]]) – Optional dict with {old dataset name: new dataset name} entries. If provided, the datasets will be renamed accordingly during copying. When the parameter datasets is specified, the datasets to rename must be a subset of the datasets to copy.
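A short sketch; the target store URL is an illustrative assumption, and cube/store are the objects from the build_cube example:

from storefact import get_store_from_url
from kartothek.io.eager_cube import copy_cube

tgt_store = get_store_from_url("hfs:///tmp/cube_backup")  # illustrative target store

copy_cube(cube=cube, src_store=store, tgt_store=tgt_store)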
kartothek.io.eager_cube.delete_cube(cube, store, datasets=None)

Delete cube from store.
Important
This routine only deletes tracked files. Garbage and leftovers from old cubes and failed operations are NOT removed.
- Parameters
cube (Cube) – Cube specification.
store (Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) – KV store.
datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to delete, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs, or None (in which case the entire cube will be deleted).
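A short sketch; the "enrich" dataset ID is taken from the build_cube example and is illustrative:

from kartothek.io.eager_cube import delete_cube

# delete only the "enrich" dataset; omit datasets to delete the entire cube
delete_cube(cube=cube, store=store, datasets=["enrich"])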
kartothek.io.eager_cube.extend_cube(data: Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame], List[Union[pandas.core.frame.DataFrame, Dict[str, pandas.core.frame.DataFrame]]]], cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, metadata: Optional[Dict[str, Dict[str, Any]]] = None, overwrite: bool = False, partition_on: Optional[Dict[str, Iterable[str]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → Dict[str, kartothek.core.dataset.DatasetMetadata]

Store given dataframes into an existing Kartothek cube.
For details on data and metadata, see build_cube().

- Parameters
data – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.
cube – Cube specification.
store – Store to which the data should be written.
metadata – Metadata for every dataset.
overwrite – Whether possibly existing datasets should be overwritten.
partition_on – Optional partition-on attributes for datasets (dictionary mapping dataset ID -> columns).
df_serializer – Optional DataFrame-to-Parquet serializer.
- Returns
datasets – DatasetMetadata for every dataset written.
- Return type
Dict[str, kartothek.core.dataset.DatasetMetadata]
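A short sketch of adding a new, illustrative "geo" dataset to the existing cube (cube and store as in the build_cube example):

import pandas as pd
from kartothek.io.eager_cube import extend_cube

extend_cube(
    data={"geo": pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "country": ["DE", "DE", "US", "US"]})},
    cube=cube,
    store=store,
)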
kartothek.io.eager_cube.query_cube(cube, store, conditions=None, datasets=None, dimension_columns=None, partition_by=None, payload_columns=None)

Query cube.
Note
In case of partition_by=None (the default), only a single partition is generated. If this partition is empty (e.g. due to the provided conditions), an empty list is returned; otherwise a single-element list is returned.

- Parameters
cube (Cube) – Cube specification.
store (simplekv.KeyValueStore) – KV store that preserves the cube.
conditions (Union[None, Condition, Iterable[Condition], Conjunction]) – Conditions that should be applied, optional.
datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to query, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs, or None (in which case auto-discovery will be used).
dimension_columns (Union[None, str, Iterable[str]]) – Dimension columns of the query; may result in projection. If not provided, the dimension columns from the cube specification will be used.
partition_by (Union[None, str, Iterable[str]]) – By which column logical partitions should be formed. If not provided, a single partition will be generated.
payload_columns (Union[None, str, Iterable[str]]) – Which columns apart from dimension_columns and partition_by should be returned.
- Returns
dfs – List of non-empty DataFrames, ordered by partition_by. The columns of each DataFrame are ordered alphabetically. Data types are provided on a best-effort basis (they are restored from the preserved data, but may differ due to Pandas NULL-handling, e.g. integer columns may become floats).
- Return type
List[pandas.DataFrame]
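A short sketch, assuming the condition helper C from kartothek.core.cube.conditions and the cube/store objects from the build_cube example:

from kartothek.core.cube.conditions import C
from kartothek.io.eager_cube import query_cube

# restrict to partition p == 0 and return only the v1 payload column
dfs = query_cube(
    cube=cube,
    store=store,
    conditions=C("p") == 0,
    payload_columns=["v1"],
)
if dfs:
    print(dfs[0])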
kartothek.io.eager_cube.remove_partitions(cube: kartothek.core.cube.cube.Cube, store: Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], conditions: Union[None, kartothek.core.cube.conditions.Condition, Sequence[kartothek.core.cube.conditions.Condition], kartothek.core.cube.conditions.Conjunction] = None, ktk_cube_dataset_ids: Optional[Union[Sequence[str], str]] = None, metadata: Optional[Dict[str, Dict[str, Any]]] = None)

Remove given partition range from cube using a transaction.
Remove the partitions selected by conditions. If no conditions are given, remove all partitions. For each considered dataset, only the subset of conditions that refers to the partition columns of the respective dataset is used. In particular, a dataset that is not partitioned at all is always considered selected by conditions.

- Parameters
cube – Cube spec.
store – Store.
conditions – Select the partitions to be removed. Must be a condition only on partition columns.
ktk_cube_dataset_ids – Ktk_cube dataset IDs to apply the remove action to, optional. Defaults to "all".
metadata – Metadata for the datasets, optional. Only given keys are updated/replaced. Deletion of metadata keys is not possible.
- Returns
datasets – Datasets, updated.
- Return type
Dict[str, kartothek.core.dataset.DatasetMetadata]
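A short sketch, assuming the same condition helper and cube/store objects as in the query_cube example:

from kartothek.core.cube.conditions import C
from kartothek.io.eager_cube import remove_partitions

# drop all physical partitions with p == 1, e.g. before re-appending corrected data
remove_partitions(cube=cube, store=store, conditions=C("p") == 1)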