Source code for kartothek.io.iter

# -*- coding: utf-8 -*-

import warnings
from functools import partial
from typing import cast

from kartothek.core.docs import default_docs
from kartothek.core.factory import _ensure_factory
from kartothek.core.naming import (
    DEFAULT_METADATA_STORAGE_FORMAT,
    DEFAULT_METADATA_VERSION,
)
from kartothek.core.uuid import gen_uuid
from kartothek.io_components.metapartition import (
    MetaPartition,
    parse_input_to_metapartition,
)
from kartothek.io_components.read import dispatch_metapartitions_from_factory
from kartothek.io_components.update import update_dataset_from_partitions
from kartothek.io_components.utils import (
    _ensure_compatible_indices,
    normalize_args,
    raise_if_indices_overlap,
    sort_values_categorical,
    validate_partition_keys,
)
from kartothek.io_components.write import (
    raise_if_dataset_exists,
    store_dataset_from_partitions,
)
from kartothek.utils.migration_helpers import (
    DEPRECATION_WARNING_REMOVE_PARAMETER,
    deprecate_parameters,
    deprecate_parameters_if_set,
    get_deprecation_warning_remove_parameter_multi_table,
    get_parameter_default_value_deprecation_warning,
)

__all__ = (
    "read_dataset_as_dataframes__iterator",
    "update_dataset_from_dataframes__iter",
    "store_dataframes_as_dataset__iter",
)


@default_docs
@normalize_args
@deprecate_parameters(
    get_parameter_default_value_deprecation_warning(
        from_value="False", to_value="True", deprecated_in="5.3", changed_in="6.0"
    ),
    "dates_as_object",
)
@deprecate_parameters_if_set(
    get_deprecation_warning_remove_parameter_multi_table(
        deprecated_in="5.3", removed_in="6.0"
    ),
    "tables",
    "concat_partitions_on_primary_index",
    "label_filter",
    "load_dataset_metadata",
    "dispatch_metadata",
)
def read_dataset_as_metapartitions__iterator(
    dataset_uuid=None,
    store=None,
    tables=None,
    columns=None,
    concat_partitions_on_primary_index=False,
    predicate_pushdown_to_io=True,
    categoricals=None,
    label_filter=None,
    dates_as_object=False,
    load_dataset_metadata=False,
    predicates=None,
    factory=None,
    dispatch_by=None,
    dispatch_metadata=True,
):
    """

    A Python iterator to retrieve a dataset from store where each
    partition is loaded as a :class:`~kartothek.io_components.metapartition.MetaPartition`.

    .. seealso::

        :func:`~kartothek.io.iter.read_dataset_as_dataframes__iterator`

    Parameters
    ----------

    """

    ds_factory = _ensure_factory(
        dataset_uuid=dataset_uuid,
        store=store,
        factory=factory,
        load_dataset_metadata=load_dataset_metadata,
    )

    if len(ds_factory.tables) > 1:
        warnings.warn(
            "Trying to read a dataset with multiple internal tables. This functionality will be removed in the next "
            "major release. If you require a multi tabled data format, we recommend to switch to the kartothek Cube "
            "functionality. "
            "https://kartothek.readthedocs.io/en/stable/guide/cube/kartothek_cubes.html",
            DeprecationWarning,
        )

    store = ds_factory.store
    mps = dispatch_metapartitions_from_factory(
        ds_factory,
        concat_partitions_on_primary_index=concat_partitions_on_primary_index,
        label_filter=label_filter,
        predicates=predicates,
        dispatch_by=dispatch_by,
        dispatch_metadata=dispatch_metadata,
    )

    for mp in mps:
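        # When dispatching by columns or concatenating on the primary index,
        # ``mp`` is a list of metapartitions that are loaded individually and
        # then concatenated into a single MetaPartition.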
        if concat_partitions_on_primary_index or dispatch_by is not None:
            mp = MetaPartition.concat_metapartitions(
                [
                    mp_inner.load_dataframes(
                        store=store,
                        tables=tables,
                        columns=columns,
                        categoricals=categoricals,
                        predicate_pushdown_to_io=predicate_pushdown_to_io,
                        predicates=predicates,
                    )
                    for mp_inner in mp
                ]
            )
        else:
            mp = cast(MetaPartition, mp)
            mp = mp.load_dataframes(
                store=store,
                tables=tables,
                columns=columns,
                categoricals=categoricals,
                predicate_pushdown_to_io=predicate_pushdown_to_io,
                dates_as_object=dates_as_object,
                predicates=predicates,
            )
        yield mp

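# ---------------------------------------------------------------------------
# Illustrative usage sketch (editor's addition, not part of this module). A
# minimal example of consuming the MetaPartition iterator; the store URL and
# dataset name "my_dataset" are hypothetical.
#
#     >>> import storefact
#     >>> store = storefact.get_store_from_url("hfs:///tmp/kartothek_data")
#     >>> mps = read_dataset_as_metapartitions__iterator("my_dataset", store)
#     >>> for mp in mps:
#     ...     # each item is a MetaPartition whose dataframes are already loaded
#     ...     print(mp.label, list(mp.data))
# ---------------------------------------------------------------------------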

@default_docs
@normalize_args
@deprecate_parameters(
    get_parameter_default_value_deprecation_warning(
        from_value="False", to_value="True", deprecated_in="5.3", changed_in="6.0"
    ),
    "dates_as_object",
)
@deprecate_parameters_if_set(
    get_deprecation_warning_remove_parameter_multi_table(
        deprecated_in="5.3", removed_in="6.0"
    ),
    "tables",
    "concat_partitions_on_primary_index",
    "label_filter",
)
def read_dataset_as_dataframes__iterator(
    dataset_uuid=None,
    store=None,
    tables=None,
    columns=None,
    concat_partitions_on_primary_index=False,
    predicate_pushdown_to_io=True,
    categoricals=None,
    label_filter=None,
    dates_as_object=False,
    predicates=None,
    factory=None,
    dispatch_by=None,
):
    """
    A Python iterator to retrieve a dataset from store where each
    partition is loaded as a :class:`~pandas.DataFrame`.

    Parameters
    ----------

    Returns
    -------
    generator
        A generator yielding one dictionary per partition. The dictionary
        keys are the in-partition file labels and the values are the
        corresponding dataframes.

    Examples
    --------
    Dataset in store contains two partitions with two files each

    .. code::

        >>> import storefact
        >>> from kartothek.io.iter import read_dataset_as_dataframes__iterator
        >>> store = storefact.get_store_from_url('s3://bucket_with_dataset')
        >>> dataframes = read_dataset_as_dataframes__iterator('dataset_uuid', store)
        >>> next(dataframes)
        [
            # First partition
            {'core': pd.DataFrame, 'lookup': pd.DataFrame}
        ]
        >>> next(dataframes)
        [
            # Second partition
            {'core': pd.DataFrame, 'lookup': pd.DataFrame}
        ]
    """
    mp_iter = read_dataset_as_metapartitions__iterator(
        dataset_uuid=dataset_uuid,
        store=store,
        tables=tables,
        columns=columns,
        concat_partitions_on_primary_index=concat_partitions_on_primary_index,
        predicate_pushdown_to_io=predicate_pushdown_to_io,
        categoricals=categoricals,
        label_filter=label_filter,
        dates_as_object=dates_as_object,
        load_dataset_metadata=False,
        predicates=predicates,
        factory=factory,
        dispatch_by=dispatch_by,
        dispatch_metadata=False,
    )
    for mp in mp_iter:
        yield mp.data

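
# ---------------------------------------------------------------------------
# Illustrative usage sketch (editor's addition, not part of this module):
# reading with predicate pushdown. Predicates are a list of conjunction lists
# of ``(column, operator, value)`` tuples; the dataset name and column below
# are hypothetical.
#
#     >>> frames = read_dataset_as_dataframes__iterator(
#     ...     "my_dataset", store, predicates=[[("value", ">", 40)]]
#     ... )
#     >>> partition = next(frames)  # dict mapping table name -> pd.DataFrame
# ---------------------------------------------------------------------------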


@default_docs
@normalize_args
@deprecate_parameters_if_set(
    DEPRECATION_WARNING_REMOVE_PARAMETER,
    "central_partition_metadata",
    "load_dynamic_metadata",
)
def update_dataset_from_dataframes__iter(
    df_generator,
    store=None,
    dataset_uuid=None,
    delete_scope=None,
    metadata=None,
    df_serializer=None,
    metadata_merger=None,
    central_partition_metadata=True,
    default_metadata_version=DEFAULT_METADATA_VERSION,
    partition_on=None,
    load_dynamic_metadata=True,
    sort_partitions_by=None,
    secondary_indices=None,
    factory=None,
):
    """
    Update a kartothek dataset in store iteratively, using a generator of dataframes.

    Useful for datasets which do not fit into memory.

    Parameters
    ----------

    Returns
    -------
    The dataset metadata object (:class:`~kartothek.core.dataset.DatasetMetadata`).

    See Also
    --------
    :ref:`mutating_datasets`
    """
    ds_factory, metadata_version, partition_on = validate_partition_keys(
        dataset_uuid=dataset_uuid,
        store=store,
        ds_factory=factory,
        default_metadata_version=default_metadata_version,
        partition_on=partition_on,
    )

    secondary_indices = _ensure_compatible_indices(ds_factory, secondary_indices)

    if sort_partitions_by:
        # Define function which sorts each partition by column
        sort_partitions_by_fn = partial(
            sort_values_categorical, columns=sort_partitions_by
        )

    new_partitions = []
    for df in df_generator:
        mp = parse_input_to_metapartition(
            df,
            metadata_version=metadata_version,
            expected_secondary_indices=secondary_indices,
        )

        if sort_partitions_by:
            mp = mp.apply(sort_partitions_by_fn)

        if partition_on:
            mp = mp.partition_on(partition_on=partition_on)

        if secondary_indices:
            mp = mp.build_indices(columns=secondary_indices)

        # Store dataframe, thereby clearing up the dataframe from the `mp` metapartition
        mp = mp.store_dataframes(
            store=store, df_serializer=df_serializer, dataset_uuid=dataset_uuid
        )

        new_partitions.append(mp)

    return update_dataset_from_partitions(
        new_partitions,
        store_factory=store,
        dataset_uuid=dataset_uuid,
        ds_factory=ds_factory,
        delete_scope=delete_scope,
        metadata=metadata,
        metadata_merger=metadata_merger,
    )

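
# ---------------------------------------------------------------------------
# Illustrative usage sketch (editor's addition, not part of this module):
# appending partitions to an existing dataset from a generator of dataframes.
# The dataframes, store and dataset name are hypothetical.
#
#     >>> import pandas as pd
#     >>> def new_chunks():
#     ...     yield pd.DataFrame({"date": ["2020-01-01"], "value": [42]})
#     ...     yield pd.DataFrame({"date": ["2020-01-02"], "value": [43]})
#     >>> dm = update_dataset_from_dataframes__iter(
#     ...     new_chunks(), store=store, dataset_uuid="my_dataset"
#     ... )
# ---------------------------------------------------------------------------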


@default_docs
@normalize_args
def store_dataframes_as_dataset__iter(
    df_generator,
    store,
    dataset_uuid=None,
    metadata=None,
    partition_on=None,
    df_serializer=None,
    overwrite=False,
    metadata_storage_format=DEFAULT_METADATA_STORAGE_FORMAT,
    metadata_version=DEFAULT_METADATA_VERSION,
    secondary_indices=None,
):
    """
    Store `pd.DataFrame` s iteratively as a partitioned dataset with multiple tables (files).

    Useful for datasets which do not fit into memory.

    Parameters
    ----------

    Returns
    -------
    dataset: kartothek.core.dataset.DatasetMetadata
        The stored dataset.
    """
    if dataset_uuid is None:
        dataset_uuid = gen_uuid()

    if not overwrite:
        raise_if_dataset_exists(dataset_uuid=dataset_uuid, store=store)

    raise_if_indices_overlap(partition_on, secondary_indices)

    new_partitions = []
    for df in df_generator:
        mp = parse_input_to_metapartition(df, metadata_version=metadata_version)

        if partition_on:
            mp = mp.partition_on(partition_on)

        if secondary_indices:
            mp = mp.build_indices(secondary_indices)

        # Store dataframe, thereby clearing up the dataframe from the `mp` metapartition
        mp = mp.store_dataframes(
            store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer
        )

        # Add `kartothek.io_components.metapartition.MetaPartition` object to list to track partitions
        new_partitions.append(mp)

    # Store metadata and return `kartothek.DatasetMetadata` object
    return store_dataset_from_partitions(
        partition_list=new_partitions,
        dataset_uuid=dataset_uuid,
        store=store,
        dataset_metadata=metadata,
        metadata_storage_format=metadata_storage_format,
    )

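
# ---------------------------------------------------------------------------
# Illustrative usage sketch (editor's addition, not part of this module):
# writing a new dataset chunk by chunk so that only one dataframe is held in
# memory at a time. The store, dataset name and column names are hypothetical.
#
#     >>> import pandas as pd
#     >>> def chunks():
#     ...     for i in range(3):
#     ...         yield pd.DataFrame({"part": [i], "value": [i * 10]})
#     >>> dm = store_dataframes_as_dataset__iter(
#     ...     chunks(), store, dataset_uuid="my_dataset", partition_on=["part"]
#     ... )
# ---------------------------------------------------------------------------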