kartothek.io.dask.dataframe module

kartothek.io.dask.dataframe.collect_dataset_metadata(store: Optional[Callable[[], simplekv.KeyValueStore]] = None, dataset_uuid: Optional[str] = None, table_name: str = 'table', predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]] = None, frac: float = 1.0, factory: Optional[kartothek.core.factory.DatasetFactory] = None) → dask.dataframe.core.DataFrame[source]

Collect parquet metadata of the dataset. The frac parameter can be used to select a subset of the data.

Warning

If the partition sizes are not evenly distributed, e.g. some partitions are much larger than others, the metadata returned is not a good approximation of the metadata of the whole dataset.

Warning

Using the frac parameter is not encouraged for a small number of total partitions.

Parameters:
  • store (Callable or Str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either simplekv.KeyValueStore, a storefact store url or a generic Callable producing a simplekv.KeyValueStore

  • dataset_uuid (str) – The dataset UUID
  • predicates (list of list of tuple[str, str, Any]) –

    Optional list of predicates, like [[('x', '>', 0), …]], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are all combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This makes it possible to express any predicate that can be written in boolean logic.

    Available operators are: ==, !=, <=, >=, <, > and in.

    Filtering for missing values is supported with the operators ==, != and in, using the values np.nan and None for float and string columns, respectively.

    Categorical data

    When using order sensitive operators on categorical data we will assume that the categories obey a lexicographical ordering. This filtering may result in less than optimal performance and may be slower than the evaluation on non-categorical data.

  • factory (kartothek.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
  • predicates

    Kartothek predicates to apply filters on the data for which to gather statistics

    Warning

    Filtering will only be applied for predicates on indices. The evaluation of the predicates will therefore only return an approximate result.

  • frac – Fraction of the total number of partitions to use for gathering statistics. frac == 1.0 will use all partitions.
Returns:

A dask.DataFrame containing the following information about dataset statistics:

  • partition_label: File name of the parquet file, unique to each physical partition.
  • row_group_id: Index of the row groups within one parquet file.
  • row_group_compressed_size: Byte size of the data within one row group.
  • row_group_uncompressed_size: Byte size (uncompressed) of the data within one row group.
  • number_rows_total: Total number of rows in one parquet file.
  • number_row_groups: Number of row groups in one parquet file.
  • serialized_size: Serialized size of the parquet file.
  • number_rows_per_row_group: Number of rows per row group.

Return type:

dask.DataFrame

Raises:

ValueError – If no metadata could be retrieved, raise an error.
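
The DNF structure of the predicates parameter described above can be illustrated with a minimal pure-Python sketch. It is not kartothek's implementation: the names OPERATORS and matches are hypothetical, rows are plain dicts, and the special handling of missing values and categorical data is left out.

```python
import operator

# Map each documented operator string to a Python comparison.
OPERATORS = {
    "==": operator.eq,
    "!=": operator.ne,
    "<=": operator.le,
    ">=": operator.ge,
    "<": operator.lt,
    ">": operator.gt,
    "in": lambda value, candidates: value in candidates,
}


def matches(row, predicates):
    """Return True if `row` (a dict) satisfies predicates given in DNF.

    Hypothetical helper: inner tuples are ANDed, the outer list is ORed.
    """
    return any(
        all(OPERATORS[op](row[column], literal) for column, op, literal in conjunction)
        for conjunction in predicates
    )


# (x > 0 AND y == "a") OR (x in [-1, -2])
predicates = [[("x", ">", 0), ("y", "==", "a")], [("x", "in", [-1, -2])]]

rows = [
    {"x": 1, "y": "a"},
    {"x": 1, "y": "b"},
    {"x": -2, "y": "b"},
]
selected = [row for row in rows if matches(row, predicates)]
```

The first row passes the first conjunction, the third row passes the second, and the second row passes neither.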

kartothek.io.dask.dataframe.hash_dataset(store: Union[str, simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore], None] = None, dataset_uuid: Optional[str] = None, subset=None, group_key=None, table: str = 'table', predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]] = None, factory: Optional[kartothek.core.factory.DatasetFactory] = None) → dask.dataframe.core.Series[source]

Calculate a partition-wise or group-wise hash of the dataset.

Note

We do not guarantee that the hash values remain constant across versions.

Example output:

Assuming a dataset with two unique values in column `P` this gives

>>> hash_dataset(factory=dataset_with_index_factory,group_key=["P"]).compute()
... P
... 1    11462879952839863487
... 2    12568779102514529673
... dtype: uint64
Parameters:
  • store (Callable or Str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either simplekv.KeyValueStore, a storefact store url or a generic Callable producing a simplekv.KeyValueStore

  • dataset_uuid (str) – The dataset UUID
  • table (str, optional) – The table to be loaded. If none is specified, the default ‘table’ is used.
  • predicates (list of list of tuple[str, str, Any]) –

    Optional list of predicates, like [[('x', '>', 0), …]], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are all combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This makes it possible to express any predicate that can be written in boolean logic.

    Available operators are: ==, !=, <=, >=, <, > and in.

    Filtering for missing values is supported with the operators ==, != and in, using the values np.nan and None for float and string columns, respectively.

    Categorical data

    When using order sensitive operators on categorical data we will assume that the categories obey a lexicographical ordering. This filtering may result in less than optimal performance and may be slower than the evaluation on non-categorical data.

  • factory (kartothek.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
  • subset – If provided, only take these columns into account when hashing the dataset
  • group_key – If provided, calculate hash per group instead of per partition
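
The roles of subset and group_key can be illustrated with a small conceptual sketch. It does not reproduce kartothek's hash function or its values: row_hash and group_hashes are hypothetical helpers, the use of hashlib.blake2b is an assumption made for this illustration, and summing row hashes (to make the result independent of row order) is a design choice of the sketch only.

```python
import hashlib
from collections import defaultdict


def row_hash(row, subset):
    """Hash the selected columns of one row to an unsigned 64-bit integer."""
    payload = repr([(column, row[column]) for column in subset]).encode("utf-8")
    return int.from_bytes(hashlib.blake2b(payload, digest_size=8).digest(), "big")


def group_hashes(rows, group_key, subset):
    """Combine row hashes per group; summing keeps the result independent of row order."""
    result = defaultdict(int)
    for row in rows:
        key = tuple(row[column] for column in group_key)
        result[key] = (result[key] + row_hash(row, subset)) % 2**64
    return dict(result)


rows = [
    {"P": 1, "value": 10},
    {"P": 2, "value": 20},
    {"P": 1, "value": 30},
]
hashes = group_hashes(rows, group_key=["P"], subset=["P", "value"])
shuffled = group_hashes(list(reversed(rows)), group_key=["P"], subset=["P", "value"])
```

As in the example output above, the result holds one unsigned 64-bit value per unique value of P.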
kartothek.io.dask.dataframe.read_dataset_as_ddf(dataset_uuid=None, store=None, table='table', columns=None, concat_partitions_on_primary_index=False, predicate_pushdown_to_io=True, categoricals=None, label_filter=None, dates_as_object=False, predicates=None, factory=None, dask_index_on=None, dispatch_by=None)[source]

Retrieve a single table from a dataset as partition-individual DataFrame instance.

Please take care when using categoricals with Dask. For index columns, this function will construct dataset wide categoricals. For all other columns, Dask will determine the categories on a partition level and will need to merge them when shuffling data.

Parameters:
  • dataset_uuid (str) – The dataset UUID
  • store (Callable or Str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either simplekv.KeyValueStore, a storefact store url or a generic Callable producing a simplekv.KeyValueStore

  • table (str, optional) – The table to be loaded. If none is specified, the default ‘table’ is used.
  • columns (dict of list of string, optional) – A dictionary mapping tables to list of columns. Only the specified columns are loaded for the corresponding table. If a specified table or column is not present in the dataset, a ValueError is raised.
  • concat_partitions_on_primary_index (bool) – Concatenate partition based on their primary index values.
  • predicate_pushdown_to_io (bool) – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the storage layer that need to be addressed there.
  • categoricals (dicts of list of string) – A dictionary mapping tables to list of columns that should be loaded as category dtype instead of the inferred one.
  • label_filter (callable) – A callable taking a partition label as a parameter and returns a boolean. The callable will be applied to the list of partitions during dispatch and will filter out all partitions for which the callable evaluates to False.
  • dates_as_object (bool) – Load pyarrow.date{32,64} columns as object columns in Pandas instead of using np.datetime64 to preserve their type. While this improves type-safety, this comes at a performance cost.
  • predicates (list of list of tuple[str, str, Any]) –

    Optional list of predicates, like [[('x', '>', 0), …]], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are all combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This makes it possible to express any predicate that can be written in boolean logic.

    Available operators are: ==, !=, <=, >=, <, > and in.

    Filtering for missing values is supported with the operators ==, != and in, using the values np.nan and None for float and string columns, respectively.

    Categorical data

    When using order sensitive operators on categorical data we will assume that the categories obey a lexicographical ordering. This filtering may result in less than optimal performance and may be slower than the evaluation on non-categorical data.

  • factory (kartothek.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
  • dispatch_by (list of strings, optional) –

    List of index columns to group and partition the jobs by. There will be one job created for every observed index value combination. This may result in either many very small partitions or in few very large partitions, depending on the index you are using this on.

    Secondary indices

    This is also useable in combination with secondary indices where the physical file layout may not be aligned with the logically requested layout. For optimal performance it is recommended to use this for columns which can benefit from predicate pushdown since the jobs will fetch their data individually and will not shuffle data in memory / over network.

  • dask_index_on (str) –

    Reconstruct (and set) a dask index on the provided index column. Cannot be used in conjunction with dispatch_by.

    For details on performance, see also dispatch_by
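
How label_filter and dispatch_by shape the resulting jobs can be sketched in plain Python. This only illustrates the documented semantics and is not kartothek's dispatch code: the partition labels, the plan_jobs helper, and the assumption that each partition holds exactly one index value combination are all hypothetical.

```python
from collections import defaultdict

# Hypothetical partition labels with the index values observed in each partition.
partitions = {
    "part_a": {"country": "DE", "year": 2020},
    "part_b": {"country": "DE", "year": 2020},
    "part_c": {"country": "FR", "year": 2020},
    "part_tmp": {"country": "FR", "year": 2021},
}


def label_filter(label):
    """Keep every partition whose label does not end in 'tmp'."""
    return not label.endswith("tmp")


def plan_jobs(partitions, dispatch_by, label_filter=None):
    """Group partition labels into one job per observed index value combination."""
    jobs = defaultdict(list)
    for label, index_values in partitions.items():
        if label_filter is not None and not label_filter(label):
            continue  # partitions for which the callable returns False are dropped
        key = tuple(index_values[column] for column in dispatch_by)
        jobs[key].append(label)
    return dict(jobs)


jobs = plan_jobs(partitions, dispatch_by=["country", "year"], label_filter=label_filter)
```

Two jobs remain here, one per observed (country, year) combination. An index with many distinct values would produce many small jobs, while one with few distinct values would produce few large ones.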

kartothek.io.dask.dataframe.store_dataset_from_ddf(ddf: dask.dataframe.core.DataFrame, store: Union[str, simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], dataset_uuid: str, table: str = 'table', secondary_indices: Optional[List[str]] = None, shuffle: bool = False, repartition_ratio: Optional[SupportsFloat] = None, num_buckets: int = 1, sort_partitions_by: Union[List[str], str, None] = None, delete_scope: Optional[Iterable[Mapping[str, str]]] = None, metadata: Optional[Mapping[KT, VT_co]] = None, df_serializer: Optional[kartothek.serialization._generic.DataFrameSerializer] = None, metadata_merger: Optional[Callable] = None, metadata_version: int = 4, partition_on: Optional[List[str]] = None, bucket_by: Union[List[str], str, None] = None, overwrite: bool = False)[source]

Store a dataset from a dask.dataframe.

Behavior with shuffle==False

In the case without partition_on every dask partition is mapped to a single kartothek partition

In the case with partition_on every dask partition is mapped to N kartothek partitions, where N depends on the content of the respective partition, such that every resulting kartothek partition has only a single value in the respective partition_on columns.
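
The mapping of one dask partition to N kartothek partitions can be sketched as follows. This is a conceptual illustration with rows as plain dicts; split_partition is a hypothetical helper and not part of kartothek.

```python
from collections import defaultdict


def split_partition(rows, partition_on):
    """Split one input partition so each output holds a single partition_on value combination."""
    outputs = defaultdict(list)
    for row in rows:
        key = tuple((column, row[column]) for column in partition_on)
        outputs[key].append(row)
    return dict(outputs)


# One hypothetical dask partition containing two distinct values of "primary_key".
dask_partition = [
    {"primary_key": 1, "value": "a"},
    {"primary_key": 2, "value": "b"},
    {"primary_key": 1, "value": "c"},
]
kartothek_partitions = split_partition(dask_partition, partition_on=["primary_key"])
```

Here N is 2, because the input partition contains two distinct values of primary_key.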

Behavior with shuffle==True

partition_on is mandatory

Perform a data shuffle to ensure that every primary key will have at most num_buckets files.

Note

The number of allowed buckets has an impact on the required resources and runtime. Using a larger number of allowed buckets will usually reduce resource consumption and, in some cases, also improve runtime performance.

Example:
>>> partition_on="primary_key"
>>> num_buckets=2  # doctest: +SKIP
primary_key=1/bucket1.parquet
primary_key=1/bucket2.parquet

Note

This can only be used for datasets with a single table!

See also, Force partitioning by shuffling using Dask.

Parameters:
  • store (Callable or Str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either simplekv.KeyValueStore, a storefact store url or a generic Callable producing a simplekv.KeyValueStore

  • dataset_uuid (str) – The dataset UUID
  • table (str, optional) – The table to be loaded. If none is specified, the default ‘table’ is used.
  • secondary_indices (List[str]) – A list of columns for which a secondary index should be calculated.
  • sort_partitions_by (str) – Provide a column after which the data should be sorted before storage to enable predicate pushdown.
  • delete_scope (list of dicts) – This defines which partitions are replaced with the input and therefore get deleted. It is a list of query filters for the dataframe in the form of dictionaries, e.g.: [{'column_1': 'value_1'}, {'column_1': 'value_2'}]. Each query filter will be given to dataset.query and the returned partitions will be deleted. If no scope is given, nothing will be deleted. For kartothek.io.dask.update.update_dataset.* a delayed object resolving to a list of dicts is also accepted.
  • metadata (dict, optional) – A dictionary used to update the dataset metadata.
  • df_serializer (DataFrameSerializer, optional) – A pandas DataFrame serialiser from kartothek.serialization
  • metadata_merger (callable, optional) – By default partition metadata is combined using the combine_metadata() function. You can supply a callable here that implements a custom merge operation on the metadata dictionaries (depending on the matches this might be more than two values).
  • metadata_version (int, optional) – The dataset metadata version
  • partition_on (list) – Column names by which the dataset should be physically partitioned. These columns may later on be used as an Index to improve query performance. Partition columns need to be present in all dataset tables. Sensitive to ordering.
  • overwrite (bool, optional) – If True, allow overwrite of an existing dataset.
  • ddf (Union[dask.dataframe.DataFrame, None]) – The dask.Dataframe to be used to calculate the new partitions from. If this parameter is None, the update pipeline will only delete partitions without creating new ones.
  • shuffle (bool) –

    If True and partition_on is requested, shuffle the data to reduce number of output partitions.

    See also, Shuffling.

    Warning

    Dask uses a heuristic to determine how data is shuffled and there are two options, partd for local disk shuffling and tasks for distributed shuffling using a task graph. If there is no distributed.Client in the context and the option is not set explicitly, dask will choose partd which may cause data loss when the graph is executed on a distributed cluster.

    Therefore, we recommend to specify the dask shuffle method explicitly, e.g. by using a context manager.

    with dask.config.set(shuffle='tasks'):
        graph = update_dataset_from_ddf(...)
    graph.compute()
    
  • repartition_ratio (Optional[Union[int, float]]) – If provided, repartition the dataframe before calculation starts to ceil(ddf.npartitions / repartition_ratio)
  • num_buckets (int) – If provided, the output partitioning will have num_buckets files per primary key partitioning. This effectively splits up the execution num_buckets times. Setting this parameter may be helpful when scaling. This only has an effect if shuffle==True
  • bucket_by

    The subset of columns which should be considered for bucketing.

    This parameter ensures that groups of the given subset are never split across buckets within a given partition.

    Without specifying this the buckets will be created randomly.

    This only has an effect if shuffle==True

    Secondary indices

    This parameter has a strong effect on the performance of secondary indices. Since it guarantees that a given tuple of the subset will be entirely put into the same file you can build efficient indices with this approach.

    Note

    Only columns with data types which can be hashed are allowed to be used in this.
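
The effect of num_buckets, bucket_by and repartition_ratio can be illustrated with a short sketch. It assumes a hash-modulo bucket assignment, which is an assumption made for illustration and not a statement about kartothek's internals; bucket_of and the sample rows are hypothetical.

```python
import hashlib
import math


def bucket_of(row, bucket_by, num_buckets):
    """Assign a row to a bucket from a stable hash of its bucket_by values."""
    payload = repr([row[column] for column in bucket_by]).encode("utf-8")
    digest = hashlib.blake2b(payload, digest_size=8).digest()
    return int.from_bytes(digest, "big") % num_buckets


rows = [
    {"primary_key": 1, "customer": "x", "value": 1},
    {"primary_key": 1, "customer": "y", "value": 2},
    {"primary_key": 1, "customer": "x", "value": 3},
]
buckets = [bucket_of(row, bucket_by=["customer"], num_buckets=2) for row in rows]

# repartition_ratio: the input is repartitioned to ceil(npartitions / ratio) partitions.
npartitions, repartition_ratio = 10, 4
target_partitions = math.ceil(npartitions / repartition_ratio)
```

Rows that share the same bucket_by values always receive the same bucket number, which is why a group is never split across buckets within a partition. This also shows why only hashable data types can be used for bucket_by.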

kartothek.io.dask.dataframe.update_dataset_from_ddf(ddf: dask.dataframe.core.DataFrame, store: Union[str, simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore], None] = None, dataset_uuid: Optional[str] = None, table: str = 'table', secondary_indices: Optional[List[str]] = None, shuffle: bool = False, repartition_ratio: Optional[SupportsFloat] = None, num_buckets: int = 1, sort_partitions_by: Union[List[str], str, None] = None, delete_scope: Optional[Iterable[Mapping[str, str]]] = None, metadata: Optional[Mapping[KT, VT_co]] = None, df_serializer: Optional[kartothek.serialization._generic.DataFrameSerializer] = None, metadata_merger: Optional[Callable] = None, default_metadata_version: int = 4, partition_on: Optional[List[str]] = None, factory: Optional[kartothek.core.factory.DatasetFactory] = None, bucket_by: Union[List[str], str, None] = None)[source]

Update a dataset from a dask.dataframe.

Behavior with shuffle==False

In the case without partition_on every dask partition is mapped to a single kartothek partition

In the case with partition_on every dask partition is mapped to N kartothek partitions, where N depends on the content of the respective partition, such that every resulting kartothek partition has only a single value in the respective partition_on columns.

Behavior with shuffle==True

partition_on is mandatory

Perform a data shuffle to ensure that every primary key will have at most num_buckets files.

Note

The number of allowed buckets has an impact on the required resources and runtime. Using a larger number of allowed buckets will usually reduce resource consumption and, in some cases, also improve runtime performance.

Example:
>>> partition_on="primary_key"
>>> num_buckets=2  # doctest: +SKIP
primary_key=1/bucket1.parquet
primary_key=1/bucket2.parquet

Note

This can only be used for datasets with a single table!

See also, Force partitioning by shuffling using Dask.

Parameters:
  • store (Callable or Str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either simplekv.KeyValueStore, a storefact store url or a generic Callable producing a simplekv.KeyValueStore

  • dataset_uuid (str) – The dataset UUID
  • table (str, optional) – The table to be loaded. If none is specified, the default ‘table’ is used.
  • secondary_indices (List[str]) – A list of columns for which a secondary index should be calculated.
  • sort_partitions_by (str) – Provide a column after which the data should be sorted before storage to enable predicate pushdown.
  • delete_scope (list of dicts) – This defines which partitions are replaced with the input and therefore get deleted. It is a list of query filters for the dataframe in the form of dictionaries, e.g.: [{'column_1': 'value_1'}, {'column_1': 'value_2'}]. Each query filter will be given to dataset.query and the returned partitions will be deleted. If no scope is given, nothing will be deleted. For kartothek.io.dask.update.update_dataset.* a delayed object resolving to a list of dicts is also accepted.
  • metadata (dict, optional) – A dictionary used to update the dataset metadata.
  • df_serializer (DataFrameSerializer, optional) – A pandas DataFrame serialiser from kartothek.serialization
  • metadata_merger (callable, optional) – By default partition metadata is combined using the combine_metadata() function. You can supply a callable here that implements a custom merge operation on the metadata dictionaries (depending on the matches this might be more than two values).
  • default_metadata_version (int) – Default metadata version. (Note: Only metadata versions greater than 3 are supported.)
  • partition_on (list) – Column names by which the dataset should be physically partitioned. These columns may later on be used as an Index to improve query performance. Partition columns need to be present in all dataset tables. Sensitive to ordering.
  • factory (kartothek.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
  • ddf (Union[dask.dataframe.DataFrame, None]) – The dask.Dataframe to be used to calculate the new partitions from. If this parameter is None, the update pipeline will only delete partitions without creating new ones.
  • shuffle (bool) –

    If True and partition_on is requested, shuffle the data to reduce number of output partitions.

    See also, Shuffling.

    Warning

    Dask uses a heuristic to determine how data is shuffled and there are two options, partd for local disk shuffling and tasks for distributed shuffling using a task graph. If there is no distributed.Client in the context and the option is not set explicitly, dask will choose partd which may cause data loss when the graph is executed on a distributed cluster.

    Therefore, we recommend to specify the dask shuffle method explicitly, e.g. by using a context manager.

    with dask.config.set(shuffle='tasks'):
        graph = update_dataset_from_ddf(...)
    graph.compute()
    
  • repartition_ratio (Optional[Union[int, float]]) – If provided, repartition the dataframe before calculation starts to ceil(ddf.npartitions / repartition_ratio)
  • num_buckets (int) – If provided, the output partitioning will have num_buckets files per primary key partitioning. This effectively splits up the execution num_buckets times. Setting this parameter may be helpful when scaling. This only has an effect if shuffle==True
  • bucket_by

    The subset of columns which should be considered for bucketing.

    This parameter ensures that groups of the given subset are never split across buckets within a given partition.

    Without specifying this the buckets will be created randomly.

    This only has an effect if shuffle==True

    Secondary indices

    This parameter has a strong effect on the performance of secondary indices. Since it guarantees that a given tuple of the subset will be entirely put into the same file you can build efficient indices with this approach.

    Note

    Only columns with data types which can be hashed are allowed to be used in this.
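
The way delete_scope selects partitions during an update can be sketched in plain Python. This is a conceptual illustration only: in_delete_scope and the partition labels are hypothetical, and the real selection is delegated to dataset.query as described in the parameter list above.

```python
def in_delete_scope(partition_values, delete_scope):
    """Return True if the partition matches at least one query filter in delete_scope."""
    return any(
        all(partition_values.get(column) == value for column, value in query.items())
        for query in delete_scope
    )


# Hypothetical existing partitions, keyed by label, with their index values as strings.
existing = {
    "column_1=value_1/a": {"column_1": "value_1"},
    "column_1=value_2/b": {"column_1": "value_2"},
    "column_1=value_3/c": {"column_1": "value_3"},
}
delete_scope = [{"column_1": "value_1"}, {"column_1": "value_2"}]

to_delete = sorted(
    label for label, values in existing.items() if in_delete_scope(values, delete_scope)
)
```

Each dictionary acts as one query filter, and a partition is deleted if it matches any of them. An empty delete_scope matches nothing, so nothing is deleted.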