kartothek.io.dask.common_cube module

Common code for dask backends.

kartothek.io.dask.common_cube.append_to_cube_from_bag_internal(data: dask.bag.core.Bag, cube: kartothek.core.cube.cube.Cube, store: Callable[[], simplekv.KeyValueStore], ktk_cube_dataset_ids: Optional[Iterable[str]], metadata: Optional[Dict[str, Dict[str, Any]]], remove_conditions=None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → dask.bag.core.Bag

Append data to existing cube.

For details on data and metadata, see build_cube().

Important

Physical partitions must be updated as a whole. If only single rows within a physical partition are updated, the old data is treated as “removed”.

Parameters
  • data (dask.bag.Bag) – Bag containing dataframes

  • cube – Cube specification.

  • store – Store to which the data should be written.

  • ktk_cube_dataset_ids – Datasets that will be written; must be specified in advance.

  • metadata – Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of metadata keys is not possible.

  • remove_conditions – Conditions that select which partitions to remove.

  • df_serializer – Optional DataFrame-to-Parquet serializer.

Returns

metadata_dict – A dask bag containing the compute graph that appends to the cube and returns the dict of dataset metadata objects. The bag has a single partition with a single element.

Return type

dask.bag.Bag
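
A minimal usage sketch is shown below. The Cube constructor arguments, the DictStore-backed store factory, the dataset id "seed", and the column names are assumptions made for illustration only and are not taken from this module; verify them against the kartothek cube documentation.

   import dask.bag as db
   import pandas as pd
   from simplekv.memory import DictStore

   from kartothek.core.cube.cube import Cube
   from kartothek.io.dask.common_cube import append_to_cube_from_bag_internal

   # Assumed cube specification: "x" is the dimension column, "p" the partition column.
   cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")

   _store = DictStore()

   def store_factory():
       # Callable returning the simplekv store that holds the cube.
       return _store

   # A new physical partition for the (assumed) seed dataset, as a one-partition bag.
   data = db.from_sequence(
       [pd.DataFrame({"x": [2, 3], "p": [1, 1], "v": [30.0, 40.0]})],
       npartitions=1,
   )

   bag = append_to_cube_from_bag_internal(
       data=data,
       cube=cube,
       store=store_factory,
       ktk_cube_dataset_ids=["seed"],  # datasets to write, declared up front
       metadata=None,
   )
   metadata_dict = bag.compute()[0]  # single partition with a single element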

kartothek.io.dask.common_cube.build_cube_from_bag_internal(data: dask.bag.core.Bag, cube: kartothek.core.cube.cube.Cube, store: Callable[[], simplekv.KeyValueStore], ktk_cube_dataset_ids: Optional[Iterable[str]], metadata: Optional[Dict[str, Dict[str, Any]]], overwrite: bool, partition_on: Optional[Dict[str, Iterable[str]]], df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → dask.bag.core.Bag

Create dask computation graph that builds a cube with the data supplied from a dask bag.

Parameters
  • data (dask.bag.Bag) – Bag containing dataframes

  • cube – Cube specification.

  • store – Store to which the data should be written.

  • ktk_cube_dataset_ids – Datasets that will be written; must be specified in advance. If not provided, only the seed dataset will be written.

  • metadata – Metadata for every dataset.

  • overwrite – Whether existing datasets should be overwritten.

  • partition_on – Optional partition-on attributes for datasets (dictionary mapping dataset ID -> columns).

  • df_serializer – Optional DataFrame-to-Parquet serializer.

Returns

metadata_dict – A dask bag containing the compute graph that builds the cube and returns the dict of dataset metadata objects. The bag has a single partition with a single element.

Return type

dask.bag.Bag
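
The sketch below shows how the function might be called to build a cube from a seed bag. The Cube constructor arguments, the in-memory DictStore, and the column names are assumptions for illustration and should be checked against the kartothek cube documentation.

   import dask.bag as db
   import pandas as pd
   from simplekv.memory import DictStore

   from kartothek.core.cube.cube import Cube
   from kartothek.io.dask.common_cube import build_cube_from_bag_internal

   # Assumed cube specification: "x" is the dimension column, "p" the partition column.
   cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")

   _store = DictStore()

   def store_factory():
       # Callable returning the simplekv store the cube is written to.
       return _store

   # Seed data as a single-partition bag of DataFrames.
   data = db.from_sequence(
       [pd.DataFrame({"x": [0, 1], "p": [0, 0], "v": [10.0, 20.0]})],
       npartitions=1,
   )

   graph = build_cube_from_bag_internal(
       data=data,
       cube=cube,
       store=store_factory,
       ktk_cube_dataset_ids=None,  # None: only the seed dataset is written
       metadata=None,
       overwrite=False,
       partition_on=None,
   )
   metadata_dict = graph.compute()[0]  # dict of dataset metadata objects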

kartothek.io.dask.common_cube.extend_cube_from_bag_internal(data: dask.bag.core.Bag, cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, ktk_cube_dataset_ids: Optional[Iterable[str]], metadata: Optional[Dict[str, Dict[str, Any]]], overwrite: bool, partition_on: Optional[Dict[str, Iterable[str]]], df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → dask.bag.core.Bag

Create dask computation graph that extends a cube by the data supplied from a dask bag.

For details on data and metadata, see build_cube().

Parameters
  • data (dask.bag.Bag) – Bag containing dataframes (see build_cube() for possible format and types).

  • cube (kartothek.core.cube.cube.Cube) – Cube specification.

  • store – Store to which the data should be written.

  • ktk_cube_dataset_ids – Datasets that will be written; must be specified in advance.

  • metadata – Metadata for every dataset.

  • overwrite – Whether existing datasets should be overwritten.

  • partition_on – Optional partition-on attributes for datasets (dictionary mapping dataset ID -> columns).

  • df_serializer – Optional DataFrame-to-Parquet serializer.

Returns

metadata_dict – A dask bag containing the compute graph that extends the cube and returns the dict of dataset metadata objects. The bag has a single partition with a single element.

Return type

dask.bag.Bag
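
A hedged sketch of extending an existing cube with an additional dataset follows. The dataset id "enrich", the column names, and the cube/store setup are assumptions for illustration. Note that the signature above types store as a simplekv.KeyValueStore, whereas the sibling functions in this module take a store factory; verify which form is expected.

   import dask.bag as db
   import pandas as pd
   from simplekv.memory import DictStore

   from kartothek.core.cube.cube import Cube
   from kartothek.io.dask.common_cube import extend_cube_from_bag_internal

   # Assumed cube specification; in practice this describes an already-built cube.
   cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
   store = DictStore()  # store holding the existing cube

   # Enrichment data keyed by its ktk_cube dataset id ("enrich" is an assumed name).
   data = db.from_sequence(
       [{"enrich": pd.DataFrame({"x": [0, 1], "p": [0, 0], "weather": ["sunny", "rainy"]})}],
       npartitions=1,
   )

   graph = extend_cube_from_bag_internal(
       data=data,
       cube=cube,
       store=store,  # passed directly, as typed in the signature above
       ktk_cube_dataset_ids=["enrich"],
       metadata=None,
       overwrite=False,
       partition_on=None,
   )
   metadata_dict = graph.compute()[0]  # dict of dataset metadata objects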

kartothek.io.dask.common_cube.query_cube_bag_internal(cube, store, conditions, datasets, dimension_columns, partition_by, payload_columns, blocksize)

Query cube.

For detailed documentation, see query_cube().

Parameters
  • cube (Cube) – Cube specification.

  • store (simplekv.KeyValueStore) – KV store that preserves the cube.

  • conditions (Union[None, Condition, Iterable[Condition], Conjunction]) – Conditions that should be applied, optional.

  • datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to query; must all be part of the cube. May be either the result of discover_datasets(), a list of ktk_cube dataset IDs, or None (in which case auto-discovery will be used).

  • dimension_columns (Union[None, str, Iterable[str]]) – Dimension columns of the query; may result in projection. If not provided, the dimension columns from the cube specification will be used.

  • partition_by (Union[None, str, Iterable[str]]) – Column(s) by which logical partitions should be formed. If not provided, a single partition will be generated.

  • payload_columns (Union[None, str, Iterable[str]]) – Which columns apart from dimension_columns and partition_by should be returned.

  • blocksize (int) – Partition size of the bag.

Returns

  • empty (pandas.DataFrame) – Empty DataFrame with correct dtypes and column order.

  • bag (dask.bag.Bag) – Bag of 1-sized partitions of non-empty DataFrames, ordered by partition_by. The columns of the DataFrames are ordered alphabetically. Data types are provided on a best-effort basis (they are restored based on the preserved data, but may differ due to Pandas NULL handling; e.g. integer columns may become floats).
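
The sketch below illustrates a call that returns the (empty, bag) pair described above. The cube and store setup are assumed for illustration, and all optional parameters are left at None so the documented defaults apply.

   from simplekv.memory import DictStore

   from kartothek.core.cube.cube import Cube
   from kartothek.io.dask.common_cube import query_cube_bag_internal

   # Assumed cube specification and the store that preserves the already-built cube.
   cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="my_cube")
   store = DictStore()

   empty, bag = query_cube_bag_internal(
       cube=cube,
       store=store,
       conditions=None,         # no filtering
       datasets=None,           # auto-discover the cube's datasets
       dimension_columns=None,  # default to the cube's dimension columns
       partition_by=None,       # a single logical partition
       payload_columns=None,    # return all payload columns
       blocksize=1,
   )

   # `empty` is a zero-row DataFrame carrying dtypes and column order;
   # computing the bag yields the non-empty partition DataFrames.
   result_frames = bag.compute()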