kartothek.io.dask.delayed module

kartothek.io.dask.delayed.delete_dataset__delayed(dataset_uuid=None, store=None, factory=None)[source]

Delete a dataset from the store as a dask.delayed graph.

Parameters:
  • dataset_uuid (str) – The dataset UUID
  • store (Callable or str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either a simplekv.KeyValueStore, a storefact store URL or a generic Callable producing a simplekv.KeyValueStore

  • factory (kartothek.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
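
A minimal usage sketch with a hypothetical dataset UUID and store path. It assumes storefact's get_store_from_url for store creation and that the return value is a single dask.delayed object:

>>> from functools import partial
>>> from storefact import get_store_from_url
>>> from kartothek.io.dask.delayed import delete_dataset__delayed
>>> store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_store")
>>> task = delete_dataset__delayed(
...     dataset_uuid="my_dataset", store=store_factory
... )
>>> task.compute()
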
kartothek.io.dask.delayed.garbage_collect_dataset__delayed(dataset_uuid=None, store=None, chunk_size=100, factory=None)[source]

Remove auxiliary files that are no longer tracked by the dataset.

These files include indices that are no longer referenced by the metadata as well as files in the directories of the tables that are no longer referenced. The latter is only applied to static datasets.

Parameters:
  • dataset_uuid (str) – The dataset UUID
  • store (Callable or str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either a simplekv.KeyValueStore, a storefact store URL or a generic Callable producing a simplekv.KeyValueStore

  • factory (kartothek.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
  • chunk_size (int) – Number of files that should be deleted in a single job.
Returns:

  tasks

Return type:

  list of dask.delayed
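
Since the return value is a list of dask.delayed tasks, they can be executed together. A sketch with hypothetical names, assuming storefact's get_store_from_url:

>>> import dask
>>> from functools import partial
>>> from storefact import get_store_from_url
>>> from kartothek.io.dask.delayed import garbage_collect_dataset__delayed
>>> store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_store")
>>> tasks = garbage_collect_dataset__delayed(
...     dataset_uuid="my_dataset", store=store_factory, chunk_size=100
... )
>>> dask.compute(*tasks)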

kartothek.io.dask.delayed.merge_datasets_as_delayed(left_dataset_uuid, right_dataset_uuid, store, merge_tasks, match_how='exact', label_merger=None, metadata_merger=None)[source]

A dask.delayed graph to perform the merge of two full kartothek datasets.

Parameters:
  • store (Callable or str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either a simplekv.KeyValueStore, a storefact store URL or a generic Callable producing a simplekv.KeyValueStore

  • label_merger (callable, optional) – By default the shorter label of either the left or right partition is chosen as the merged partition label. Supplying a callable here allows you to override the default behavior and create a new label from all input labels (depending on the matches, this might be more than two values)
  • metadata_merger (callable, optional) – By default partition metadata is combined using the combine_metadata() function. You can supply a callable here that implements a custom merge operation on the metadata dictionaries (depending on the matches this might be more than two values).
  • left_dataset_uuid (str) – UUID for the left dataset (whether order matters depends on the match_how scheme)
  • right_dataset_uuid (str) – UUID for the right dataset (whether order matters depends on the match_how scheme)
  • match_how (Union[str, Callable]) –

    Define the partition label matching scheme. Available implementations are:

    • left (right) : The left (right) partitions are considered to be
      the base partitions and all partitions of the right (left) dataset are joined to the base partitions. This should only be used if one of the datasets contains very few partitions.
    • prefix : The labels of the partitions of the dataset with fewer
      partitions are considered to be the prefixes of the partition labels of the other dataset
    • exact : All partition labels of the left dataset need to have
      an exact match in the right dataset
    • callable : A callable with signature func(left, right) which
      returns a boolean to determine if the partitions match

    A boolean is also accepted: True requires an exact match of partition labels between the to-be-merged datasets (equivalent to exact), while False interprets the partition labels of the dataset with fewer partitions as prefixes (equivalent to prefix).

  • merge_tasks (List[Dict]) –

    A list of merge tasks. Each item in this list is a dictionary giving explicit instructions for a specific merge. Each dict should contain the following key/values:

    • 'left': The table for the left dataframe
    • 'right': The table for the right dataframe
    • 'output_label': The table for the merged dataframe
    • 'merge_func': A callable with signature
      merge_func(left_df, right_df, merge_kwargs) to handle the data preprocessing and merging. Defaults to pandas.merge.
    • 'merge_kwargs': The kwargs to be passed to the merge_func

    Example:

    >>> merge_tasks = [
    ...     {
    ...         "left": "left_dict",
    ...         "right": "right_dict",
    ...         "merge_kwargs": {"kwargs of merge_func": ''},
    ...         "output_label": 'merged_core_data'
    ...     },
    ... ]
    
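A full call might then look like the following hedged sketch, with hypothetical dataset UUIDs, table names and join column; store_factory is built with storefact's get_store_from_url, and merge_kwargs are forwarded to the default pandas.merge:

>>> from functools import partial
>>> from storefact import get_store_from_url
>>> from kartothek.io.dask.delayed import merge_datasets_as_delayed
>>> store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_store")
>>> merge_tasks = [
...     {
...         "left": "core",
...         "right": "extra",
...         "output_label": "merged",
...         "merge_kwargs": {"on": "id", "how": "inner"},
...     },
... ]
>>> merge_graph = merge_datasets_as_delayed(
...     left_dataset_uuid="uuid_left",
...     right_dataset_uuid="uuid_right",
...     store=store_factory,
...     merge_tasks=merge_tasks,
...     match_how="exact",
... )
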
kartothek.io.dask.delayed.read_dataset_as_delayed(dataset_uuid=None, store=None, tables=None, columns=None, concat_partitions_on_primary_index=False, predicate_pushdown_to_io=True, categoricals=None, label_filter=None, dates_as_object=False, predicates=None, factory=None, dispatch_by=None)[source]

A collection of dask.delayed objects to retrieve a dataset from store where each partition is loaded as a DataFrame.

Parameters:
  • dataset_uuid (str) – The dataset UUID
  • store (Callable or str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either a simplekv.KeyValueStore, a storefact store URL or a generic Callable producing a simplekv.KeyValueStore

  • tables (list of str) – A list of tables to be loaded. If None is given, all tables of a partition are loaded
  • columns (dict of list of string, optional) – A dictionary mapping tables to lists of columns. Only the specified columns are loaded for the corresponding table. If a specified table or column is not present in the dataset, a ValueError is raised.
  • concat_partitions_on_primary_index (bool) – Concatenate partitions based on their primary index values.
  • predicate_pushdown_to_io (bool) – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the storage layer that need to be addressed there.
  • categoricals (dict of list of string) – A dictionary mapping tables to lists of columns that should be loaded as category dtype instead of the inferred one.
  • label_filter (callable) – A callable taking a partition label as a parameter and returning a boolean. The callable will be applied to the list of partitions during dispatch and will filter out all partitions for which it evaluates to False.
  • dates_as_object (bool) – Load pyarrow.date{32,64} columns as object columns in Pandas instead of using np.datetime64, to preserve their type. While this improves type-safety, it comes at a performance cost.
  • predicates (list of list of tuple[str, str, Any]) –

    Optional list of predicates, like [[('x', '>', 0), ...]], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query. See the sketch after this parameter list for a usage example.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This allows expressing all kinds of predicates that are possible using boolean logic.

    Available operators are: ==, !=, <=, >=, <, > and in.

    Filtering for missing values is supported with the operators ==, != and in, using the values np.nan and None for float and string columns respectively.

    Categorical data

    When using order-sensitive operators on categorical data, we assume that the categories obey a lexicographical ordering. This filtering may result in less than optimal performance and may be slower than the evaluation on non-categorical data.

  • factory (kartothek.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
  • dispatch_by (list of strings, optional) –

    List of index columns to group and partition the jobs by. There will be one job created for every observed index value combination. This may result in either many very small partitions or in few very large partitions, depending on the index you are using this on.

    Secondary indices

    This is also usable in combination with secondary indices, where the physical file layout may not be aligned with the logically requested layout. For optimal performance it is recommended to use this for columns which can benefit from predicate pushdown, since the jobs will fetch their data individually and will not shuffle data in memory / over the network.
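
A usage sketch combining predicates in DNF (hypothetical dataset, store and column names): the two inner lists are OR-ed, and the tuples inside each list are AND-ed:

>>> import dask
>>> from functools import partial
>>> from storefact import get_store_from_url
>>> from kartothek.io.dask.delayed import read_dataset_as_delayed
>>> store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_store")
>>> # (country == "DE" AND x > 0) OR country == "US"
>>> tasks = read_dataset_as_delayed(
...     dataset_uuid="my_dataset",
...     store=store_factory,
...     predicates=[
...         [("country", "==", "DE"), ("x", ">", 0)],
...         [("country", "==", "US")],
...     ],
... )
>>> partitions = dask.compute(*tasks)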

kartothek.io.dask.delayed.read_dataset_as_delayed_metapartitions(dataset_uuid=None, store=None, tables=None, columns=None, concat_partitions_on_primary_index=False, predicate_pushdown_to_io=True, categoricals=None, label_filter=None, dates_as_object=False, load_dataset_metadata=False, predicates=None, factory=None, dispatch_by=None, dispatch_metadata=True)[source]

A collection of dask.delayed objects to retrieve a dataset from store where each partition is loaded as a MetaPartition.

Parameters:
  • dataset_uuid (str) – The dataset UUID
  • store (Callable or str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either a simplekv.KeyValueStore, a storefact store URL or a generic Callable producing a simplekv.KeyValueStore

  • tables (list of str) – A list of tables to be loaded. If None is given, all tables of a partition are loaded
  • columns (dict of list of string, optional) – A dictionary mapping tables to lists of columns. Only the specified columns are loaded for the corresponding table. If a specified table or column is not present in the dataset, a ValueError is raised.
  • concat_partitions_on_primary_index (bool) – Concatenate partitions based on their primary index values.
  • predicate_pushdown_to_io (bool) – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the storage layer that need to be addressed there.
  • categoricals (dict of list of string) – A dictionary mapping tables to lists of columns that should be loaded as category dtype instead of the inferred one.
  • label_filter (callable) – A callable taking a partition label as a parameter and returning a boolean. The callable will be applied to the list of partitions during dispatch and will filter out all partitions for which it evaluates to False.
  • dates_as_object (bool) – Load pyarrow.date{32,64} columns as object columns in Pandas instead of using np.datetime64, to preserve their type. While this improves type-safety, it comes at a performance cost.
  • load_dataset_metadata (bool) – Whether to load the dataset metadata.
  • predicates (list of list of tuple[str, str, Any]) –

    Optional list of predicates, like [[('x', '>', 0), ...]], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This allows expressing all kinds of predicates that are possible using boolean logic.

    Available operators are: ==, !=, <=, >=, <, > and in.

    Filtering for missing values is supported with the operators ==, != and in, using the values np.nan and None for float and string columns respectively.

    Categorical data

    When using order-sensitive operators on categorical data, we assume that the categories obey a lexicographical ordering. This filtering may result in less than optimal performance and may be slower than the evaluation on non-categorical data.

  • factory (kartothek.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
  • dispatch_by (list of strings, optional) –

    List of index columns to group and partition the jobs by. There will be one job created for every observed index value combination. This may result in either many very small partitions or in few very large partitions, depending on the index you are using this on.

    Secondary indices

    This is also usable in combination with secondary indices, where the physical file layout may not be aligned with the logically requested layout. For optimal performance it is recommended to use this for columns which can benefit from predicate pushdown, since the jobs will fetch their data individually and will not shuffle data in memory / over the network.

  • dispatch_metadata (bool) – If True, attach dataset user metadata and dataset index information to the generated MetaPartition instances. Note: This feature is deprecated and this toggle is only introduced to allow for an easier transition.
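
A minimal sketch with hypothetical names; it assumes that MetaPartition.data maps table names to pandas DataFrames:

>>> import dask
>>> from functools import partial
>>> from storefact import get_store_from_url
>>> from kartothek.io.dask.delayed import (
...     read_dataset_as_delayed_metapartitions,
... )
>>> store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_store")
>>> mp_tasks = read_dataset_as_delayed_metapartitions(
...     dataset_uuid="my_dataset", store=store_factory
... )
>>> first_mp = dask.compute(mp_tasks[0])[0]
>>> first_mp.data  # dict: table name -> pandas.DataFrame
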
kartothek.io.dask.delayed.read_table_as_delayed(dataset_uuid=None, store=None, table='table', columns=None, concat_partitions_on_primary_index=False, predicate_pushdown_to_io=True, categoricals=None, label_filter=None, dates_as_object=False, predicates=None, factory=None, dispatch_by=None)[source]

A collection of dask.delayed objects to retrieve a single table from a dataset as partition-individual DataFrame instances.

You can transform the collection of dask.delayed objects into a dask.dataframe using the following code snippet. As older kartothek specifications don’t store schema information, this must be provided by a separate code path.

>>> import dask.dataframe as dd
>>> ddf_tasks = read_table_as_delayed(...)
>>> meta = ...
>>> ddf = dd.from_delayed(ddf_tasks, meta=meta)
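
A more concrete, hedged variant of the snippet above, assuming a hypothetical dataset whose 'table' has columns x (int64) and y (object):

>>> import dask.dataframe as dd
>>> import pandas as pd
>>> from functools import partial
>>> from storefact import get_store_from_url
>>> from kartothek.io.dask.delayed import read_table_as_delayed
>>> store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_store")
>>> ddf_tasks = read_table_as_delayed(
...     dataset_uuid="my_dataset", store=store_factory, table="table"
... )
>>> meta = pd.DataFrame(
...     {"x": pd.Series(dtype="int64"), "y": pd.Series(dtype="object")}
... )
>>> ddf = dd.from_delayed(ddf_tasks, meta=meta)
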
Parameters:
  • dataset_uuid (str) – The dataset UUID
  • store (Callable or str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either a simplekv.KeyValueStore, a storefact store URL or a generic Callable producing a simplekv.KeyValueStore

  • table (str, optional) – The table to be loaded. If none is specified, the default 'table' is used.
  • columns (dict of list of string, optional) – A dictionary mapping tables to lists of columns. Only the specified columns are loaded for the corresponding table. If a specified table or column is not present in the dataset, a ValueError is raised.
  • concat_partitions_on_primary_index (bool) – Concatenate partitions based on their primary index values.
  • predicate_pushdown_to_io (bool) – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the storage layer that need to be addressed there.
  • categoricals (dict of list of string) – A dictionary mapping tables to lists of columns that should be loaded as category dtype instead of the inferred one.
  • label_filter (callable) – A callable taking a partition label as a parameter and returning a boolean. The callable will be applied to the list of partitions during dispatch and will filter out all partitions for which it evaluates to False.
  • dates_as_object (bool) – Load pyarrow.date{32,64} columns as object columns in Pandas instead of using np.datetime64, to preserve their type. While this improves type-safety, it comes at a performance cost.
  • predicates (list of list of tuple[str, str, Any]) –

    Optional list of predicates, like [[('x', '>', 0), ...]], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This allows expressing all kinds of predicates that are possible using boolean logic.

    Available operators are: ==, !=, <=, >=, <, > and in.

    Filtering for missing values is supported with the operators ==, != and in, using the values np.nan and None for float and string columns respectively.

    Categorical data

    When using order-sensitive operators on categorical data, we assume that the categories obey a lexicographical ordering. This filtering may result in less than optimal performance and may be slower than the evaluation on non-categorical data.

  • factory (kartothek.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
  • dispatch_by (list of strings, optional) –

    List of index columns to group and partition the jobs by. There will be one job created for every observed index value combination. This may result in either many very small partitions or in few very large partitions, depending on the index you are using this on.

    Secondary indices

    This is also usable in combination with secondary indices, where the physical file layout may not be aligned with the logically requested layout. For optimal performance it is recommended to use this for columns which can benefit from predicate pushdown, since the jobs will fetch their data individually and will not shuffle data in memory / over the network.

kartothek.io.dask.delayed.store_delayed_as_dataset(delayed_tasks, store, dataset_uuid=None, metadata=None, df_serializer=None, overwrite=False, metadata_merger=None, metadata_version=4, partition_on=None, metadata_storage_format='json', secondary_indices=None)[source]

Transform and store a list of dictionaries containing dataframes to a kartothek dataset in store.

Parameters:
  • delayed_tasks (list of dask.delayed) – Every delayed object represents one partition and must resolve to an object accepted by parse_input_to_metapartition()
  • store (Callable or str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either a simplekv.KeyValueStore, a storefact store URL or a generic Callable producing a simplekv.KeyValueStore

  • dataset_uuid (str) – The dataset UUID
  • metadata (dict, optional) – A dictionary used to update the dataset metadata.
  • df_serializer (DataFrameSerializer, optional) – A pandas DataFrame serializer from kartothek.serialization
  • overwrite (bool, optional) – If True, allow overwrite of an existing dataset.
  • metadata_merger (callable, optional) – By default partition metadata is combined using the combine_metadata() function. You can supply a callable here that implements a custom merge operation on the metadata dictionaries (depending on the matches this might be more than two values).
  • metadata_version (int, optional) – The dataset metadata version
  • partition_on (list) – Column names by which the dataset should be physically partitioned. These columns may later be used as an index to improve query performance. Partition columns need to be present in all dataset tables. Sensitive to ordering.
  • metadata_storage_format (str) – Optional storage format of the dataset metadata. Currently supported are .json and .msgpack.zstd
  • secondary_indices (List[str]) – A list of columns for which a secondary index should be calculated.
Return type:

  A dask.delayed dataset object.
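
A minimal writing sketch with hypothetical labels, table names and store path; each delayed task resolves to a dict in a form accepted by parse_input_to_metapartition():

>>> import dask
>>> import pandas as pd
>>> from functools import partial
>>> from storefact import get_store_from_url
>>> from kartothek.io.dask.delayed import store_delayed_as_dataset
>>> store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_store")
>>> def make_partition(i):
...     df = pd.DataFrame({"x": range(i * 10, (i + 1) * 10)})
...     return {"label": "part_{}".format(i), "data": [("table", df)]}
>>> delayed_tasks = [dask.delayed(make_partition)(i) for i in range(3)]
>>> graph = store_delayed_as_dataset(
...     delayed_tasks, store=store_factory, dataset_uuid="my_dataset"
... )
>>> dataset_metadata = graph.compute()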

kartothek.io.dask.delayed.update_dataset_from_delayed(delayed_tasks, store=None, dataset_uuid=None, delete_scope=None, metadata=None, df_serializer=None, metadata_merger=None, default_metadata_version=4, partition_on=None, sort_partitions_by=None, secondary_indices=None, factory=None)[source]

A dask.delayed graph to add and store a list of dictionaries containing dataframes to a kartothek dataset in store. The input should be a list (or splitter pipeline) containing MetaPartition objects. If you want to use this pipeline step just for deleting partitions, without adding new ones, pass an empty MetaPartition as input ([MetaPartition(None)]).

Parameters:
  • delayed_tasks (list of dask.delayed) – Every delayed object represents one partition and must resolve to an object accepted by parse_input_to_metapartition()
  • store (Callable or str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either a simplekv.KeyValueStore, a storefact store URL or a generic Callable producing a simplekv.KeyValueStore

  • dataset_uuid (str) – The dataset UUID
  • delete_scope (list of dicts) – This defines which partitions are replaced with the input and therefore get deleted. It is a list of query filters for the dataframe in the form of a dictionary, e.g.: [{'column_1': 'value_1'}, {'column_1': 'value_2'}]. Each query filter will be given to dataset.query() and the returned partitions will be deleted. If no scope is given, nothing will be deleted. For kartothek.io.dask.update.update_dataset.* a delayed object resolving to a list of dicts is also accepted.
  • metadata (dict, optional) – A dictionary used to update the dataset metadata.
  • df_serializer (DataFrameSerializer, optional) – A pandas DataFrame serializer from kartothek.serialization
  • metadata_merger (callable, optional) – By default partition metadata is combined using the combine_metadata() function. You can supply a callable here that implements a custom merge operation on the metadata dictionaries (depending on the matches this might be more than two values).
  • default_metadata_version (int) – Default metadata version. (Note: only metadata versions greater than 3 are supported)
  • partition_on (list) – Column names by which the dataset should be physically partitioned. These columns may later be used as an index to improve query performance. Partition columns need to be present in all dataset tables. Sensitive to ordering.
  • sort_partitions_by (str) – Provide a column by which the data should be sorted before storage to enable predicate pushdown.
  • secondary_indices (List[str]) – A list of columns for which a secondary index should be calculated.
  • factory (kartothek.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
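
A sketch that replaces all partitions matching a delete_scope filter with new data. The column, table and store names are hypothetical, and it assumes the dataset is partitioned on column_1 so that dataset.query can resolve the filter:

>>> import dask
>>> import pandas as pd
>>> from functools import partial
>>> from storefact import get_store_from_url
>>> from kartothek.io.dask.delayed import update_dataset_from_delayed
>>> store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_store")
>>> def make_partition():
...     df = pd.DataFrame({"column_1": ["value_1"], "payload": [42]})
...     return {"data": [("table", df)]}
>>> delayed_tasks = [dask.delayed(make_partition)()]
>>> graph = update_dataset_from_delayed(
...     delayed_tasks,
...     store=store_factory,
...     dataset_uuid="my_dataset",
...     delete_scope=[{"column_1": "value_1"}],
...     partition_on=["column_1"],
... )
>>> graph.compute()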