kartothek.io.dask.bag_cube module
Dask.Bag IO.
kartothek.io.dask.bag_cube.append_to_cube_from_bag(data: dask.bag.core.Bag, cube: kartothek.core.cube.cube.Cube, store: Callable[[], simplekv.KeyValueStore], ktk_cube_dataset_ids: Optional[Iterable[str]], metadata: Optional[Dict[str, Dict[str, Any]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → dask.bag.core.Bag

Append data to an existing cube.

For details on data and metadata, see build_cube().

Important
Physical partitions must be updated as a whole. If only single rows within a physical partition are updated, the old data is treated as “removed”.
Hint
To have better control over the overwrite “mask” (i.e. which partitions are overwritten), you should use remove_partitions() beforehand or use update_cube_from_bag() instead.

- Parameters
data (dask.bag.Bag) – Bag containing dataframes
cube – Cube specification.
store – Store to which the data should be written.
ktk_cube_dataset_ids – Datasets that will be written, must be specified in advance.
metadata – Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of metadata keys is not possible.
df_serializer – Optional DataFrame-to-Parquet serializer.
- Returns
metadata_dict – A dask bag containing the compute graph that appends to the cube and returns the dict of dataset metadata objects. The bag has a single partition with a single element.
- Return type
dask.bag.Bag
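A minimal usage sketch, assuming a cube that was previously built with dimension column x and partition column p; the storefact-based store factory, the hfs URL, the column names and the "seed" dataset ID are illustrative, not mandated by this API:

    import dask.bag as db
    import pandas as pd
    from functools import partial

    from storefact import get_store_from_url  # assumed helper; any zero-arg store factory works
    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.bag_cube import append_to_cube_from_bag

    # Illustrative cube layout and storage location.
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    store_factory = partial(get_store_from_url, "hfs:///tmp/cube_data")

    # Rows for a new physical partition (p == 2); existing partitions that
    # receive rows would be replaced as a whole.
    df = pd.DataFrame({"x": [4, 5], "p": [2, 2], "v1": [14, 15]})
    bag = db.from_sequence([df], partition_size=1)

    graph = append_to_cube_from_bag(
        data=bag,
        cube=cube,
        store=store_factory,
        ktk_cube_dataset_ids=["seed"],
    )
    datasets = graph.compute()[0]  # dict: ktk_cube dataset ID -> dataset metadata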
kartothek.io.dask.bag_cube.build_cube_from_bag(data: dask.bag.core.Bag, cube: kartothek.core.cube.cube.Cube, store: Callable[[], simplekv.KeyValueStore], ktk_cube_dataset_ids: Optional[Iterable[str]] = None, metadata: Optional[Dict[str, Dict[str, Any]]] = None, overwrite: bool = False, partition_on: Optional[Dict[str, Iterable[str]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → dask.bag.core.Bag

Create a dask computation graph that builds a cube with the data supplied from a dask bag.
- Parameters
data (dask.bag.Bag) – Bag containing dataframes
cube – Cube specification.
store – Store to which the data should be written.
ktk_cube_dataset_ids – Datasets that will be written, must be specified in advance. If not provided, it is assumed that only the seed dataset will be written.
metadata – Metadata for every dataset.
overwrite – Whether existing datasets should be overwritten.
partition_on – Optional partition-on attributes for datasets (dictionary mapping dataset ID -> columns).
df_serializer – Optional DataFrame-to-Parquet serializer.
- Returns
metadata_dict – A dask bag containing the compute graph that builds the cube and returns the dict of dataset metadata objects. The bag has a single partition with a single element.
- Return type
dask.bag.Bag
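A minimal sketch of building a fresh cube from a single-partition bag; the storefact store factory, the hfs URL and the column names are illustrative assumptions:

    import dask.bag as db
    import pandas as pd
    from functools import partial

    from storefact import get_store_from_url  # assumed helper; any zero-arg store factory works
    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.bag_cube import build_cube_from_bag

    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    store_factory = partial(get_store_from_url, "hfs:///tmp/cube_data")  # illustrative location

    # Seed data; with ktk_cube_dataset_ids left unset only the seed dataset is written.
    df = pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v1": [10, 11, 12, 13]})
    bag = db.from_sequence([df], partition_size=1)

    graph = build_cube_from_bag(data=bag, cube=cube, store=store_factory)
    datasets = graph.compute()[0]  # dict of dataset metadata objects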
kartothek.io.dask.bag_cube.cleanup_cube_bag(cube: kartothek.core.cube.cube.Cube, store: Callable[[], simplekv.KeyValueStore], blocksize: int = 100) → dask.bag.core.Bag

Remove unused keys from cube datasets.
Important
All untracked keys which start with the cube’s uuid_prefix followed by the KTK_CUBE_UUID_SEPERATOR (e.g. my_cube_uuid++seed…) will be deleted by this routine. These keys may be leftovers from past overwrites or index updates.
- Parameters
cube – Cube specification.
store – KV store.
blocksize – Number of keys to delete at once.
- Returns
bag – A dask bag that performs the given operation. May contain multiple partitions.
- Return type
dask.bag.Bag
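A short sketch of wiring up and executing the cleanup graph; the cube layout and store factory are illustrative assumptions:

    from functools import partial

    from storefact import get_store_from_url  # assumed helper
    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.bag_cube import cleanup_cube_bag

    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    store_factory = partial(get_store_from_url, "hfs:///tmp/cube_data")  # illustrative location

    # Build the cleanup graph and execute it; untracked keys are removed in blocks of 100.
    cleanup_cube_bag(cube=cube, store=store_factory, blocksize=100).compute()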
kartothek.io.dask.bag_cube.collect_stats_bag(cube: kartothek.core.cube.cube.Cube, store: Callable[[], simplekv.KeyValueStore], datasets: Optional[Union[Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]] = None, blocksize: int = 100)

Collect statistics for the given cube.
- Parameters
cube – Cube specification.
store – KV store that preserves the cube.
datasets – Datasets to query, must all be part of the cube. May be either the result of discover_datasets(), a list of ktk_cube dataset IDs or None (in which case auto-discovery will be used).
blocksize – Number of partitions to scan at once.
- Returns
bag – A dask bag that returns a single result of the form Dict[str, Dict[str, int]] and contains statistics per ktk_cube dataset ID.
- Return type
dask.bag.Bag
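A short sketch of collecting statistics with auto-discovery of datasets; the cube layout and store factory are illustrative assumptions:

    from functools import partial

    from storefact import get_store_from_url  # assumed helper
    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.bag_cube import collect_stats_bag

    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    store_factory = partial(get_store_from_url, "hfs:///tmp/cube_data")  # illustrative location

    # datasets=None triggers auto-discovery of all cube datasets.
    stats_bag = collect_stats_bag(cube=cube, store=store_factory)
    stats = stats_bag.compute()[0]  # Dict[str, Dict[str, int]] per ktk_cube dataset ID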
kartothek.io.dask.bag_cube.copy_cube_bag(cube, src_store: Callable[[], simplekv.KeyValueStore], tgt_store: Callable[[], simplekv.KeyValueStore], blocksize: int = 100, overwrite: bool = False, datasets: Optional[Union[Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]] = None)

Copy a cube from one store to another.
- Parameters
cube – Cube specification.
src_store – Source KV store.
tgt_store – Target KV store.
overwrite – Whether existing datasets in the target store should be overwritten.
blocksize – Number of keys to copy at once.
datasets – Datasets to copy, must all be part of the cube. May be either the result of discover_datasets(), a list of ktk_cube dataset IDs or None (in which case the entire cube will be copied).
- Returns
bag – A dask bag that performs the given operation. May contain multiple partitions.
- Return type
dask.bag.Bag
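A short sketch of copying an entire cube between two illustrative local stores:

    from functools import partial

    from storefact import get_store_from_url  # assumed helper
    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.bag_cube import copy_cube_bag

    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    src_store = partial(get_store_from_url, "hfs:///tmp/cube_src")  # illustrative locations
    tgt_store = partial(get_store_from_url, "hfs:///tmp/cube_tgt")

    # datasets=None copies the entire cube; pass a subset of dataset IDs to restrict it.
    copy_cube_bag(cube=cube, src_store=src_store, tgt_store=tgt_store).compute()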
kartothek.io.dask.bag_cube.delete_cube_bag(cube: kartothek.core.cube.cube.Cube, store: Callable[[], simplekv.KeyValueStore], blocksize: int = 100, datasets: Optional[Union[Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]] = None)

Delete a cube from the store.
Important
This routine only deletes tracked files. Garbage and leftovers from old cubes and failed operations are NOT removed.
- Parameters
cube – Cube specification.
store – KV store.
blocksize – Number of keys to delete at once.
datasets – Datasets to delete, must all be part of the cube. May be either the result of discover_datasets(), a list of ktk_cube dataset IDs or None (in which case the entire cube will be deleted).
- Returns
bag – A dask bag that performs the given operation. May contain multiple partitions.
- Return type
dask.bag.Bag
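A short sketch of deleting a cube; the cube layout and store factory are illustrative assumptions:

    from functools import partial

    from storefact import get_store_from_url  # assumed helper
    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.bag_cube import delete_cube_bag

    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    store_factory = partial(get_store_from_url, "hfs:///tmp/cube_data")  # illustrative location

    # Deletes only tracked keys; run cleanup_cube_bag() separately for leftovers.
    delete_cube_bag(cube=cube, store=store_factory).compute()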
kartothek.io.dask.bag_cube.extend_cube_from_bag(data: dask.bag.core.Bag, cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, ktk_cube_dataset_ids: Optional[Iterable[str]], metadata: Optional[Dict[str, Dict[str, Any]]] = None, overwrite: bool = False, partition_on: Optional[Dict[str, Iterable[str]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → dask.bag.core.Bag

Create a dask computation graph that extends a cube by the data supplied from a dask bag.
For details on data and metadata, see build_cube().

- Parameters
data (dask.bag.Bag) – Bag containing dataframes (see build_cube() for possible format and types).
cube (kartothek.core.cube.cube.Cube) – Cube specification.
store – Store to which the data should be written.
ktk_cube_dataset_ids – Datasets that will be written, must be specified in advance.
metadata – Metadata for every dataset.
overwrite – Whether existing datasets should be overwritten.
partition_on – Optional partition-on attributes for datasets (dictionary mapping dataset ID -> columns).
df_serializer – Optional DataFrame-to-Parquet serializer.
- Returns
metadata_dict – A dask bag containing the compute graph that extends the cube and returns the dict of dataset metadata objects. The bag has a single partition with a single element.
- Return type
dask.bag.Bag
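A sketch of adding a hypothetical non-seed dataset "enrich"; it assumes the dict form documented for build_cube() (dataset ID -> dataframe) and passes a store factory as with the sibling functions, so adapt it if your version expects a store instance:

    import dask.bag as db
    import pandas as pd
    from functools import partial

    from storefact import get_store_from_url  # assumed helper
    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.bag_cube import extend_cube_from_bag

    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    store_factory = partial(get_store_from_url, "hfs:///tmp/cube_data")  # illustrative location

    # Enrichment columns keyed by the hypothetical new dataset ID "enrich".
    enrich_df = pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v2": [20, 21, 22, 23]})
    bag = db.from_sequence([{"enrich": enrich_df}], partition_size=1)

    graph = extend_cube_from_bag(
        data=bag,
        cube=cube,
        store=store_factory,
        ktk_cube_dataset_ids=["enrich"],
    )
    datasets = graph.compute()[0]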
kartothek.io.dask.bag_cube.query_cube_bag(cube, store, conditions=None, datasets=None, dimension_columns=None, partition_by=None, payload_columns=None, blocksize=1)

Query the cube.
For detailed documentation, see query_cube().

- Parameters
cube (Cube) – Cube specification.
store (simplekv.KeyValueStore) – KV store that preserves the cube.
conditions (Union[None, Condition, Iterable[Condition], Conjunction]) – Conditions that should be applied, optional.
datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to query, must all be part of the cube. May be either the result of discover_datasets(), an iterable of ktk_cube dataset IDs or None (in which case auto-discovery will be used).
dimension_columns (Union[None, str, Iterable[str]]) – Dimension columns of the query, may result in projection. If not provided, dimension columns from the cube specification will be used.
partition_by (Union[None, str, Iterable[str]]) – By which column logical partitions should be formed. If not provided, a single partition will be generated.
payload_columns (Union[None, str, Iterable[str]]) – Which columns apart from dimension_columns and partition_by should be returned.
blocksize (int) – Partition size of the bag.
- Returns
bag – Bag of 1-sized partitions of non-empty DataFrames, ordered by partition_by. Columns of the DataFrames are alphabetically ordered. Data types are provided on a best-effort basis (they are restored based on the preserved data, but may differ due to Pandas NULL-handling, e.g. integer columns may become floats).
- Return type
dask.bag.Bag
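A sketch of a query restricted to a single partition; it assumes the condition helper C from kartothek.core.cube.conditions and an illustrative local store:

    from storefact import get_store_from_url  # assumed helper
    from kartothek.core.cube.conditions import C  # assumed condition helper
    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.bag_cube import query_cube_bag

    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    store = get_store_from_url("hfs:///tmp/cube_data")  # illustrative location

    result_bag = query_cube_bag(
        cube=cube,
        store=store,
        conditions=(C("p") == 1),   # restrict the query to partition p == 1
        payload_columns=["v1"],     # returned besides dimension and partition columns
    )
    dfs = result_bag.compute()  # list of non-empty DataFrames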
kartothek.io.dask.bag_cube.update_cube_from_bag(data: dask.bag.core.Bag, cube: kartothek.core.cube.cube.Cube, store: Callable[[], simplekv.KeyValueStore], remove_conditions, ktk_cube_dataset_ids: Optional[Iterable[str]], metadata: Optional[Dict[str, Dict[str, Any]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → dask.bag.core.Bag

Remove partitions and append data to an existing cube.
For details on data and metadata, see build_cube().

Only datasets in ktk_cube_dataset_ids will be affected.
- Parameters
data (dask.bag.Bag) – Bag containing dataframes
cube – Cube specification.
store – Store to which the data should be written.
remove_conditions – Conditions that select the partitions to remove. Must be a condition that only uses partition columns.
ktk_cube_dataset_ids – Datasets that will be written, must be specified in advance.
metadata – Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of metadata keys is not possible.
df_serializer – Optional DataFrame-to-Parquet serializer.
- Returns
metadata_dict – A dask bag containing the compute graph that removes the selected partitions, appends the new data and returns the dict of dataset metadata objects. The bag has a single partition with a single element.
- Return type
dask.bag.Bag
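A sketch that replaces one physical partition of the seed dataset; remove_conditions uses the assumed condition helper C from kartothek.core.cube.conditions, and the column names, store factory and dataset ID are illustrative:

    import dask.bag as db
    import pandas as pd
    from functools import partial

    from storefact import get_store_from_url  # assumed helper
    from kartothek.core.cube.conditions import C  # assumed condition helper
    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.bag_cube import update_cube_from_bag

    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
    store_factory = partial(get_store_from_url, "hfs:///tmp/cube_data")  # illustrative location

    # Replace physical partition p == 0 of the seed dataset: remove it, then append new rows.
    new_df = pd.DataFrame({"x": [0, 1], "p": [0, 0], "v1": [100, 101]})
    bag = db.from_sequence([new_df], partition_size=1)

    graph = update_cube_from_bag(
        data=bag,
        cube=cube,
        store=store_factory,
        remove_conditions=(C("p") == 0),  # may only reference partition columns
        ktk_cube_dataset_ids=["seed"],
    )
    datasets = graph.compute()[0]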
See also