Source code for kartothek.io.dask.dataframe_cube

"""
Dask.DataFrame-based IO for kartothek cubes.
"""
from typing import Any, Dict, Iterable, Optional, Union

import dask
import dask.bag as db
import dask.dataframe as dd
from dask.delayed import Delayed
from simplekv import KeyValueStore

from kartothek.api.discover import discover_datasets_unchecked
from kartothek.core.cube.cube import Cube
from kartothek.core.docs import default_docs
from kartothek.core.typing import StoreFactory
from kartothek.io.dask.common_cube import (
    append_to_cube_from_bag_internal,
    extend_cube_from_bag_internal,
    query_cube_bag_internal,
)
from kartothek.io.dask.dataframe import store_dataset_from_ddf
from kartothek.io_components.cube.common import check_store_factory
from kartothek.io_components.cube.write import (
    apply_postwrite_checks,
    assert_dimesion_index_cols_notnull,
    check_datasets_prebuild,
    check_provided_metadata_dict,
    check_user_df,
    prepare_ktk_metadata,
    prepare_ktk_partition_on,
)
from kartothek.serialization._parquet import ParquetSerializer

__all__ = (
    "append_to_cube_from_dataframe",
    "build_cube_from_dataframe",
    "extend_cube_from_dataframe",
    "query_cube_dataframe",
)


@default_docs
def build_cube_from_dataframe(
    data: Union[dd.DataFrame, Dict[str, dd.DataFrame]],
    cube: Cube,
    store: StoreFactory,
    metadata: Optional[Dict[str, Dict[str, Any]]] = None,
    overwrite: bool = False,
    partition_on: Optional[Dict[str, Iterable[str]]] = None,
    shuffle: bool = False,
    num_buckets: int = 1,
    bucket_by: Optional[Iterable[str]] = None,
    df_serializer: Optional[ParquetSerializer] = None,
) -> Delayed:
    """
    Create a dask computation graph that builds a cube with the data supplied from a dask dataframe.

    Parameters
    ----------
    data
        Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed
        dataset.
    cube
        Cube specification.
    store
        Store to which the data should be written.
    metadata
        Metadata for every dataset.
    overwrite
        If possibly existing datasets should be overwritten.
    partition_on
        Optional partition-on attributes for datasets (dictionary mapping :term:`Dataset ID` -> columns).
    df_serializer
        Optional Dataframe to Parquet serializer

    Returns
    -------
    metadata_dict: dask.delayed.Delayed
        A dask delayed object containing the compute graph to build a cube, returning the dict of dataset metadata
        objects.
    """
    check_store_factory(store)

    if not isinstance(data, dict):
        data = {cube.seed_dataset: data}

    ktk_cube_dataset_ids = sorted(data.keys())

    metadata = check_provided_metadata_dict(metadata, ktk_cube_dataset_ids)
    existing_datasets = discover_datasets_unchecked(cube.uuid_prefix, store)
    check_datasets_prebuild(ktk_cube_dataset_ids, cube, existing_datasets)
    partition_on_checked = prepare_ktk_partition_on(
        cube, ktk_cube_dataset_ids, partition_on
    )
    del partition_on

    dct = {}
    for table_name, ddf in data.items():
        check_user_df(table_name, ddf, cube, set(), partition_on_checked[table_name])

        indices_to_build = set(cube.index_columns) & set(ddf.columns)
        if table_name == cube.seed_dataset:
            indices_to_build |= set(cube.dimension_columns) - cube.suppress_index_on
        indices_to_build -= set(partition_on_checked[table_name])

        ddf = ddf.map_partitions(
            assert_dimesion_index_cols_notnull,
            ktk_cube_dataset_id=table_name,
            cube=cube,
            partition_on=partition_on_checked[table_name],
            meta=ddf._meta,
        )
        graph = store_dataset_from_ddf(
            ddf,
            dataset_uuid=cube.ktk_dataset_uuid(table_name),
            store=store,
            metadata=prepare_ktk_metadata(cube, table_name, metadata),
            partition_on=partition_on_checked[table_name],
            secondary_indices=sorted(indices_to_build),
            sort_partitions_by=sorted(
                (set(cube.dimension_columns) - set(cube.partition_columns))
                & set(ddf.columns)
            ),
            overwrite=overwrite,
            shuffle=shuffle,
            num_buckets=num_buckets,
            bucket_by=bucket_by,
            df_serializer=df_serializer,
        )
        dct[table_name] = graph

    return dask.delayed(apply_postwrite_checks)(
        dct, cube=cube, store=store, existing_datasets=existing_datasets
    )

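# --- Illustrative usage sketch (not part of the original module) -------------
# Minimal, hypothetical example of building a cube from a single seed dataframe
# on a local filesystem store. The store URL, the column names and the use of
# ``storefact.get_store_from_url`` as store factory are assumptions made purely
# for illustration; adapt them to your environment.
def _example_build_cube():
    from functools import partial

    import pandas as pd
    from storefact import get_store_from_url

    df_seed = pd.DataFrame(
        {"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v1": [10.0, 11.0, 12.0, 13.0]}
    )
    cube = Cube(
        dimension_columns=["x"],
        partition_columns=["p"],
        uuid_prefix="example_cube",
        seed_dataset="seed",
    )
    # Cube IO expects a store *factory*, i.e. a zero-argument callable returning the store.
    store_factory = partial(get_store_from_url, "hfs:///tmp/example_cube_store")

    graph = build_cube_from_dataframe(
        data=dd.from_pandas(df_seed, npartitions=2),
        cube=cube,
        store=store_factory,
    )
    return graph.compute()  # dict of dataset metadata objects
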
def extend_cube_from_dataframe(
    data: Union[dd.DataFrame, Dict[str, dd.DataFrame]],
    cube: Cube,
    store: KeyValueStore,
    metadata: Optional[Dict[str, Dict[str, Any]]] = None,
    overwrite: bool = False,
    partition_on: Optional[Dict[str, Iterable[str]]] = None,
    df_serializer: Optional[ParquetSerializer] = None,
):
    """
    Create a dask computation graph that extends a cube by the data supplied from a dask dataframe.

    For details on ``data`` and ``metadata``, see :func:`~kartothek.io.eager_cube.build_cube`.

    Parameters
    ----------
    data
        Data that should be written to the cube. If only a single dataframe is given, it is assumed to be the seed
        dataset.
    cube
        Cube specification.
    store
        Store to which the data should be written.
    metadata
        Metadata for every dataset.
    overwrite
        If possibly existing datasets should be overwritten.
    partition_on
        Optional partition-on attributes for datasets (dictionary mapping :term:`Dataset ID` -> columns).
    df_serializer
        Optional Dataframe to Parquet serializer

    Returns
    -------
    metadata_dict: dask.delayed.Delayed
        A dask delayed object containing the compute graph to extend a cube, returning the dict of dataset metadata
        objects.
    """
    data, ktk_cube_dataset_ids = _ddfs_to_bag(data, cube)

    return (
        extend_cube_from_bag_internal(
            data=data,
            cube=cube,
            store=store,
            ktk_cube_dataset_ids=ktk_cube_dataset_ids,
            metadata=metadata,
            overwrite=overwrite,
            partition_on=partition_on,
            df_serializer=df_serializer,
        )
        .map_partitions(_unpack_list, default=None)
        .to_delayed()[0]
    )

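# --- Illustrative usage sketch (not part of the original module) -------------
# Hypothetical example of extending an existing cube with an additional
# (non-seed) dataset. ``cube`` and ``store_factory`` are assumed to be set up as
# in the build sketch above; the dataset id "enrich" and the columns are made up.
def _example_extend_cube(cube, store_factory):
    import pandas as pd

    df_enrich = pd.DataFrame(
        {"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v2": [20, 21, 22, 23]}
    )

    delayed = extend_cube_from_dataframe(
        data={"enrich": dd.from_pandas(df_enrich, npartitions=2)},
        cube=cube,
        store=store_factory,
    )
    return delayed.compute()  # dict of dataset metadata objects
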
def query_cube_dataframe(
    cube,
    store,
    conditions=None,
    datasets=None,
    dimension_columns=None,
    partition_by=None,
    payload_columns=None,
):
    """
    Query cube.

    For detailed documentation, see :func:`~kartothek.io.eager_cube.query_cube`.

    .. important::
        In contrast to other backends, the Dask DataFrame may contain partitions with empty DataFrames!

    Parameters
    ----------
    cube: Cube
        Cube specification.
    store: simplekv.KeyValueStore
        KV store that preserves the cube.
    conditions: Union[None, Condition, Iterable[Condition], Conjunction]
        Conditions that should be applied, optional.
    datasets: Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]
        Datasets to query, must all be part of the cube. May be either the result of
        :func:`~kartothek.api.discover.discover_datasets`, a list of Ktk_cube dataset IDs or ``None`` (in which case
        auto-discovery will be used).
    dimension_columns: Union[None, str, Iterable[str]]
        Dimension columns of the query, may result in projection. If not provided, dimension columns from the cube
        specification will be used.
    partition_by: Union[None, str, Iterable[str]]
        By which column logical partitions should be formed. If not provided, a single partition will be generated.
    payload_columns: Union[None, str, Iterable[str]]
        Which columns apart from ``dimension_columns`` and ``partition_by`` should be returned.

    Returns
    -------
    ddf: dask.dataframe.DataFrame
        Dask DataFrame, partitioned and ordered by ``partition_by``. Columns of the DataFrames are alphabetically
        ordered. Data types are provided on a best-effort basis (they are restored based on the preserved data, but
        may differ due to Pandas NULL-handling, e.g. integer columns may become floats).
    """
    empty, b = query_cube_bag_internal(
        cube=cube,
        store=store,
        conditions=conditions,
        datasets=datasets,
        dimension_columns=dimension_columns,
        partition_by=partition_by,
        payload_columns=payload_columns,
        blocksize=1,
    )

    dfs = b.map_partitions(_unpack_list, default=empty).to_delayed()

    return dd.from_delayed(
        dfs=dfs, meta=empty, divisions=None  # TODO: figure out an API to support this
    )

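# --- Illustrative usage sketch (not part of the original module) -------------
# Hypothetical query against the cube from the sketches above. The condition
# builder ``kartothek.core.cube.conditions.C`` and the column names are
# assumptions; note that the resulting Dask DataFrame may contain partitions
# with empty dataframes.
def _example_query_cube(cube, store_factory):
    from kartothek.core.cube.conditions import C

    ddf = query_cube_dataframe(
        cube=cube,
        store=store_factory,
        conditions=(C("x") >= 1),
        payload_columns=["v1", "v2"],
    )
    return ddf.compute()  # pandas DataFrame, columns alphabetically ordered
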
def append_to_cube_from_dataframe(
    data: Union[dd.DataFrame, Dict[str, dd.DataFrame]],
    cube: Cube,
    store: KeyValueStore,
    metadata: Optional[Dict[str, Dict[str, Any]]] = None,
    df_serializer: Optional[ParquetSerializer] = None,
) -> Delayed:
    """
    Append data to existing cube.

    For details on ``data`` and ``metadata``, see :func:`~kartothek.io.eager_cube.build_cube`.

    .. important::

        Physical partitions must be updated as a whole. If only single rows within a physical partition are updated,
        the old data is treated as "removed".

    .. hint::

        To have better control over the overwrite "mask" (i.e. which partitions are overwritten), you should use
        :func:`~kartothek.io.eager_cube.remove_partitions` beforehand.

    Parameters
    ----------
    data:
        Data that should be appended to the cube. If only a single dataframe is given, it is assumed to be the seed
        dataset.
    cube:
        Cube specification.
    store:
        Store to which the data should be written.
    metadata:
        Metadata for every dataset, optional. For every dataset, only given keys are updated/replaced. Deletion of
        metadata keys is not possible.
    df_serializer:
        Optional Dataframe to Parquet serializer

    Returns
    -------
    metadata_dict: dask.delayed.Delayed
        A dask delayed object containing the compute graph to append to the cube, returning the dict of dataset
        metadata objects.
    """
    data, ktk_cube_dataset_ids = _ddfs_to_bag(data, cube)

    return (
        append_to_cube_from_bag_internal(
            data=data,
            cube=cube,
            store=store,
            ktk_cube_dataset_ids=ktk_cube_dataset_ids,
            metadata=metadata,
            df_serializer=df_serializer,
        )
        .map_partitions(_unpack_list, default=None)
        .to_delayed()[0]
    )

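# --- Illustrative usage sketch (not part of the original module) -------------
# Hypothetical example of appending new rows to the seed dataset of an existing
# cube. ``cube`` and ``store_factory`` are assumed to exist already; keep in
# mind that physical partitions are replaced as a whole.
def _example_append_to_cube(cube, store_factory):
    import pandas as pd

    df_new = pd.DataFrame({"x": [4, 5], "p": [2, 2], "v1": [14.0, 15.0]})

    delayed = append_to_cube_from_dataframe(
        data={cube.seed_dataset: dd.from_pandas(df_new, npartitions=1)},
        cube=cube,
        store=store_factory,
    )
    return delayed.compute()  # dict of dataset metadata objects
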
def _ddfs_to_bag(data, cube):
    # Normalize the user input (single dataframe or dict of dataframes) into one
    # dask bag whose elements are ``{ktk_cube_dataset_id: dataframe}`` mappings,
    # as expected by the bag-based cube internals.
    if not isinstance(data, dict):
        data = {cube.seed_dataset: data}

    ktk_cube_dataset_ids = sorted(data.keys())
    bags = []
    for ktk_cube_dataset_id in ktk_cube_dataset_ids:
        bags.append(
            db.from_delayed(data[ktk_cube_dataset_id].to_delayed()).map_partitions(
                _convert_write_bag, ktk_cube_dataset_id=ktk_cube_dataset_id
            )
        )

    return (db.concat(bags), ktk_cube_dataset_ids)


def _unpack_list(l, default):  # noqa
    # Return the first element of a partition, or ``default`` for empty partitions.
    l = list(l)  # noqa
    if l:
        return l[0]
    else:
        return default


def _convert_write_bag(df, ktk_cube_dataset_id):
    # Wrap a dataframe partition into the single-element list layout used by the
    # bag-based write pipeline.
    return [{ktk_cube_dataset_id: df}]

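# --- Illustrative sketch of the helper chain (not part of the original module) ---
# Shows, with assumed toy data, the element layout produced by ``_ddfs_to_bag``
# and how ``_unpack_list`` falls back to a default for empty partitions.
def _example_helper_layout():
    import pandas as pd

    ddf = dd.from_pandas(pd.DataFrame({"x": [0, 1]}), npartitions=1)

    # ``cube`` is only consulted when ``data`` is not already a dict, so ``None``
    # suffices for this toy call.
    bag, ids = _ddfs_to_bag({"seed": ddf}, cube=None)
    assert ids == ["seed"]

    # Each bag element is a ``{ktk_cube_dataset_id: pandas.DataFrame}`` mapping.
    elements = bag.compute()
    assert list(elements[0].keys()) == ["seed"]

    # ``_unpack_list`` picks the first element of a partition, or the default for
    # empty partitions (e.g. empty query results).
    assert _unpack_list([], default="fallback") == "fallback"
    return elements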