kartothek.io_components.metapartition module

class kartothek.io_components.metapartition.MetaPartition(label: Optional[str], files: Optional[Dict[str, str]] = None, metadata: Any = None, data: Optional[Dict[str, pandas.core.frame.DataFrame]] = None, dataset_metadata: Optional[Dict] = None, indices: Optional[Dict[Any, Any]] = None, metadata_version: Optional[int] = None, table_meta: Optional[Dict[str, kartothek.core.common_metadata.SchemaWrapper]] = None, partition_keys: Optional[Sequence[str]] = None, logical_conjunction: Optional[List[Tuple[Any, str, Any]]] = None)[source]

Bases: Iterable

Wrapper for kartothek partition which includes additional information about the parent dataset

add_metapartition(metapartition: kartothek.io_components.metapartition.MetaPartition, metadata_merger: Optional[Callable] = None, schema_validation: bool = True)[source]

Adds a metapartition to the internal list structure to enable batch processing.

The top-level dataset_metadata dictionary is combined with the existing dict, and all other attributes are stored in the metapartitions list.

Parameters
  • metapartition – The MetaPartition to be added.

  • metadata_merger – A callable to perform the metadata merge. By default, kartothek.io_components.utils.combine_metadata is used

  • schema_validation – If True (default), ensure that the table_meta of both MetaPartition objects are the same
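
For illustration, a minimal sketch of batching two partitions (mp_a and mp_b are assumed to be MetaPartition objects with matching table_meta; all names are illustrative):

>>> batch = mp_a.add_metapartition(mp_b)
>>> len(batch.metapartitions)
2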

apply(func: Union[Callable, Dict[str, Callable]], tables: Optional[List[str]] = None, metadata: Optional[Dict] = None, type_safe: bool = False) → kartothek.io_components.metapartition.MetaPartition[source]

Applies a given function to all dataframes of the MetaPartition.

Parameters
  • func – A callable accepting and returning a pandas.DataFrame

  • tables – Only apply and return the function for the given tables. Note: this behavior will change in future versions! The new behavior will be: only apply the provided function to the given tables

  • uuid – The changed dataset is assigned a new UUID.

  • type_safe – If the transformation is type-safe, optimizations can be applied
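
For illustration, a sketch applying a transformation to every dataframe of the partition (mp and the column x are illustrative; mp is assumed to hold in-memory data):

>>> mp_scaled = mp.apply(lambda df: df.assign(x=df['x'] * 2))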

as_sentinel()[source]
build_indices(columns: Iterable[str])[source]

This builds the indices of this metapartition for the given columns. The indices for the passed columns are rebuilt, so existing index entries in the metapartition are overwritten.

Parameters

columns – A list of columns for which indices over all dataframes of the metapartition are (re)built

Returns

self
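
A sketch, assuming mp holds in-memory dataframes that contain a column_1 column (names illustrative):

>>> mp = mp.build_indices(['column_1'])
>>> 'column_1' in mp.indices
True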

concat_dataframes()[source]

Concatenates all dataframes with identical columns.

If the dataframes are changed, the files attribute will be emptied since the in-memory DataFrames no longer represent the referenced files.

Returns

A metapartition where dataframes with common columns are merged. The files attribute will be empty since there is no longer a direct relation between the referenced files and the in-memory dataframes

Return type

MetaPartition
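
For example (assuming mp holds several in-memory dataframes with identical columns):

>>> merged_mp = mp.concat_dataframes()
>>> merged_mp.files
{}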

static concat_metapartitions(metapartitions, label_merger=None, metadata_merger=None)[source]
copy(**kwargs)[source]

Creates a shallow copy where the kwargs overwrite existing attributes
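
For example, overwriting the label on the copy:

>>> relabeled = mp.copy(label='new_label')
>>> relabeled.label
'new_label'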

property data
delete_from_store(dataset_uuid: Any, store: Union[str, simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]) → kartothek.io_components.metapartition.MetaPartition[source]
property files
static from_dict(dct)[source]

Create a MetaPartition from a dictionary.

Parameters

dct (dict) – Dictionary containing constructor arguments as keys
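
A sketch with illustrative keys, mirroring the constructor arguments:

>>> import pandas as pd
>>> mp = MetaPartition.from_dict({
...     'label': 'part',
...     'data': {'table': pd.DataFrame({'x': [1]})},
... })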

static from_partition(partition: kartothek.core.partition.Partition, data: Optional[Dict] = None, dataset_metadata: Optional[Dict] = None, indices: Optional[Dict] = None, metadata_version: Optional[int] = None, table_meta: Optional[Dict] = None, partition_keys: Optional[List[str]] = None, logical_conjunction: Optional[List[Tuple[Any, str, Any]]] = None)[source]

Transform a kartothek Partition into a MetaPartition.

Parameters
  • partition – The kartothek partition to be wrapped

  • data – A dictionary of materialised DataFrames

  • dataset_metadata – The metadata of the original dataset

  • indices – The index dictionary of the dataset

  • table_meta – Type metadata for each table, optional

  • metadata_version – The dataset metadata version

  • partition_keys – A list of the primary partition keys

Returns

Return type

MetaPartition

get_parquet_metadata(store: Union[str, simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], table_name: str) → pandas.core.frame.DataFrame[source]

Retrieve the parquet metadata for the MetaPartition. Especially relevant for calculating dataset statistics.

Parameters
  • store – A factory function providing a KeyValueStore

  • table_name – Name of the kartothek table for which the statistics should be retrieved

Returns

A DataFrame with relevant parquet metadata

Return type

pd.DataFrame
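
A sketch, assuming the partition has already been stored and store_factory returns the KeyValueStore holding it (both names illustrative):

>>> stats = mp.get_parquet_metadata(store=store_factory, table_name='table')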

property indices
property is_sentinel
property label
load_all_table_meta(store: Union[str, simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], dataset_uuid: str) → kartothek.io_components.metapartition.MetaPartition[source]

Loads all table metadata in memory and stores it under the tables attribute

load_dataframes(store: Union[str, simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], tables: Optional[Dict[str, Iterable[str]]] = None, columns: Optional[Dict[str, Iterable[str]]] = None, predicate_pushdown_to_io: bool = True, categoricals: Optional[Dict[str, Iterable[str]]] = None, dates_as_object: bool = False, predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]] = None) → kartothek.io_components.metapartition.MetaPartition[source]

Load the dataframes of the partitions from store into memory.

Parameters
  • store (Callable or str or simplekv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either simplekv.KeyValueStore, a storefact store url or a generic Callable producing a simplekv.KeyValueStore

  • tables (List[str]) – A list of tables to be loaded. If None is given, all tables of a partition are loaded. If a given table does not exist, it is ignored.

  • columns (Optional[Dict[str, List[str]]]) – A dictionary mapping tables to lists of columns. Only the specified columns are loaded for the corresponding table. If a specified table or column is not present in the dataset, a ValueError is raised.

  • predicate_pushdown_to_io (bool) – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the storage layer that need to be addressed there.

  • categoricals (Dict[str, List[str]]) – A dictionary mapping tables to lists of columns that should be loaded as category dtype instead of the inferred one.

  • dates_as_object (bool) – Load pyarrow.date{32,64} columns as object columns in Pandas instead of using np.datetime64 to preserve their type. While this improves type-safety, this comes at a performance cost.

  • predicates (List[List[Tuple[str, str, Any]]]) –

    Optional list of predicates, like [[('x', '>', 0), ...]], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are all combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This allows expression of any predicate that is possible using boolean logic; see the sketch after the examples below.

    Available operators are: ==, !=, <=, >=, <, > and in.

    Filtering for missing values is supported with the operators ==, != and in, using the values np.nan and None for float and string columns respectively.

    Categorical data

    When using order-sensitive operators on categorical data, we assume that the categories obey a lexicographical ordering. This filtering may result in suboptimal performance and may be slower than the evaluation on non-categorical data.

    See also Filtering / Predicate pushdown and Efficient Querying


    Examples

    >>> part = MetaPartition(
    ...     label='part_label',
    ...     files={
    ...         'core': 'core_key_in_store',
    ...         'helper': 'helper_key_in_store'
    ...     }
    ... )
    >>> part.data
        {}
    >>> part = part.load_dataframes(store, ['core'])
    >>> part.data
        {
            'core': pd.DataFrame()
        }
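
    The DNF structure described for predicates above can be sketched as follows (column names illustrative); the outer list ORs the two inner AND-groups:

    >>> predicates = [
    ...     [('x', '>', 0), ('y', '==', 'a')],  # x > 0 AND y == 'a'
    ...     [('x', 'in', [10, 20])],            # OR: x in (10, 20)
    ... ]
    >>> part = part.load_dataframes(store, predicates=predicates)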
    

merge_dataframes(left: str, right: str, output_label: str, merge_func: Callable = pandas.merge, merge_kwargs: Optional[Dict] = None)[source]

Merge internal dataframes.

The two referenced dataframes are removed from the internal list and the newly created dataframe is added.

The merge itself can be completely customized by supplying a callable merge_func(left_df, right_df, **merge_kwargs) which can handle data pre-processing as well as the merge itself.

The files attribute of the result will be empty since the in-memory DataFrames are no longer representations of the referenced files.

Parameters
  • left – Table name of the left dataframe.

  • right – Table name of the right dataframe.

  • output_label – Table name for the newly created dataframe

  • merge_func – The function performing the merge. By default, pandas.merge is used. The function must have the signature func(left_df, right_df, **kwargs)

  • merge_kwargs – Keyword arguments which should be supplied to the merge function

Returns

Return type

MetaPartition
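
A sketch, assuming the partition holds two tables 'core' and 'helper' that share an id column (all names illustrative):

>>> merged = mp.merge_dataframes(
...     left='core',
...     right='helper',
...     output_label='enriched',
...     merge_kwargs={'on': 'id'},
... )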

static merge_indices(metapartitions)[source]
static merge_metapartitions(metapartitions, label_merger=None, metadata_merger=None)[source]
property partition
partition_on(partition_on: Union[str, Sequence[str]])[source]

Partition all dataframes assigned to this MetaPartition according to the given columns.

If the MetaPartition object contains index information, it is split such that the entries reference the new partitions.

If a requested partition column does not exist in all tables, a KeyError is raised.

All output partitions are re-assigned labels encoding the partitioned columns (urlencoded).

Examples:

>>> import pandas as pd
>>> from kartothek.io_components.metapartition import MetaPartition
>>> mp = MetaPartition(
...     label='partition_label',
...     data={
...         "Table1": pd.DataFrame({
...             'P': [1, 2, 1, 2],
...             'L': [1, 1, 2, 2]
...         })
...     }
... )
>>> repartitioned_mp = mp.partition_on(['P', 'L'])
>>> assert [mp["label"] for mp in repartitioned_mp.metapartitions] == [
...     "P=1/L=1/partition_label",
...     "P=1/L=2/partition_label",
...     "P=2/L=1/partition_label",
...     "P=2/L=2/partition_label"
... ]
Parameters

partition_on – The column(s) to partition the dataframes on.

remove_dataframes()[source]

Remove all dataframes from the metapartition in memory.

store_dataframes(store: Union[str, simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], dataset_uuid: str, df_serializer: Optional[kartothek.serialization._generic.DataFrameSerializer] = None, store_metadata: bool = False, metadata_storage_format: Optional[str] = None) → kartothek.io_components.metapartition.MetaPartition[source]

Stores all dataframes of the MetaPartitions and registers the saved files under the files attribute. The dataframes themselves are deleted from memory.

Parameters
  • store – If it is a function, the result of calling it must be a KeyValueStore.

  • dataset_uuid – The dataset UUID the partition will be assigned to

  • df_serializer – Serialiser to be used to store the dataframe

Returns

Return type

MetaPartition
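
A minimal round-trip sketch using an in-memory simplekv store (the dataset UUID is illustrative):

>>> from simplekv.memory import DictStore
>>> store = DictStore()
>>> mp = mp.store_dataframes(store, dataset_uuid='my_dataset')
>>> mp.data  # the in-memory dataframes are dropped after storing
{}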

property tables
to_dict()[source]
validate_schema_compatible(store: Union[str, simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]], dataset_uuid: str) → kartothek.io_components.metapartition.MetaPartition[source]

Validates that the currently held DataFrames match the schema of the existing dataset.

Parameters
  • store – If it is a function, the result of calling it must be a KeyValueStore.

  • dataset_uuid – The dataset UUID the partition will be assigned to

class kartothek.io_components.metapartition.MetaPartitionIterator(metapartition)[source]

Bases: Iterator

next()

Return the next item from the iterator. When exhausted, raise StopIteration
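
Since MetaPartition is iterable, this iterator yields one MetaPartition per (possibly nested) physical partition, e.g.:

>>> for sub_mp in mp:
...     print(sub_mp.label)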

kartothek.io_components.metapartition.parse_input_to_metapartition(obj: Union[Dict, pandas.core.frame.DataFrame, Sequence, kartothek.io_components.metapartition.MetaPartition], metadata_version: Optional[int] = None, expected_secondary_indices: Optional[Union[Literal[False], List[str]]] = False) → kartothek.io_components.metapartition.MetaPartition[source]

Parses the given user input and returns a MetaPartition.

The format specification supports multiple input modes, as follows:

  1. Mode - Dictionary with partition information

    In this case, a dictionary is supplied where the keys describe the partition.

    • label - (optional) Unique partition label. If None is given, a UUID is generated using kartothek.core.uuid.gen_uuid().

    • data - A dict or list of tuples. The keys represent the table name and the values are the actual payload data as a pandas.DataFrame.

    • indices - Deprecated, see the keyword argument secondary_indices to create indices.

      A dictionary to describe the dataset indices. All partition level indices are finally merged using kartothek.io_components.metapartition.MetaPartition.merge_indices() into a single dataset index

    Examples:

    # A partition with explicit label, no metadata, one table and index information
    input_obj = {
        'label': 'partition_label',
        'data': [('table', pd.DataFrame([{'column_1': values_1, 'column_2': values_2}]))],
        'indices': {
            "column_1": {
                value: ['partition_label']
            }
        }
    }
    # If no label is given, a UUID will be generated using kartothek.core.uuid.gen_uuid()
    simple_input = {
        'data': [('table', pd.DataFrame())],
    }
    
    
  2. Mode - pandas.DataFrame

    If only a DataFrame is provided, a UUID is generated and the dataframe is stored for the table name SINGLE_TABLE

  3. Mode - MetaPartition

    If a MetaPartition is passed directly, it is simply passed through.

  4. Mode - List of tuples

    The first item represents the table name and the second is the actual payload data as a pandas.DataFrame.

    Example:

    # A partition with no explicit label, no metadata and one table
    input_obj = [('table', pd.DataFrame())]
    

Nested MetaPartitions:

The input may also be provided as a list to ease batch processing. The returned MetaPartition will be nested and each list element represents a single physical partition. For details on nested MetaPartitions, see MetaPartition
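
A sketch of this batch form, assuming a list of mode-1 dictionaries (table name and metadata version are illustrative):

>>> import pandas as pd
>>> from kartothek.io_components.metapartition import parse_input_to_metapartition
>>> mp = parse_input_to_metapartition(
...     [
...         {'data': [('table', pd.DataFrame({'x': [1]}))]},
...         {'data': [('table', pd.DataFrame({'x': [2]}))]},
...     ],
...     metadata_version=4,
... )
>>> len(mp.metapartitions)
2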

Parameters
  • obj – The user input to be parsed; see the supported input modes above.

  • metadata_version – The kartothek dataset specification version

  • expected_secondary_indices – Iterable of strings containing expected columns on which indices are created. An empty iterable indicates that no indices are expected. The default is False, which indicates that no checking will be done (None behaves the same way). This is only used in mode “Dictionary with partition information”.

Raises

ValueError – In case the given input is not understood

Returns

Return type

MetaPartition

kartothek.io_components.metapartition.partition_labels_from_mps(mps: List[kartothek.io_components.metapartition.MetaPartition]) → List[str][source]

Get a list of partition labels, flattening any nested meta partitions in the input and ignoring sentinels.
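
For example, with two non-nested metapartitions (labels illustrative):

>>> partition_labels_from_mps([mp_a, mp_b])
['label_a', 'label_b']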