kartothek.io.dask.dataframe_cube module

Dask.DataFrame IO.

kartothek.io.dask.dataframe_cube.append_to_cube_from_dataframe(data: dask.bag.core.Bag, cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, metadata: Optional[Dict[str, Dict[str, Any]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → dask.bag.core.Bag

Append data to existing cube.

For details on data and metadata, see build_cube().

Important

Physical partitions must be updated as a whole. If only single rows within a physical partition are updated, the old data is treated as “removed”.

Hint

To have better control over the overwrite “mask” (i.e. which partitions are overwritten), you should use remove_partitions() beforehand.

Parameters
  • data (dask.bag.Bag) – Bag containing dataframes

  • cube – Cube specification.

  • store – Store to which the data should be written.

  • metadata – Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of metadata keys is not possible.

  • df_serializer – Optional DataFrame-to-Parquet serializer.

Returns

metadata_dict – A dask bag object containing the compute graph to append to the cube, returning the dict of dataset metadata objects. The bag has a single partition with a single element.

Return type

dask.bag.Bag
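
The following is a minimal usage sketch, not part of the official documentation: it assumes a cube with dimension column city and partition column day has already been built under the hypothetical name weather_cube at the hypothetical store location shown (for example via build_cube_from_dataframe() below); all column names and values are illustrative.

    import dask.bag as db
    import pandas as pd
    from storefact import get_store_from_url

    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.dataframe_cube import append_to_cube_from_dataframe

    # Assumed cube specification; the cube must already exist in the store.
    cube = Cube(
        uuid_prefix="weather_cube",       # hypothetical cube name
        dimension_columns=["city"],
        partition_columns=["day"],
    )
    store = get_store_from_url("hfs:///tmp/weather_cube")  # hypothetical location

    # One new physical partition (day=3); existing partitions are replaced as a whole.
    new_rows = pd.DataFrame({"city": ["Munich"], "day": [3], "temperature": [25.0]})
    bag = db.from_sequence([new_rows], npartitions=1)

    # Nothing is written until the returned bag is computed.
    append_bag = append_to_cube_from_dataframe(data=bag, cube=cube, store=store)
    (metadata_dict,) = append_bag.compute()  # single partition, single element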

kartothek.io.dask.dataframe_cube.build_cube_from_dataframe(data: Union[dask.dataframe.core.DataFrame, Dict[str, dask.dataframe.core.DataFrame]], cube: kartothek.core.cube.cube.Cube, store: Callable[[], simplekv.KeyValueStore], metadata: Optional[Dict[str, Dict[str, Any]]] = None, overwrite: bool = False, partition_on: Optional[Dict[str, Iterable[str]]] = None, shuffle: bool = False, num_buckets: int = 1, bucket_by: Optional[Iterable[str]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None) → dask.delayed.Delayed

Create dask computation graph that builds a cube with the data supplied from a dask dataframe.

Parameters
  • data (Union[dask.dataframe.DataFrame, Dict[str, dask.dataframe.DataFrame]]) – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.

  • cube (Cube) – Cube specification.

  • store (Callable or str or simplekv.KeyValueStore) –

    The store to which the data should be written.

    Can be either a simplekv.KeyValueStore, a storefact store URL or a generic Callable producing a simplekv.KeyValueStore.

  • metadata (Optional[Dict[str, Dict[str, Any]]]) – Metadata for every dataset, used to update the dataset metadata. Optional.

  • overwrite (bool) – If True, allow overwriting of possibly existing datasets.

  • partition_on (Optional[Dict[str, Iterable[str]]]) – Optional partition-on columns per dataset (dictionary mapping dataset ID -> columns) by which the data is physically partitioned. These columns may later be used as an index to improve query performance; they need to be present in the respective dataset and are sensitive to ordering.

  • df_serializer – Optional DataFrame-to-Parquet serializer.

Returns

metadata_dict – A dask delayed object containing the compute graph to build a cube, returning the dict of dataset metadata objects.

Return type

dask.delayed.Delayed
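
A minimal sketch of building a cube from a dask DataFrame; the cube name, store location and column names are illustrative assumptions, not part of the API.

    import dask.dataframe as dd
    import pandas as pd
    from functools import partial
    from storefact import get_store_from_url

    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.dataframe_cube import build_cube_from_dataframe

    # Seed data with one dimension column, one partition column and one payload
    # column (all names are illustrative).
    seed = pd.DataFrame(
        {
            "city": ["Berlin", "Hamburg"],
            "day": [1, 2],
            "temperature": [21.0, 18.5],
        }
    )
    ddf = dd.from_pandas(seed, npartitions=1)

    cube = Cube(
        uuid_prefix="weather_cube",       # hypothetical cube name
        dimension_columns=["city"],
        partition_columns=["day"],
    )

    # A callable producing a simplekv.KeyValueStore, as accepted by `store`.
    store_factory = partial(get_store_from_url, "hfs:///tmp/weather_cube")  # hypothetical path

    # Returns dask.delayed.Delayed; nothing is written until it is computed.
    graph = build_cube_from_dataframe(data=ddf, cube=cube, store=store_factory)
    metadata_dict = graph.compute()

Passing a dict of dataframes instead of a single dataframe writes multiple datasets at once; the entry keyed by the cube's seed dataset ID is treated as the seed.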

kartothek.io.dask.dataframe_cube.extend_cube_from_dataframe(data: Union[dask.dataframe.core.DataFrame, Dict[str, dask.dataframe.core.DataFrame]], cube: kartothek.core.cube.cube.Cube, store: simplekv.KeyValueStore, metadata: Optional[Dict[str, Dict[str, Any]]] = None, overwrite: bool = False, partition_on: Optional[Dict[str, Iterable[str]]] = None, df_serializer: Optional[kartothek.serialization._parquet.ParquetSerializer] = None)

Create dask computation graph that extends a cube by the data supplied from a dask dataframe.

For details on data and metadata, see build_cube().

Parameters
  • data – Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed dataset.

  • cube – Cube specification.

  • store – Store to which the data should be written.

  • metadata – Metadata for every dataset.

  • overwrite – If possibly existing datasets should be overwritten.

  • partition_on – Optional partition-on attributes for datasets (dictionary mapping dataset ID -> columns).

  • df_serializer – Optional DataFrame-to-Parquet serializer.

Returns

metadata_dict – A dask bag object containing the compute graph to extend a cube, returning the dict of dataset metadata objects. The bag has a single partition with a single element.

Return type

dask.bag.Bag
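
A minimal sketch, assuming a cube as in the build_cube_from_dataframe() sketch above already exists in the store; the dataset ID, column names and store location are illustrative.

    import dask.dataframe as dd
    import pandas as pd
    from storefact import get_store_from_url

    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.dataframe_cube import extend_cube_from_dataframe

    # Assumed cube specification; the seed dataset must already exist in the store.
    cube = Cube(
        uuid_prefix="weather_cube",
        dimension_columns=["city"],
        partition_columns=["day"],
    )
    store = get_store_from_url("hfs:///tmp/weather_cube")  # hypothetical location

    # New non-seed dataset; it must carry the cube's dimension and partition columns.
    extra = pd.DataFrame(
        {"city": ["Berlin", "Hamburg"], "day": [1, 2], "humidity": [0.6, 0.7]}
    )
    graph = extend_cube_from_dataframe(
        data={"weather_extra": dd.from_pandas(extra, npartitions=1)},  # hypothetical dataset ID
        cube=cube,
        store=store,
    )
    result = graph.compute()  # per the docs, wraps the dict of dataset metadata objects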

kartothek.io.dask.dataframe_cube.query_cube_dataframe(cube, store, conditions=None, datasets=None, dimension_columns=None, partition_by=None, payload_columns=None)

Query cube.

For detailed documentation, see query_cube().

Important

In contrast to other backends, the Dask DataFrame may contain partitions with empty DataFrames!

Parameters
  • cube (Cube) – Cube specification.

  • store (simplekv.KeyValueStore) – KV store that preserves the cube.

  • conditions (Union[None, Condition, Iterable[Condition], Conjunction]) – Conditions that should be applied, optional.

  • datasets (Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]) – Datasets to query, must all be part of the cube. May be either the result of discover_datasets(), a list of Ktk_cube dataset IDs or None (in which case auto-discovery will be used).

  • dimension_columns (Union[None, str, Iterable[str]]) – Dimension columns of the query, may result in projection. If not provided, dimension columns from cube specification will be used.

  • partition_by (Union[None, str, Iterable[str]]) – Column(s) by which logical partitions should be formed. If not provided, a single partition will be generated.

  • payload_columns (Union[None, str, Iterable[str]]) – Which columns apart from dimension_columns and partition_by should be returned.

Returns

ddf – Dask DataFrame, partitioned and ordered by partition_by. Columns of the DataFrame are ordered alphabetically. Data types are provided on a best-effort basis (they are restored based on the preserved data, but may differ due to pandas NULL-handling, e.g. integer columns may become floats).

Return type

dask.dataframe.DataFrame
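
A minimal query sketch against the same assumed cube as in the sketches above; the condition helper C is imported from kartothek.core.cube.conditions, and all column names and the store location are illustrative.

    from storefact import get_store_from_url

    from kartothek.core.cube.conditions import C
    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.dataframe_cube import query_cube_dataframe

    # Assumed cube specification; the cube must already exist in the store.
    cube = Cube(
        uuid_prefix="weather_cube",
        dimension_columns=["city"],
        partition_columns=["day"],
    )
    store = get_store_from_url("hfs:///tmp/weather_cube")  # hypothetical location

    # One logical partition per day, restricted to days after day 1.
    ddf = query_cube_dataframe(
        cube=cube,
        store=store,
        conditions=C("day") > 1,
        partition_by="day",
        payload_columns=["temperature"],
    )
    # Individual partitions of the resulting dask DataFrame may be empty.
    result = ddf.compute()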