Source code for kartothek.io.dask.delayed

# -*- coding: utf-8 -*-
import warnings
from collections import defaultdict
from functools import partial
from typing import List, Optional

import dask
from dask import delayed
from dask.delayed import Delayed

from kartothek.core import naming
from kartothek.core.docs import default_docs
from kartothek.core.factory import _ensure_factory
from kartothek.core.naming import DEFAULT_METADATA_VERSION
from kartothek.core.typing import StoreInput
from kartothek.core.utils import lazy_store
from kartothek.core.uuid import gen_uuid
from kartothek.io_components.delete import (
    delete_common_metadata,
    delete_indices,
    delete_top_level_metadata,
)
from kartothek.io_components.gc import delete_files, dispatch_files_to_gc
from kartothek.io_components.merge import align_datasets
from kartothek.io_components.metapartition import (
    SINGLE_TABLE,
    MetaPartition,
    parse_input_to_metapartition,
)
from kartothek.io_components.read import dispatch_metapartitions_from_factory
from kartothek.io_components.update import update_dataset_from_partitions
from kartothek.io_components.utils import (
    _ensure_compatible_indices,
    normalize_arg,
    normalize_args,
    raise_if_indices_overlap,
    validate_partition_keys,
)
from kartothek.io_components.write import (
    raise_if_dataset_exists,
    store_dataset_from_partitions,
    write_partition,
)

from ...utils.migration_helpers import (
    deprecate_parameters,
    deprecate_parameters_if_set,
    get_deprecation_warning_remove_parameter_multi_table,
    get_parameter_default_value_deprecation_warning,
    get_specific_function_deprecation_warning,
    get_specific_function_deprecation_warning_multi_table,
)
from ._utils import (
    _cast_categorical_to_index_cat,
    _get_data,
    _identity,
    _maybe_get_categoricals_from_index,
    map_delayed,
)

__all__ = (
    "delete_dataset__delayed",
    "garbage_collect_dataset__delayed",
    "read_dataset_as_delayed",
    "update_dataset_from_delayed",
    "store_delayed_as_dataset",
)


def _delete_all_additional_metadata(dataset_factory):
    delete_indices(dataset_factory=dataset_factory)
    delete_common_metadata(dataset_factory=dataset_factory)


def _delete_tl_metadata(dataset_factory, *args):
    """
    This function serves as a collector function for delayed objects. It
    therefore accepts additional arguments which are not used.
    """
    delete_top_level_metadata(dataset_factory=dataset_factory)


@default_docs
@normalize_args
def delete_dataset__delayed(dataset_uuid=None, store=None, factory=None):
    """
    Parameters
    ----------
    """
    dataset_factory = _ensure_factory(
        dataset_uuid=dataset_uuid,
        store=store,
        factory=factory,
        load_schema=False,
        load_dataset_metadata=False,
    )

    gc = garbage_collect_dataset__delayed(factory=dataset_factory)

    mps = dispatch_metapartitions_from_factory(dataset_factory)

    delayed_dataset_uuid = delayed(_delete_all_additional_metadata)(
        dataset_factory=dataset_factory
    )
    mps = map_delayed(
        MetaPartition.delete_from_store,
        mps,
        store=store,
        dataset_uuid=dataset_factory.dataset_uuid,
    )

    return delayed(_delete_tl_metadata)(dataset_factory, mps, gc, delayed_dataset_uuid)
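# Illustrative usage sketch (not part of the kartothek sources): build the
# deletion graph and execute it. The store URL and dataset UUID below are
# placeholders.
def _example_delete_dataset__delayed():
    from functools import partial

    import dask
    from storefact import get_store_from_url

    # A store factory pointing at a local filesystem store (placeholder path).
    store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_example")

    # This only builds the dask graph; nothing is deleted yet.
    task = delete_dataset__delayed(dataset_uuid="example_dataset", store=store_factory)

    # Computing the delayed object performs the actual deletion.
    dask.compute(task)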
@default_docs
@normalize_args
def garbage_collect_dataset__delayed(
    dataset_uuid: Optional[str] = None,
    store: StoreInput = None,
    chunk_size: int = 100,
    factory=None,
) -> List[Delayed]:
    """
    Remove auxiliary files that are no longer tracked by the dataset.

    These files include indices that are no longer referenced by the metadata
    as well as files in the directories of the tables that are no longer
    referenced. The latter is only applied to static datasets.

    Parameters
    ----------
    chunk_size
        Number of files that should be deleted in a single job.
    """
    ds_factory = _ensure_factory(
        dataset_uuid=dataset_uuid,
        store=store,
        factory=factory,
        load_dataset_metadata=False,
    )

    nested_files = dispatch_files_to_gc(
        dataset_uuid=None, store_factory=None, chunk_size=chunk_size, factory=ds_factory
    )
    return list(
        map_delayed(delete_files, nested_files, store_factory=ds_factory.store_factory)
    )
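# Illustrative usage sketch (not part of the kartothek sources): garbage
# collection returns a list of delayed deletion jobs that must be computed
# explicitly. Store URL and dataset UUID are placeholders.
def _example_garbage_collect_dataset__delayed():
    from functools import partial

    import dask
    from storefact import get_store_from_url

    store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_example")
    tasks = garbage_collect_dataset__delayed(
        dataset_uuid="example_dataset", store=store_factory, chunk_size=100
    )
    # Each task deletes up to ``chunk_size`` untracked files.
    dask.compute(*tasks)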
def _load_and_merge_mps(mp_list, store, label_merger, metadata_merger, merge_tasks):
    mp_list = [mp.load_dataframes(store=store) for mp in mp_list]

    mp = MetaPartition.merge_metapartitions(
        mp_list, label_merger=label_merger, metadata_merger=metadata_merger
    )
    mp = mp.concat_dataframes()

    for task in merge_tasks:
        mp = mp.merge_dataframes(**task)

    return mp


@default_docs
@normalize_args
def merge_datasets_as_delayed(
    left_dataset_uuid,
    right_dataset_uuid,
    store,
    merge_tasks,
    match_how="exact",
    label_merger=None,
    metadata_merger=None,
):
    """
    A dask.delayed graph to perform the merge of two full kartothek datasets.

    Parameters
    ----------
    left_dataset_uuid : str
        UUID for the left dataset (order does not matter in all merge schemas).
    right_dataset_uuid : str
        UUID for the right dataset (order does not matter in all merge schemas).
    match_how : Union[str, Callable]
        Define the partition label matching scheme.
        Available implementations are:

        * left (right) : The left (right) partitions are considered to be
          the base partitions and **all** partitions of the right (left)
          dataset are joined to the left partition. This should only be used
          if one of the datasets contains very few partitions.
        * prefix : The labels of the partitions of the dataset with fewer
          partitions are considered to be the prefixes of the right dataset.
        * exact : All partition labels of the left dataset need to have an
          exact match in the right dataset.
        * callable : A callable with signature func(left, right) which
          returns a boolean to determine if the partitions match.

        If True, an exact match of partition labels between the
        to-be-merged datasets is required in order to merge.
        If False (Default), the partition labels of the dataset with fewer
        partitions are interpreted as prefixes.
    merge_tasks : List[Dict]
        A list of merge tasks. Each item in this list is a dictionary giving
        explicit instructions for a specific merge.
        Each dict should contain key/values:

        * `left`: The table for the left dataframe
        * `right`: The table for the right dataframe
        * `output_label` : The table for the merged dataframe
        * `merge_func`: A callable with signature
          `merge_func(left_df, right_df, merge_kwargs)` to handle the data
          preprocessing and merging. Default pandas.merge
        * `merge_kwargs` : The kwargs to be passed to the `merge_func`

        Example:

        .. code::

            >>> merge_tasks = [
            ...     {
            ...         "left": "left_dict",
            ...         "right": "right_dict",
            ...         "merge_kwargs": {"kwargs of merge_func": ''},
            ...         "output_label": 'merged_core_data'
            ...     },
            ... ]

    """
    warnings.warn(
        message=get_specific_function_deprecation_warning(
            function_name="merge_datasets_as_delayed",
            deprecated_in="5.3",
            removed_in="6.0",
        ),
        category=DeprecationWarning,
    )
    store = lazy_store(store)

    mps = align_datasets(
        left_dataset_uuid=left_dataset_uuid,
        right_dataset_uuid=right_dataset_uuid,
        store=store,
        match_how=match_how,
    )
    mps = map_delayed(
        _load_and_merge_mps,
        mps,
        store=store,
        label_merger=label_merger,
        metadata_merger=metadata_merger,
        merge_tasks=merge_tasks,
    )
    return list(mps)


def _load_and_concat_metapartitions_inner(mps, args, kwargs):
    return MetaPartition.concat_metapartitions(
        [mp.load_dataframes(*args, **kwargs) for mp in mps]
    )


def _load_and_concat_metapartitions(list_of_mps, *args, **kwargs):
    return map_delayed(
        _load_and_concat_metapartitions_inner, list_of_mps, args=args, kwargs=kwargs
    )


@default_docs
@normalize_args
@deprecate_parameters(
    get_parameter_default_value_deprecation_warning(
        from_value="False", to_value="True", deprecated_in="5.3", changed_in="6.0"
    ),
    "dates_as_object",
)
@deprecate_parameters_if_set(
    get_deprecation_warning_remove_parameter_multi_table(
        deprecated_in="5.3", removed_in="6.0"
    ),
    "tables",
    "concat_partitions_on_primary_index",
    "label_filter",
    "load_dataset_metadata",
    "dispatch_metadata",
)
def read_dataset_as_delayed_metapartitions(
    dataset_uuid=None,
    store=None,
    tables=None,
    columns=None,
    concat_partitions_on_primary_index=False,
    predicate_pushdown_to_io=True,
    categoricals=None,
    label_filter=None,
    dates_as_object=False,
    load_dataset_metadata=False,
    predicates=None,
    factory=None,
    dispatch_by=None,
    dispatch_metadata=True,
):
    """
    A collection of dask.delayed objects to retrieve a dataset from store
    where each partition is loaded as a
    :class:`~kartothek.io_components.metapartition.MetaPartition`.

    .. seealso:

        :func:`~kartothek.io.dask.read_dataset_as_delayed`

    Parameters
    ----------
    """
    ds_factory = _ensure_factory(
        dataset_uuid=dataset_uuid,
        store=store,
        factory=factory,
        load_dataset_metadata=load_dataset_metadata,
    )

    if len(ds_factory.tables) > 1:
        warnings.warn(
            "Trying to read a dataset with multiple internal tables. This functionality will be removed in the next "
            "major release. If you require a multi tabled data format, we recommend to switch to the kartothek Cube "
            "functionality. "
            "https://kartothek.readthedocs.io/en/stable/guide/cube/kartothek_cubes.html",
            DeprecationWarning,
        )

    store = ds_factory.store_factory
    mps = dispatch_metapartitions_from_factory(
        dataset_factory=ds_factory,
        concat_partitions_on_primary_index=concat_partitions_on_primary_index,
        label_filter=label_filter,
        predicates=predicates,
        dispatch_by=dispatch_by,
        dispatch_metadata=dispatch_metadata,
    )

    if concat_partitions_on_primary_index or dispatch_by is not None:
        mps = _load_and_concat_metapartitions(
            mps,
            store=store,
            tables=tables,
            columns=columns,
            categoricals=categoricals,
            predicate_pushdown_to_io=predicate_pushdown_to_io,
            dates_as_object=dates_as_object,
            predicates=predicates,
        )
    else:
        mps = map_delayed(
            MetaPartition.load_dataframes,
            mps,
            store=store,
            tables=tables,
            columns=columns,
            categoricals=categoricals,
            predicate_pushdown_to_io=predicate_pushdown_to_io,
            dates_as_object=dates_as_object,
            predicates=predicates,
        )

    categoricals_from_index = _maybe_get_categoricals_from_index(
        ds_factory, categoricals
    )
    if categoricals_from_index:
        func_dict = defaultdict(_identity)
        func_dict.update(
            {
                table: partial(_cast_categorical_to_index_cat, categories=cats)
                for table, cats in categoricals_from_index.items()
            }
        )
        mps = map_delayed(
            partial(MetaPartition.apply, func=func_dict, type_safe=True), mps
        )

    return list(mps)
@default_docs
@deprecate_parameters(
    get_parameter_default_value_deprecation_warning(
        from_value="False", to_value="True", deprecated_in="5.3", changed_in="6.0"
    ),
    "dates_as_object",
)
@deprecate_parameters_if_set(
    get_deprecation_warning_remove_parameter_multi_table(
        deprecated_in="5.3", removed_in="6.0"
    ),
    "tables",
    "concat_partitions_on_primary_index",
    "label_filter",
)
def read_dataset_as_delayed(
    dataset_uuid=None,
    store=None,
    tables=None,
    columns=None,
    concat_partitions_on_primary_index=False,
    predicate_pushdown_to_io=True,
    categoricals=None,
    label_filter=None,
    dates_as_object=False,
    predicates=None,
    factory=None,
    dispatch_by=None,
):
    """
    A collection of dask.delayed objects to retrieve a dataset from store
    where each partition is loaded as a :class:`~pandas.DataFrame`.

    Parameters
    ----------
    """
    mps = read_dataset_as_delayed_metapartitions(
        dataset_uuid=dataset_uuid,
        store=store,
        factory=factory,
        tables=tables,
        columns=columns,
        concat_partitions_on_primary_index=concat_partitions_on_primary_index,
        predicate_pushdown_to_io=predicate_pushdown_to_io,
        categoricals=categoricals,
        label_filter=label_filter,
        dates_as_object=dates_as_object,
        load_dataset_metadata=False,
        predicates=predicates,
        dispatch_by=dispatch_by,
    )
    return list(map_delayed(_get_data, mps))
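# Illustrative usage sketch (not part of the kartothek sources): the delayed
# objects returned here evaluate to the loaded data of one (dispatched)
# partition each. Store URL and dataset UUID are placeholders.
def _example_read_dataset_as_delayed():
    from functools import partial

    import dask
    from storefact import get_store_from_url

    store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_example")
    tasks = read_dataset_as_delayed(
        dataset_uuid="example_dataset",
        store=store_factory,
        dates_as_object=True,
    )
    # Materialize all partitions at once; for large datasets prefer processing
    # the delayed objects further instead of computing them eagerly.
    partition_data = dask.compute(*tasks)
    return partition_data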
@default_docs
@normalize_args
def read_table_as_delayed(
    dataset_uuid=None,
    store=None,
    table=SINGLE_TABLE,
    columns=None,
    concat_partitions_on_primary_index=False,
    predicate_pushdown_to_io=True,
    categoricals=None,
    label_filter=None,
    dates_as_object=False,
    predicates=None,
    factory=None,
    dispatch_by=None,
):
    """
    A collection of dask.delayed objects to retrieve a single table from
    a dataset as partition-individual :class:`~pandas.DataFrame` instances.

    You can transform the collection of ``dask.delayed`` objects into
    a ``dask.dataframe`` using the following code snippet. As older kartothek
    specifications don't store schema information, this must be provided by
    a separate code path.

    .. code ::

        >>> import dask.dataframe as dd
        >>> ddf_tasks = read_table_as_delayed(…)
        >>> meta = …
        >>> ddf = dd.from_delayed(ddf_tasks, meta=meta)

    Parameters
    ----------
    """
    warnings.warn(
        message=get_specific_function_deprecation_warning_multi_table(
            function_name="read_table_as_delayed", deprecated_in="5.3", removed_in="6.0"
        ),
        category=DeprecationWarning,
    )
    if not isinstance(columns, dict):
        columns = {table: columns}
    mps = read_dataset_as_delayed_metapartitions(
        dataset_uuid=dataset_uuid,
        store=store,
        tables=[table],
        columns=columns,
        concat_partitions_on_primary_index=concat_partitions_on_primary_index,
        predicate_pushdown_to_io=predicate_pushdown_to_io,
        categoricals=categoricals,
        label_filter=label_filter,
        dates_as_object=dates_as_object,
        load_dataset_metadata=False,
        predicates=predicates,
        factory=factory,
        dispatch_by=dispatch_by,
        dispatch_metadata=False,
    )
    return list(map_delayed(partial(_get_data, table=table), mps))
@default_docs
def update_dataset_from_delayed(
    delayed_tasks,
    store=None,
    dataset_uuid=None,
    delete_scope=None,
    metadata=None,
    df_serializer=None,
    metadata_merger=None,
    default_metadata_version=DEFAULT_METADATA_VERSION,
    partition_on=None,
    sort_partitions_by=None,
    secondary_indices=None,
    factory=None,
):
    """
    A dask.delayed graph to add and store a list of dictionaries containing
    dataframes to a kartothek dataset in store. The input should be a list
    (or splitter pipeline) containing
    :class:`~kartothek.io_components.metapartition.MetaPartition`. If you want
    to use this pipeline step for just deleting partitions without adding new
    ones, you have to give an empty meta partition as input
    (``[Metapartition(None)]``).

    Parameters
    ----------

    See Also
    --------
    :ref:`mutating_datasets`
    """
    partition_on = normalize_arg("partition_on", partition_on)
    store = normalize_arg("store", store)
    secondary_indices = normalize_arg("secondary_indices", secondary_indices)
    delete_scope = dask.delayed(normalize_arg)("delete_scope", delete_scope)

    ds_factory, metadata_version, partition_on = validate_partition_keys(
        dataset_uuid=dataset_uuid,
        store=store,
        default_metadata_version=default_metadata_version,
        partition_on=partition_on,
        ds_factory=factory,
    )

    secondary_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
    mps = map_delayed(
        write_partition,
        delayed_tasks,
        secondary_indices=secondary_indices,
        metadata_version=metadata_version,
        partition_on=partition_on,
        store_factory=store,
        df_serializer=df_serializer,
        dataset_uuid=dataset_uuid,
        sort_partitions_by=sort_partitions_by,
    )

    return dask.delayed(update_dataset_from_partitions)(
        mps,
        store_factory=store,
        dataset_uuid=dataset_uuid,
        ds_factory=ds_factory,
        delete_scope=delete_scope,
        metadata=metadata,
        metadata_merger=metadata_merger,
    )
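# Illustrative usage sketch (not part of the kartothek sources): appending new
# partitions built from delayed pandas DataFrames. Dataset UUID, store URL and
# the dataframe contents are placeholders; richer inputs (e.g. dicts with
# "data"/"label" keys or MetaPartition objects) are also accepted by the
# underlying input parsing.
def _example_update_dataset_from_delayed():
    from functools import partial

    import dask
    import pandas as pd
    from storefact import get_store_from_url

    store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_example")

    def _build_df(i):
        return pd.DataFrame({"partition_id": [i], "value": [i * 10]})

    delayed_tasks = [dask.delayed(_build_df)(i) for i in range(3)]

    graph = update_dataset_from_delayed(
        delayed_tasks,
        store=store_factory,
        dataset_uuid="example_dataset",
    )
    # The update is only carried out when the graph is computed; the result is
    # the updated dataset metadata.
    return graph.compute()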
@default_docs
@normalize_args
def store_delayed_as_dataset(
    delayed_tasks,
    store,
    dataset_uuid=None,
    metadata=None,
    df_serializer=None,
    overwrite=False,
    metadata_merger=None,
    metadata_version=naming.DEFAULT_METADATA_VERSION,
    partition_on=None,
    metadata_storage_format=naming.DEFAULT_METADATA_STORAGE_FORMAT,
    secondary_indices=None,
) -> Delayed:
    """
    Transform and store a list of dictionaries containing
    dataframes to a kartothek dataset in store.

    Parameters
    ----------
    """
    store = lazy_store(store)
    if dataset_uuid is None:
        dataset_uuid = gen_uuid()

    if not overwrite:
        raise_if_dataset_exists(dataset_uuid=dataset_uuid, store=store)

    raise_if_indices_overlap(partition_on, secondary_indices)

    input_to_mps = partial(
        parse_input_to_metapartition, metadata_version=metadata_version
    )
    mps = map_delayed(input_to_mps, delayed_tasks)

    if partition_on:
        mps = map_delayed(MetaPartition.partition_on, mps, partition_on=partition_on)

    if secondary_indices:
        mps = map_delayed(MetaPartition.build_indices, mps, columns=secondary_indices)

    mps = map_delayed(
        MetaPartition.store_dataframes,
        mps,
        store=store,
        df_serializer=df_serializer,
        dataset_uuid=dataset_uuid,
    )

    return delayed(store_dataset_from_partitions)(
        mps,
        dataset_uuid=dataset_uuid,
        store=store,
        dataset_metadata=metadata,
        metadata_merger=metadata_merger,
        metadata_storage_format=metadata_storage_format,
    )
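# Illustrative usage sketch (not part of the kartothek sources): creating a new
# dataset from delayed tasks. Store URL, dataset UUID and the dataframe
# contents are placeholders.
def _example_store_delayed_as_dataset():
    from functools import partial

    import dask
    import pandas as pd
    from storefact import get_store_from_url

    store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_example")

    def _build_df(i):
        return pd.DataFrame({"colour": ["red", "blue"], "value": [i, i + 1]})

    delayed_tasks = [dask.delayed(_build_df)(i) for i in range(3)]

    graph = store_delayed_as_dataset(
        delayed_tasks,
        store=store_factory,
        dataset_uuid="example_dataset",
        secondary_indices=["colour"],
    )
    # Nothing is written until the delayed graph is computed; the result is the
    # stored dataset metadata object.
    return graph.compute()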