Source code for kartothek.io_components.metapartition

import inspect
import io
import logging
import os
import time
import warnings
from collections import defaultdict, namedtuple
from copy import copy
from functools import wraps
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
    cast,
)

import numpy as np
import pandas as pd
import pyarrow as pa

from kartothek.core import naming
from kartothek.core.common_metadata import (
    SchemaWrapper,
    make_meta,
    normalize_column_order,
    read_schema_metadata,
    validate_compatible,
    validate_shared_columns,
)
from kartothek.core.docs import default_docs
from kartothek.core.index import ExplicitSecondaryIndex, IndexBase
from kartothek.core.index import merge_indices as merge_indices_algo
from kartothek.core.naming import get_partition_file_prefix
from kartothek.core.partition import Partition
from kartothek.core.typing import StoreInput
from kartothek.core.urlencode import decode_key, quote_indices
from kartothek.core.utils import (
    ensure_store,
    ensure_string_type,
    verify_metadata_version,
)
from kartothek.core.uuid import gen_uuid
from kartothek.io_components.utils import (
    InferredIndices,
    _ensure_valid_indices,
    align_categories,
    combine_metadata,
)
from kartothek.serialization import (
    DataFrameSerializer,
    PredicatesType,
    default_serializer,
    filter_df_from_predicates,
)
from kartothek.utils.migration_helpers import (
    DEPRECATION_WARNING_REMOVE_PARAMETER,
    deprecate_parameters,
    deprecate_parameters_if_set,
    get_deprecation_warning_remove_dict_multi_table,
    get_deprecation_warning_remove_parameter_multi_table,
    get_parameter_default_value_deprecation_warning,
    get_parameter_generic_replacement_deprecation_warning,
    get_parameter_type_change_deprecation_warning,
    get_specific_function_deprecation_warning_multi_table,
)

LOGGER = logging.getLogger(__name__)

SINGLE_TABLE = "table"

_Literal = namedtuple("_Literal", ["column", "op", "value"])
_SplitPredicate = namedtuple("_SplitPredicate", ["key_part", "content_part"])

_METADATA_SCHEMA = {
    "partition_label": np.dtype("O"),
    "row_group_id": np.dtype(int),
    "row_group_compressed_size": np.dtype(int),
    "row_group_uncompressed_size": np.dtype(int),
    "number_rows_total": np.dtype(int),
    "number_row_groups": np.dtype(int),
    "serialized_size": np.dtype(int),
    "number_rows_per_row_group": np.dtype(int),
}

_MULTI_TABLE_DICT_LIST = Dict[str, Iterable[str]]
MetaPartitionInput = Union[Dict, pd.DataFrame, Sequence, "MetaPartition"]


def _predicates_to_named(predicates):
    if predicates is None:
        return None
    return [[_Literal(*x) for x in conjunction] for conjunction in predicates]


def _combine_predicates(predicates, logical_conjunction):
    if not logical_conjunction:
        return predicates
    if predicates is None:
        return [logical_conjunction]
    combined_predicates = []
    for conjunction in predicates:
        new_conjunction = conjunction[:]
        for literal in logical_conjunction:
            new_conjunction.append(literal)
        combined_predicates.append(new_conjunction)
    return combined_predicates
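

# Hedged, illustrative sketch of the two predicate helpers above: predicates are
# expressed in disjunctive normal form (a list of conjunctions of
# ``(column, op, value)`` tuples). ``_combine_predicates`` appends a partition's
# ``logical_conjunction`` to every conjunction and ``_predicates_to_named`` wraps
# the raw tuples into ``_Literal`` records. The column names and values below are
# made up for demonstration only.
def _example_predicate_helpers():
    predicates = [[("x", "==", 1)], [("y", ">", 5)]]
    logical_conjunction = [("part", "==", "a")]

    combined = _combine_predicates(predicates, logical_conjunction)
    # Every conjunction now additionally requires ``part == "a"``:
    # [[("x", "==", 1), ("part", "==", "a")], [("y", ">", 5), ("part", "==", "a")]]

    named = _predicates_to_named(combined)
    # ``named[0][0]`` is ``_Literal(column="x", op="==", value=1)``
    return combined, named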


def _initialize_store_for_metapartition(method, method_args, method_kwargs):

    for store_variable in ["store", "storage"]:
        if store_variable in method_kwargs:
            method_kwargs[store_variable] = ensure_store(method_kwargs[store_variable])
        else:
            method = cast(object, method)
            args = inspect.getfullargspec(method).args

            if store_variable in args:
                ix = args.index(store_variable)
                # reduce index since the argspec and method_args start counting differently due to self
                ix -= 1
                instantiated_store = ensure_store(method_args[ix])
                new_args = []
                for ix_method, arg in enumerate(method_args):
                    if ix_method != ix:
                        new_args.append(arg)
                    else:
                        new_args.append(instantiated_store)
                method_args = tuple(new_args)

    return method_args, method_kwargs


def _apply_to_list(method):
    """
    Decorate a MetaPartition method to act upon the internal list of metapartitions

    The methods must return a MetaPartition object!
    """

    @wraps(method)
    def _impl(self, *method_args, **method_kwargs):
        if not isinstance(self, MetaPartition):
            raise TypeError("Type unknown %s", type(self))

        result = self.as_sentinel()
        if len(self) == 0:
            raise RuntimeError("Invalid MetaPartition. No sub-partitions to act upon.")

        # Look whether there is a `store` in the arguments and instantiate it
        # this way we avoid multiple HTTP pools
        method_args, method_kwargs = _initialize_store_for_metapartition(
            method, method_args, method_kwargs
        )
        if (len(self) == 1) and (self.label is None):
            result = method(self, *method_args, **method_kwargs)
        else:
            for mp in self:
                method_return = method(mp, *method_args, **method_kwargs)
                if not isinstance(method_return, MetaPartition):
                    raise ValueError(
                        "Method {} did not return a MetaPartition "
                        "but {}".format(method.__name__, type(method_return))
                    )
                if method_return.is_sentinel:
                    result = method_return
                else:
                    for mp in method_return:
                        result = result.add_metapartition(mp, schema_validation=False)
        if not isinstance(result, MetaPartition):
            raise ValueError(
                "Result for method {} is not a `MetaPartition` but {}".format(
                    method.__name__, type(result)
                )
            )
        return result

    return _impl


class MetaPartitionIterator(Iterator):
    def __init__(self, metapartition):
        self.metapartition = metapartition
        self.position = 0

    def __iter__(self):
        return self

    def __next__(self):
        current = self.metapartition
        if len(current) == 1:
            if current.label is None:
                raise StopIteration()

        if self.position >= len(current.metapartitions):
            raise StopIteration()
        else:
            mp_dict = current.metapartitions[self.position]
            # These are global attributes, i.e. the nested metapartitions do not carry
            # these and need to be added here
            mp_dict["dataset_metadata"] = current.dataset_metadata
            mp_dict["metadata_version"] = current.metadata_version
            mp_dict["table_meta"] = current.table_meta
            mp_dict["partition_keys"] = current.partition_keys
            mp_dict["logical_conjunction"] = current.logical_conjunction

            self.position += 1
            return MetaPartition.from_dict(mp_dict)

    next = __next__  # Python 2


class MetaPartition(Iterable):
    """
    Wrapper for kartothek partition which includes additional information
    about the parent dataset.
    """

    @deprecate_parameters_if_set(
        get_deprecation_warning_remove_parameter_multi_table(
            deprecated_in="5.3", removed_in="6.0"
        ),
        "dataset_metadata",
        "metadata",
    )
    @deprecate_parameters_if_set(
        get_parameter_generic_replacement_deprecation_warning(
            replacing_parameter="schema", deprecated_in="5.3", changed_in="6.0"
        ),
        "table_meta",
    )
    @deprecate_parameters_if_set(
        get_parameter_generic_replacement_deprecation_warning(
            replacing_parameter="file", deprecated_in="5.3", changed_in="6.0"
        ),
        "files",
    )
    @deprecate_parameters_if_set(
        get_deprecation_warning_remove_dict_multi_table(
            deprecated_in="5.3", changed_in="6.0"
        ),
        "data",
    )
    def __init__(
        self,
        label: Optional[str],
        files: Optional[Dict[str, str]] = None,
        metadata: Any = None,
        data: Optional[Dict[str, pd.DataFrame]] = None,
        dataset_metadata: Optional[Dict] = None,
        indices: Optional[Dict[Any, Any]] = None,
        metadata_version: Optional[int] = None,
        table_meta: Optional[Dict[str, SchemaWrapper]] = None,
        partition_keys: Optional[Sequence[str]] = None,
        logical_conjunction: Optional[List[Tuple[Any, str, Any]]] = None,
    ):
        """
        Initialize the :mod:`kartothek.io` base class MetaPartition.

        The `MetaPartition` is used as a wrapper around the kartothek
        `Partition` and primarily deals with dataframe manipulations,
        in- and output to store.

        The :class:`~kartothek.io_components.metapartition.MetaPartition` is
        immutable, i.e. all member functions return a new MetaPartition object
        with the requested attribute changed.

        Parameters
        ----------
        label
            Partition label
        files
            A dictionary with references to the files in store where the keys
            represent file labels and the values file prefixes.
        metadata
            The metadata of the partition
        data
            A dictionary including the materialized in-memory DataFrames
            corresponding to the file references in `files`.
        dataset_metadata
            The metadata of the original dataset
        indices
            Kartothek index dictionary
        metadata_version
        table_meta
            The dataset table schemas
        partition_keys
            The dataset partition keys
        logical_conjunction
            A logical conjunction to assign to the MetaPartition. By assigning
            this, the MetaPartition will only be able to load data respecting
            this conjunction.
        """
        if metadata_version is None:
            self.metadata_version = naming.DEFAULT_METADATA_VERSION
        else:
            self.metadata_version = metadata_version
        verify_metadata_version(self.metadata_version)

        self.table_meta = table_meta if table_meta else {}
        if isinstance(data, dict) and (len(self.table_meta) == 0):
            for table, df in data.items():
                if df is not None:
                    self.table_meta[table] = make_meta(
                        df,
                        origin="{}/{}".format(table, label),
                        partition_keys=partition_keys,
                    )
        indices = indices or {}
        for column, index_dct in indices.items():
            if isinstance(index_dct, dict):
                indices[column] = ExplicitSecondaryIndex(
                    column=column, index_dct=index_dct
                )
        self.logical_conjunction = logical_conjunction
        self.metapartitions = [
            {
                "label": label,
                "data": data or {},
                "files": files or {},
                "indices": indices,
                "logical_conjunction": logical_conjunction,
            }
        ]
        self.dataset_metadata = dataset_metadata or {}
        self.partition_keys = partition_keys or []

    def __repr__(self):
        if len(self.metapartitions) > 1:
            label = "NESTED ({})".format(len(self.metapartitions))
        else:
            label = self.label
        return "<{_class} v{version} | {label} | tables {tables} >".format(
            version=self.metadata_version,
            _class=self.__class__.__name__,
            label=label,
            tables=sorted(set(self.table_meta.keys())),
        )

    def __len__(self):
        return len(self.metapartitions)

    def __iter__(self):
        return MetaPartitionIterator(self)

    def __getitem__(self, label):
        for mp in self:
            if mp.label == label:
                return mp
        raise KeyError("Metapartition doesn't contain partition `{}`".format(label))

    @property
    def data(self):
        if len(self.metapartitions) > 1:
            raise AttributeError(
                "Accessing `data` attribute is not allowed while nested"
            )
        assert isinstance(self.metapartitions[0], dict), self.metapartitions
        return self.metapartitions[0]["data"]

    @property
    def files(self):
        if len(self.metapartitions) > 1:
            raise AttributeError(
                "Accessing `files` attribute is not allowed while nested"
            )
        return self.metapartitions[0]["files"]

    @property
    def is_sentinel(self):
        return len(self.metapartitions) == 1 and self.label is None

    @property
    def label(self):
        if len(self.metapartitions) > 1:
            raise AttributeError(
                "Accessing `label` attribute is not allowed while nested"
            )
        assert isinstance(self.metapartitions[0], dict), self.metapartitions[0]
        return self.metapartitions[0]["label"]

    @property
    def indices(self):
        if len(self.metapartitions) > 1:
            raise AttributeError(
                "Accessing `indices` attribute is not allowed while nested"
            )
        return self.metapartitions[0]["indices"]

    @property
    def tables(self):
        return list(set(self.data.keys()).union(set(self.files.keys())))

    @property
    def partition(self):
        return Partition(label=self.label, files=self.files)

    def __eq__(self, other):
        if not isinstance(other, MetaPartition):
            return False

        if self.metadata_version != other.metadata_version:
            return False

        for table, meta in self.table_meta.items():
            # https://issues.apache.org/jira/browse/ARROW-5873
            other_meta = other.table_meta.get(table, None)
            if other_meta is None:
                return False
            if not meta.equals(other_meta):
                return False

        if self.dataset_metadata != other.dataset_metadata:
            return False

        if len(self.metapartitions) != len(other.metapartitions):
            return False

        # In the case both MetaPartitions are nested, we need to ensure a match
        # for all sub-partitions.
        # Since the label is unique, it can be used as a distinguishing key to
        # sort and compare the nested metapartitions.
        if len(self.metapartitions) > 1:
            for mp_self, mp_other in zip(
                sorted(self.metapartitions, key=lambda x: x["label"]),
                sorted(other.metapartitions, key=lambda x: x["label"]),
            ):
                if mp_self == mp_other:
                    continue
                # If a single metapartition does not match, the whole object is
                # considered different
                return False
            return True

        # This is unnested only
        self_keys = set(self.data.keys())
        other_keys = set(other.data.keys())
        if not (self_keys == other_keys):
            return False

        if self.label != other.label:
            return False

        if self.files != other.files:
            return False

        for label, df in self.data.items():
            if not (df.equals(other.data[label])):
                return False

        return True

    @staticmethod
    @deprecate_parameters_if_set(
        get_deprecation_warning_remove_parameter_multi_table(
            deprecated_in="5.3", removed_in="6.0"
        ),
        "dataset_metadata",
    )
    @deprecate_parameters_if_set(
        get_parameter_generic_replacement_deprecation_warning(
            replacing_parameter="schema", deprecated_in="5.3", changed_in="6.0"
        ),
        "table_meta",
    )
    @deprecate_parameters_if_set(
        get_deprecation_warning_remove_dict_multi_table(
            deprecated_in="5.3", changed_in="6.0"
        ),
        "data",
    )
    def from_partition(
        partition: Partition,
        data: Optional[Dict] = None,
        dataset_metadata: Dict = None,
        indices: Dict = None,
        metadata_version: Optional[int] = None,
        table_meta: Optional[Dict] = None,
        partition_keys: Optional[List[str]] = None,
        logical_conjunction: Optional[List[Tuple[Any, str, Any]]] = None,
    ):
        """
        Transform a kartothek :class:`~kartothek.core.partition.Partition` into a
        :class:`~kartothek.io_components.metapartition.MetaPartition`.

        Parameters
        ----------
        partition
            The kartothek partition to be wrapped
        data
            A dictionary with materialised :class:`~pandas.DataFrame`
        dataset_metadata
            The metadata of the original dataset
        indices
            The index dictionary of the dataset
        table_meta
            Type metadata for each table, optional
        metadata_version
        partition_keys
            A list of the primary partition keys

        Returns
        -------
        :class:`~kartothek.io_components.metapartition.MetaPartition`
        """
        return MetaPartition(
            label=partition.label,
            files=partition.files,
            data=data,
            dataset_metadata=dataset_metadata,
            indices=indices,
            metadata_version=metadata_version,
            table_meta=table_meta,
            partition_keys=partition_keys,
            logical_conjunction=logical_conjunction,
        )

    @deprecate_parameters_if_set(
        get_deprecation_warning_remove_parameter_multi_table(
            deprecated_in="5.3", removed_in="6.0"
        ),
        "metadata_merger",
    )
    def add_metapartition(
        self,
        metapartition: "MetaPartition",
        metadata_merger: Optional[Callable] = None,
        schema_validation: bool = True,
    ):
        """
        Adds a metapartition to the internal list structure to enable batch processing.

        The top level `dataset_metadata` dictionary is combined with the existing dict
        and all other attributes are stored in the `metapartitions` list.

        Parameters
        ----------
        metapartition
            The MetaPartition to be added.
        metadata_merger
            A callable to perform the metadata merge. By default
            :func:`kartothek.io_components.utils.combine_metadata` is used.
        schema_validation
            If True (default), ensure that the `table_meta` of both
            `MetaPartition` objects are the same.
        """
        if self.is_sentinel:
            return metapartition

        table_meta = metapartition.table_meta
        existing_label = [mp_["label"] for mp_ in self.metapartitions]

        if any(
            [mp_["label"] in existing_label for mp_ in metapartition.metapartitions]
        ):
            raise RuntimeError(
                "Duplicate labels for nested metapartitions are not allowed!"
            )

        if schema_validation:
            table_meta = {}
            for table, meta in self.table_meta.items():
                other = metapartition.table_meta.get(table, None)
                # This ensures that only schema-compatible metapartitions can be nested
                # The returned schema by validate_compatible is the reference schema
                # with the most information, i.e. the fewest null columns
                table_meta[table] = validate_compatible([meta, other])

        metadata_merger = metadata_merger or combine_metadata
        new_dataset_metadata = metadata_merger(
            [self.dataset_metadata, metapartition.dataset_metadata]
        )

        new_object = MetaPartition(
            label="NestedMetaPartition",
            dataset_metadata=new_dataset_metadata,
            metadata_version=metapartition.metadata_version,
            table_meta=table_meta,
            partition_keys=metapartition.partition_keys or None,
            logical_conjunction=metapartition.logical_conjunction or None,
        )

        # Add metapartition information to the new object
        new_metapartitions = self.metapartitions.copy()
        new_metapartitions.extend(metapartition.metapartitions.copy())
        new_object.metapartitions = new_metapartitions

        return new_object

    @staticmethod
    def from_dict(dct):
        """
        Create a :class:`~kartothek.io_components.metapartition.MetaPartition` from a dictionary.

        Parameters
        ----------
        dct : dict
            Dictionary containing constructor arguments as keys

        Returns
        -------
        :class:`~kartothek.io_components.metapartition.MetaPartition`
        """
        return MetaPartition(
            label=dct["label"],
            files=dct.get("files", {}),
            metadata=dct.get("metadata", {}),
            data=dct.get("data", {}),
            indices=dct.get("indices", {}),
            metadata_version=dct.get("metadata_version", None),
            dataset_metadata=dct.get("dataset_metadata", {}),
            table_meta=dct.get("table_meta", {}),
            partition_keys=dct.get("partition_keys", None),
            logical_conjunction=dct.get("logical_conjunction", None),
        )

    def to_dict(self):
        return {
            "label": self.label,
            "files": self.files or {},
            "data": self.data or {},
            "indices": self.indices,
            "metadata_version": self.metadata_version,
            "dataset_metadata": self.dataset_metadata,
            "table_meta": self.table_meta,
            "partition_keys": self.partition_keys,
            "logical_conjunction": self.logical_conjunction,
        }

    @_apply_to_list
    def remove_dataframes(self):
        """
        Remove all dataframes from the metapartition in memory.
        """
        return self.copy(data={})
    def _split_predicates_in_index_and_content(self, predicates):
        """
        Split a list of predicates in the parts that can be resolved by the
        partition columns and the ones that are persisted in the data file.
        """
        # Predicates are split in this function into the parts that apply to
        # the partition key columns `key_part` and the parts that apply to the
        # contents of the file `content_part`.
        split_predicates = []
        has_index_condition = False
        for conjunction in predicates:
            key_part = []
            content_part = []
            for literal in conjunction:
                if literal.column in self.partition_keys:
                    has_index_condition = True
                    key_part.append(literal)
                else:
                    content_part.append(literal)
            split_predicates.append(_SplitPredicate(key_part, content_part))
        return split_predicates, has_index_condition

    @deprecate_parameters_if_set(
        get_deprecation_warning_remove_parameter_multi_table(
            deprecated_in="5.3", removed_in="6.0"
        ),
        "table",
    )
    def _apply_partition_key_predicates(self, table, indices, split_predicates):
        """
        Apply the predicates to the partition_key columns and return the
        remaining predicates that should be pushed to the DataFrame serialiser.
        """
        # Construct a single line DF with the partition columns
        schema = self.table_meta[table]
        index_df_dct = {}
        for column, value in indices:
            pa_dtype = schema[schema.get_field_index(column)].type
            value = IndexBase.normalize_value(pa_dtype, value)
            if pa.types.is_date(pa_dtype):
                index_df_dct[column] = pd.Series(
                    pd.to_datetime([value], infer_datetime_format=True)
                ).dt.date
            else:
                dtype = pa_dtype.to_pandas_dtype()
                index_df_dct[column] = pd.Series([value], dtype=dtype)
        index_df = pd.DataFrame(index_df_dct)

        filtered_predicates = []
        # We assume that indices on the partition level have been filtered out
        # already in `dispatch_metapartitions`. `filtered_predicates` should
        # only contain predicates that can be evaluated on parquet level
        for conjunction in split_predicates:
            predicates = [conjunction.key_part]
            if (
                len(conjunction.key_part) == 0
                or len(
                    filter_df_from_predicates(
                        index_df, predicates, strict_date_types=True
                    )
                )
                > 0
            ):
                if len(conjunction.content_part) > 0:
                    filtered_predicates.append(conjunction.content_part)
                else:
                    # A condition applies to the whole DataFrame, so we need
                    # to load all data.
                    return None
        return filtered_predicates

    @default_docs
    @_apply_to_list
    @deprecate_parameters(
        get_parameter_default_value_deprecation_warning(
            from_value="False", to_value="True", deprecated_in="5.3", changed_in="6.0"
        ),
        "dates_as_object",
    )
    @deprecate_parameters_if_set(
        get_parameter_type_change_deprecation_warning(
            from_type="StoreInput",
            to_type="KeyValueStore",
            deprecated_in="5.3",
            changed_in="6.0",
        ),
        "store",
    )
    @deprecate_parameters_if_set(
        get_deprecation_warning_remove_parameter_multi_table(
            deprecated_in="5.3", removed_in="6.0"
        ),
        "tables",
    )
    def load_dataframes(
        self,
        store: StoreInput,
        tables: _MULTI_TABLE_DICT_LIST = None,
        columns: _MULTI_TABLE_DICT_LIST = None,
        predicate_pushdown_to_io: bool = True,
        categoricals: _MULTI_TABLE_DICT_LIST = None,
        dates_as_object: bool = False,
        predicates: PredicatesType = None,
    ) -> "MetaPartition":
        """
        Load the dataframes of the partitions from store into memory.

        Parameters
        ----------
        tables
            If a list is supplied, only the given tables of the partition are
            loaded. If a given table does not exist, it is ignored.

            Examples

            .. code::

                >>> part = MetaPartition(
                ...     label='part_label',
                ...     files={
                ...         'core': 'core_key_in_store',
                ...         'helper': 'helper_key_in_store'
                ...     }
                ... )
                >>> part.data
                {}
                >>> part = part.load_dataframes(store, ['core'])
                >>> part.data
                {
                    'core': pd.DataFrame()
                }

        """
        if columns is None:
            columns = {}
        elif set(columns).difference(self.tables):
            raise (
                ValueError(
                    "You are trying to read columns from invalid table(s): {}".format(
                        set(columns).difference(self.tables)
                    )
                )
            )
        if categoricals is None:
            categoricals = {}

        LOGGER.debug("Loading internal dataframes of %s", self.label)
        if len(self.files) == 0:
            # This used to raise, but the specs do not require this, so simply do a no-op
            LOGGER.debug("Partition %s is empty and has no tables/files", self.label)
            return self
        new_data = copy(self.data)
        predicates = _combine_predicates(predicates, self.logical_conjunction)
        predicates = _predicates_to_named(predicates)

        for table, key in self.files.items():
            table_columns = columns.get(table, None)
            categories = categoricals.get(table, None)
            dataset_uuid, _, indices, file_name = decode_key(key)
            if tables and table not in tables:
                continue

            # In case the columns only refer to the partition indices, we need to load
            # at least a single column to determine the length of the required dataframe.
            if table_columns is None:
                table_columns_to_io = None
            else:
                table_columns_to_io = table_columns

            filtered_predicates = predicates

            self._load_table_meta(dataset_uuid=dataset_uuid, table=table, store=store)

            # Filter predicates that would apply to this partition and remove the partition columns
            if predicates:
                # Check if there are predicates that match to the partition columns.
                # For these we need to check if the partition columns already falsify
                # the condition.
                #
                # We separate these predicates into their index and their Parquet part.
                (
                    split_predicates,
                    has_index_condition,
                ) = self._split_predicates_in_index_and_content(predicates)

                filtered_predicates = []
                if has_index_condition:
                    filtered_predicates = self._apply_partition_key_predicates(
                        table, indices, split_predicates
                    )
                else:
                    filtered_predicates = [
                        pred.content_part for pred in split_predicates
                    ]

            # Remove partition_keys from table_columns_to_io
            if self.partition_keys and table_columns_to_io is not None:
                keys_to_remove = set(self.partition_keys) & set(table_columns_to_io)
                # This is done to not change the ordering of the list
                table_columns_to_io = [
                    c for c in table_columns_to_io if c not in keys_to_remove
                ]

            start = time.time()
            df = DataFrameSerializer.restore_dataframe(
                key=key,
                store=store,
                columns=table_columns_to_io,
                categories=categories,
                predicate_pushdown_to_io=predicate_pushdown_to_io,
                predicates=filtered_predicates,
                date_as_object=dates_as_object,
            )
            LOGGER.debug("Loaded dataframe %s in %s seconds.", key, time.time() - start)
            # Metadata version >=4 parses the index columns and adds them back to the dataframe
            df = self._reconstruct_index_columns(
                df=df,
                key_indices=indices,
                table=table,
                columns=table_columns,
                categories=categories,
                date_as_object=dates_as_object,
            )

            df.columns = df.columns.map(ensure_string_type)
            if table_columns is not None:
                # TODO: When the write-path ensures that all partitions have the same column set,
                # this check can be moved before `DataFrameSerializer.restore_dataframe`. At the
                # position of the current check we may want to double check the columns of the
                # loaded DF and raise an exception indicating an inconsistent dataset state instead.
                missing_cols = set(table_columns).difference(df.columns)
                if missing_cols:
                    raise ValueError(
                        "Columns cannot be found in stored dataframe: {}".format(
                            ", ".join(sorted(missing_cols))
                        )
                    )

                if list(df.columns) != table_columns:
                    df = df.reindex(columns=table_columns, copy=False)
            new_data[table] = df
        return self.copy(data=new_data)

    @_apply_to_list
    def load_all_table_meta(
        self, store: StoreInput, dataset_uuid: str
    ) -> "MetaPartition":
        """
        Loads all table metadata in memory and stores it under the `tables` attribute.
        """
        for table in self.files:
            self._load_table_meta(dataset_uuid, table, store)
        return self
    def _load_table_meta(
        self, dataset_uuid: str, table: str, store: StoreInput
    ) -> "MetaPartition":
        if table not in self.table_meta:
            _common_metadata = read_schema_metadata(
                dataset_uuid=dataset_uuid, store=store, table=table
            )
            self.table_meta[table] = _common_metadata
        return self

    @deprecate_parameters_if_set(
        get_deprecation_warning_remove_parameter_multi_table(
            deprecated_in="5.3", removed_in="6.0"
        ),
        "table",
    )
    @deprecate_parameters_if_set(
        get_deprecation_warning_remove_dict_multi_table(
            deprecated_in="5.3", changed_in="6.0"
        ),
        "categories",
    )
    def _reconstruct_index_columns(
        self, df, key_indices, table, columns, categories, date_as_object
    ):
        if len(key_indices) == 0:
            return df

        original_columns = list(df.columns)
        zeros = np.zeros(len(df), dtype=int)
        schema = self.table_meta[table]

        # One of the few places `inplace=True` makes a significant difference
        df.reset_index(drop=True, inplace=True)

        index_names = [primary_key for primary_key, _ in key_indices]
        # The index might already be part of the dataframe which is recovered from the
        # parquet file. In this case, still use the reconstructed index col to have
        # consistent index columns behavior. In this case the column is part of
        # `original_columns` and must be removed to avoid duplication in the column axis.
        cleaned_original_columns = [
            orig for orig in original_columns if orig not in index_names
        ]
        if cleaned_original_columns != original_columns:
            # indexer call is slow, so only do that if really necessary
            df = df.reindex(columns=cleaned_original_columns, copy=False)

        for pos, (primary_key, value) in enumerate(key_indices):
            # If there are predicates, don't reconstruct the index if it wasn't requested
            if columns is not None and primary_key not in columns:
                continue

            pa_dtype = schema.field(primary_key).type
            dtype = pa_dtype.to_pandas_dtype()
            convert_to_date = False
            if date_as_object and pa_dtype in [pa.date32(), pa.date64()]:
                convert_to_date = True

            if isinstance(dtype, type):
                value = dtype(value)
            elif isinstance(dtype, np.dtype):
                if dtype == np.dtype("datetime64[ns]"):
                    value = pd.Timestamp(value)
                else:
                    value = dtype.type(value)
            else:
                raise RuntimeError(
                    "Unexpected object encountered: ({}, {})".format(
                        dtype, type(dtype)
                    )
                )
            if categories and primary_key in categories:
                if convert_to_date:
                    cats = pd.Series(value).dt.date
                else:
                    cats = [value]
                value = pd.Categorical.from_codes(zeros, categories=cats)
            else:
                if convert_to_date:
                    value = pd.Timestamp(value).to_pydatetime().date()
            df.insert(pos, primary_key, value)

        return df

    @_apply_to_list
    def merge_dataframes(
        self,
        left: str,
        right: str,
        output_label: str,
        merge_func: Callable = pd.merge,
        merge_kwargs: Optional[Dict] = None,
    ):
        """
        Merge internal dataframes.

        The two referenced dataframes are removed from the internal list and
        the newly created dataframe is added.

        The merge itself can be completely customized by supplying a callable
        `merge_func(left_df, right_df, **merge_kwargs)` which can handle data
        pre-processing as well as the merge itself.

        The files attribute of the result will be empty since the in-memory
        DataFrames are no longer representations of the referenced files.

        Parameters
        ----------
        left
            Category of the left dataframe.
        right
            Category of the right dataframe.
        output_label
            Category for the newly created dataframe
        merge_func
            The function to take care of the merge. By default: pandas.merge.
            The function should have the signature `func(left_df, right_df, **kwargs)`
        merge_kwargs
            Keyword arguments which should be supplied to the merge function

        Returns
        -------
        MetaPartition
        """
        warnings.warn(
            message=get_specific_function_deprecation_warning_multi_table(
                function_name="merge_dataframes", deprecated_in="5.3", removed_in="6.0"
            ),
            category=DeprecationWarning,
        )
        # Shallow copy
        new_data = copy(self.data)
        if merge_kwargs is None:
            merge_kwargs = {}

        left_df = new_data.pop(left)
        right_df = new_data.pop(right)

        LOGGER.debug("Merging internal dataframes of %s", self.label)

        try:
            df_merged = merge_func(left_df, right_df, **merge_kwargs)
        except TypeError:
            LOGGER.error(
                "Tried to merge using %s with\n left:%s\nright:%s\n kwargs:%s",
                merge_func.__name__,
                left_df.head(),
                right_df.head(),
                merge_kwargs,
            )
            raise

        new_data[output_label] = df_merged
        new_table_meta = copy(self.table_meta)
        # The tables are no longer part of the MetaPartition, thus also drop
        # their schema.
        del new_table_meta[left]
        del new_table_meta[right]
        new_table_meta[output_label] = make_meta(
            df_merged,
            origin="{}/{}".format(output_label, self.label),
            partition_keys=self.partition_keys,
        )
        return self.copy(files={}, data=new_data, table_meta=new_table_meta)

    @_apply_to_list
    def validate_schema_compatible(
        self, store: StoreInput, dataset_uuid: str
    ) -> "MetaPartition":
        """
        Validates that the currently held DataFrames match the schema of the
        existing dataset.

        Parameters
        ----------
        store
            If it is a function, the result of calling it must be a KeyValueStore.
        dataset_uuid
            The dataset UUID the partition will be assigned to
        """
        # Load the reference meta of the existing dataset. Using the built-in
        # `load_all_table_meta` would not be helpful here as it would be a no-op
        # as we have already loaded the meta from the input DataFrame.
        reference_meta = {}
        for table in self.table_meta:
            _common_metadata = read_schema_metadata(
                dataset_uuid=dataset_uuid, store=store, table=table
            )
            reference_meta[table] = _common_metadata

        result = {}
        for table, schema in self.table_meta.items():
            try:
                result[table] = validate_compatible([schema, reference_meta[table]])
            except ValueError as e:
                raise ValueError(
                    "Schemas for table '{table}' of dataset '{dataset_uuid}' "
                    "are not compatible!\n\n{e}".format(
                        table=table, dataset_uuid=dataset_uuid, e=e
                    )
                )
        validate_shared_columns(list(result.values()))

        return self

    @_apply_to_list
    @deprecate_parameters_if_set(
        get_deprecation_warning_remove_parameter_multi_table(
            deprecated_in="5.3", removed_in="6.0"
        ),
        "store_metadata",
        "metadata_storage_format",
    )
    def store_dataframes(
        self,
        store: StoreInput,
        dataset_uuid: str,
        df_serializer: Optional[DataFrameSerializer] = None,
        store_metadata: bool = False,
        metadata_storage_format: Optional[str] = None,
    ) -> "MetaPartition":
        """
        Stores all dataframes of the MetaPartitions and registers the saved
        files under the `files` attribute. The dataframe itself is deleted from memory.

        Parameters
        ----------
        store
            If it is a function, the result of calling it must be a KeyValueStore.
        dataset_uuid
            The dataset UUID the partition will be assigned to
        df_serializer
            Serialiser to be used to store the dataframe

        Returns
        -------
        MetaPartition
        """
        df_serializer = (
            df_serializer if df_serializer is not None else default_serializer()
        )
        file_dct = {}

        for table, df in self.data.items():
            key = get_partition_file_prefix(
                partition_label=self.label,
                dataset_uuid=dataset_uuid,
                table=table,
                metadata_version=self.metadata_version,
            )
            LOGGER.debug("Store dataframe for table `%s` to %s ...", table, key)
            try:
                file_dct[table] = df_serializer.store(store, key, df)
            except Exception as exc:
                try:
                    if isinstance(df, pd.DataFrame):
                        buf = io.StringIO()
                        df.info(buf=buf)
                        LOGGER.error(
                            "Writing dataframe failed.\n%s\n%s\n%s",
                            exc,
                            buf.getvalue(),
                            df.head(),
                        )
                    else:
                        LOGGER.error("Storage of dask dataframe failed.")
                        pass
                finally:
                    raise
            LOGGER.debug("Storage of dataframe for table `%s` successful", table)

        new_metapartition = self.copy(files=file_dct, data={})

        return new_metapartition

    @_apply_to_list
    def concat_dataframes(self):
        """
        Concatenates all dataframes with identical columns.

        In case of changes on the dataframes, the files attribute will be
        emptied since the in-memory DataFrames are no longer representations
        of the referenced files.

        Returns
        -------
        MetaPartition
            A metapartition where common column dataframes are merged. The
            file attribute will be empty since there is no direct relation
            between the referenced files and the in-memory dataframes anymore.
        """
        count_cols = defaultdict(list)
        for label, df in self.data.items():
            # List itself is not hashable
            key = "".join(sorted(df.columns))
            count_cols[key].append((label, df))
        is_modified = False
        new_data = {}
        for _, tuple_list in count_cols.items():
            if len(tuple_list) > 1:
                is_modified = True
                data = [x[1] for x in tuple_list]
                label = _unique_label([x[0] for x in tuple_list])
                new_data[label] = pd.concat(data).reset_index(drop=True)
            else:
                label, df = tuple_list[0]
                new_data[label] = df

        new_table_meta = {
            label: make_meta(
                df,
                origin="{}/{}".format(self.label, label),
                partition_keys=self.partition_keys,
            )
            for (label, df) in new_data.items()
        }

        if is_modified:
            return self.copy(files={}, data=new_data, table_meta=new_table_meta)
        else:
            return self

    @_apply_to_list
    @deprecate_parameters_if_set(
        get_deprecation_warning_remove_dict_multi_table(
            deprecated_in="5.3", changed_in="6.0"
        ),
        "func",
    )
    @deprecate_parameters_if_set(
        get_deprecation_warning_remove_parameter_multi_table(
            deprecated_in="5.3", removed_in="6.0"
        ),
        "tables",
        "metadata",
    )
    def apply(
        self,
        func: Union[Callable, Dict[str, Callable]],
        tables: Optional[List[str]] = None,
        metadata: Optional[Dict] = None,
        type_safe: bool = False,
    ) -> "MetaPartition":
        """
        Applies a given function to all dataframes of the MetaPartition.

        Parameters
        ----------
        func
            A callable accepting and returning a :class:`pandas.DataFrame`
        tables
            Only apply and return the function on the given tables.
            Note: behavior will change in future versions!
            New behavior will be: only apply the provided function to the given tables.
        type_safe
            If the transformation is type-safe, optimizations can be applied
        """
        if tables is None:
            tables = self.data.keys()
        else:
            warnings.warn(
                "The behavior for passing ``table`` parameter to ``MetaPartition.apply`` will "
                "change in the next major version. The future behavior will be to return all "
                "data and only apply the function to the selected tables. All other tables "
                "will be left untouched.",
                FutureWarning,
            )
        if callable(func):
            new_data = {k: func(v) for k, v in self.data.items() if k in tables}
        elif isinstance(func, dict):
            new_data = {k: func[k](v) for k, v in self.data.items() if k in tables}
        if type_safe:
            new_table_meta = self.table_meta
        else:
            new_table_meta = {
                table: make_meta(
                    df,
                    origin="{}/{}".format(self.label, table),
                    partition_keys=self.partition_keys,
                )
                for table, df in new_data.items()
            }
        return self.copy(data=new_data, table_meta=new_table_meta)

    def as_sentinel(self):
        """
        Create a sentinel (label-less, empty) MetaPartition which carries over
        the metadata version and partition keys of this object.
        """
        return MetaPartition(
            None,
            metadata_version=self.metadata_version,
            partition_keys=self.partition_keys,
        )

    def copy(self, **kwargs):
        """
        Creates a shallow copy where the kwargs overwrite existing attributes.
        """

        def _renormalize_meta(meta):
            if "partition_keys" in kwargs:
                pk = kwargs["partition_keys"]
                return {
                    table: normalize_column_order(schema, pk)
                    for table, schema in meta.items()
                }
            else:
                return meta

        metapartitions = kwargs.get("metapartitions", None) or []
        metapartitions.extend(self.metapartitions)
        if len(metapartitions) > 1:
            first_mp = metapartitions.pop()
            mp_parent = MetaPartition(
                label=first_mp.get("label"),
                files=first_mp.get("files"),
                metadata=first_mp.get("metadata"),
                data=first_mp.get("data"),
                dataset_metadata=kwargs.get("dataset_metadata", self.dataset_metadata),
                indices=first_mp.get("indices"),
                metadata_version=self.metadata_version,
                table_meta=_renormalize_meta(kwargs.get("table_meta", self.table_meta)),
                partition_keys=kwargs.get("partition_keys", self.partition_keys),
                logical_conjunction=kwargs.get(
                    "logical_conjunction", self.logical_conjunction
                ),
            )
            for mp in metapartitions:
                mp_parent = mp_parent.add_metapartition(
                    MetaPartition(
                        label=mp.get("label"),
                        files=mp.get("files"),
                        metadata=mp.get("metadata"),
                        data=mp.get("data"),
                        dataset_metadata=mp.get(
                            "dataset_metadata", self.dataset_metadata
                        ),
                        indices=mp.get("indices"),
                        metadata_version=self.metadata_version,
                        table_meta=_renormalize_meta(
                            kwargs.get("table_meta", self.table_meta)
                        ),
                        partition_keys=kwargs.get(
                            "partition_keys", self.partition_keys
                        ),
                        logical_conjunction=kwargs.get(
                            "logical_conjunction", self.logical_conjunction
                        ),
                    ),
                    schema_validation=False,
                )
            return mp_parent
        else:
            mp = MetaPartition(
                label=kwargs.get("label", self.label),
                files=kwargs.get("files", self.files),
                data=kwargs.get("data", self.data),
                dataset_metadata=kwargs.get("dataset_metadata", self.dataset_metadata),
                indices=kwargs.get("indices", self.indices),
                metadata_version=kwargs.get("metadata_version", self.metadata_version),
                table_meta=_renormalize_meta(kwargs.get("table_meta", self.table_meta)),
                partition_keys=kwargs.get("partition_keys", self.partition_keys),
                logical_conjunction=kwargs.get(
                    "logical_conjunction", self.logical_conjunction
                ),
            )
            return mp

    @_apply_to_list
    def build_indices(self, columns: Iterable[str]):
        """
        This builds the indices for this metapartition for the given columns.
        The indices for the passed columns are rebuilt, so existing index
        entries in the metapartition are overwritten.

        :param columns: A list of columns from which the indices over all
            dataframes in the metapartition are overwritten
        :return: self
        """
        if self.label is None:
            return self

        new_indices = {}
        for col in columns:
            possible_values: Set[str] = set()
            col_in_partition = False
            for df in self.data.values():
                if col in df:
                    possible_values = possible_values | set(df[col].dropna().unique())
                    col_in_partition = True

            if (self.label is not None) and (not col_in_partition):
                raise RuntimeError(
                    "Column `{corrupt_col}` could not be found in the partition "
                    "`{partition_label}` with tables `{tables}`. Please check for "
                    "any typos and validate your dataset.".format(
                        corrupt_col=col,
                        partition_label=self.label,
                        tables=sorted(self.data.keys()),
                    )
                )

            # There is at least one table with this column (see check above), so we can
            # get the dtype from there. Also, shared dtypes are ensured to be compatible.
            dtype = list(
                meta.field(col).type
                for meta in self.table_meta.values()
                if col in meta.names
            )[0]

            new_index = ExplicitSecondaryIndex(
                column=col,
                index_dct={value: [self.label] for value in possible_values},
                dtype=dtype,
            )
            if (col in self.indices) and self.indices[col].loaded:
                new_indices[col] = self.indices[col].update(new_index)
            else:
                new_indices[col] = new_index

        return self.copy(indices=new_indices)

    @_apply_to_list
    def partition_on(self, partition_on: Union[str, Sequence[str]]):
        """
        Partition all dataframes assigned to this MetaPartition according to
        the given columns.

        If the MetaPartition object contains index information, the information
        is split in such a way that it references the new partitions.

        In case a requested partition column does not exist in **all** tables,
        a KeyError is raised.

        All output partitions are re-assigned labels encoding the partitioned
        columns (urlencoded).

        Examples::

            >>> import pandas as pd
            >>> from kartothek.io_components.metapartition import MetaPartition
            >>> mp = MetaPartition(
            ...     label='partition_label',
            ...     data={
            ...         "Table1": pd.DataFrame({
            ...             'P': [1, 2, 1, 2],
            ...             'L': [1, 1, 2, 2]
            ...         })
            ...     }
            ... )
            >>> repartitioned_mp = mp.partition_on(['P', 'L'])
            >>> assert [mp["label"] for mp in repartitioned_mp.metapartitions] == [
            ...     "P=1/L=1/partition_label",
            ...     "P=1/L=2/partition_label",
            ...     "P=2/L=1/partition_label",
            ...     "P=2/L=2/partition_label"
            ... ]

        Parameters
        ----------
        partition_on
        """
        if partition_on == self.partition_keys:
            return self

        for partition_column in partition_on:
            if partition_column in self.indices:
                raise ValueError(
                    "Trying to `partition_on` on a column with an explicit index!"
                )
        new_mp = self.as_sentinel().copy(
            partition_keys=partition_on,
            table_meta={
                table: normalize_column_order(schema, partition_on)
                for table, schema in self.table_meta.items()
            },
        )

        if isinstance(partition_on, str):
            partition_on = [partition_on]
        partition_on = self._ensure_compatible_partitioning(partition_on)

        new_data = self._partition_data(partition_on)

        for label, data_dct in new_data.items():
            tmp_mp = MetaPartition(
                label=label,
                files=self.files,
                data=data_dct,
                dataset_metadata=self.dataset_metadata,
                metadata_version=self.metadata_version,
                indices={},
                table_meta={
                    table: normalize_column_order(schema, partition_on).with_origin(
                        "{}/{}".format(table, label)
                    )
                    for table, schema in self.table_meta.items()
                },
                partition_keys=partition_on,
            )
            new_mp = new_mp.add_metapartition(tmp_mp, schema_validation=False)
        if self.indices:
            new_mp = new_mp.build_indices(columns=self.indices.keys())
        return new_mp
    def _ensure_compatible_partitioning(self, partition_on):
        if (
            not self.partition_keys
            or self.partition_keys
            and (len(partition_on) >= len(self.partition_keys))
            and (self.partition_keys == partition_on[: len(self.partition_keys)])
        ):
            return partition_on[len(self.partition_keys) :]
        else:
            raise ValueError(
                "Incompatible partitioning encountered. `partition_on` needs to include the already "
                "existing partition keys and must preserve their order.\n"
                "Current partition keys: `{}`\n"
                "Partition on called with: `{}`".format(
                    self.partition_keys, partition_on
                )
            )

    def _partition_data(self, partition_on):
        existing_indices, base_label = decode_key("uuid/table/{}".format(self.label))[
            2:
        ]
        dct = dict()
        empty_tables = []
        for table, df in self.data.items():
            # Check that data sizes do not change. This might happen if the
            # groupby below drops data, e.g. nulls
            size_after = 0
            size_before = len(df)

            # Implementation from pyarrow, see
            # https://github.com/apache/arrow/blob/b33dfd9c6bd800308bb1619b237dbf24dea159be/python/pyarrow/parquet.py#L1030  # noqa: E501

            # column sanity checks
            data_cols = set(df.columns).difference(partition_on)
            missing_po_cols = set(partition_on).difference(df.columns)
            if missing_po_cols:
                raise ValueError(
                    "Partition column(s) missing: {}".format(
                        ", ".join(sorted(missing_po_cols))
                    )
                )
            if len(data_cols) == 0:
                raise ValueError("No data left to save outside partition columns")

            # To be aligned with open source tooling we drop the index columns and
            # recreate them upon reading as it is done by fastparquet and pyarrow
            partition_keys = [df[col] for col in partition_on]

            # The handling of empty dfs is not part of the arrow implementation
            if df.empty:
                empty_tables.append((table, df))

            data_df = df.drop(partition_on, axis="columns")
            for value, group in data_df.groupby(by=partition_keys, sort=False):
                partitioning_info = []

                if pd.api.types.is_scalar(value):
                    value = [value]

                if existing_indices:
                    partitioning_info.extend(quote_indices(existing_indices))
                partitioning_info.extend(quote_indices(zip(partition_on, value)))
                partitioning_info.append(base_label)
                new_label = "/".join(partitioning_info)

                if new_label not in dct:
                    dct[new_label] = {}
                dct[new_label][table] = group
                size_after += len(group)

            if size_before != size_after:
                raise ValueError(
                    f"Original dataframe size ({size_before} rows) does not "
                    f"match new dataframe size ({size_after} rows) for table {table}. "
                    f"Hint: you may see this if you are trying to use `partition_on` "
                    f"on a column with null values."
                )

        for label, table_dct in dct.items():
            for empty_table, df in empty_tables:
                if empty_table not in table_dct:
                    table_dct[empty_table] = df.drop(labels=partition_on, axis=1)

        return dct

    @staticmethod
    def merge_indices(metapartitions):
        list_of_indices = []
        for mp in metapartitions:
            for sub_mp in mp:
                if sub_mp.indices:
                    list_of_indices.append(sub_mp.indices)
        return merge_indices_algo(list_of_indices)

    @staticmethod
    def _merge_labels(metapartitions, label_merger=None):
        # Use the shortest of available labels since this has to be the partition
        # label prefix
        new_label = None
        # FIXME: This is probably not compatible with >= v3
        if label_merger is None:
            for mp in metapartitions:
                label = mp.label
                if new_label is None or len(label) < len(new_label):
                    new_label = label
                    continue
        else:
            new_label = label_merger([mp.label for mp in metapartitions])

        return new_label

    @staticmethod
    def _merge_metadata(metapartitions, metadata_merger=None):
        if metadata_merger is None:
            metadata_merger = combine_metadata

        new_ds_meta = metadata_merger([mp.dataset_metadata for mp in metapartitions])

        return new_ds_meta

    @staticmethod
    def merge_metapartitions(metapartitions, label_merger=None, metadata_merger=None):
        LOGGER.debug("Merging metapartitions")
        data = defaultdict(list)
        new_metadata_version = -1
        logical_conjunction = None

        for mp in metapartitions:
            new_metadata_version = max(new_metadata_version, mp.metadata_version)
            for label, df in mp.data.items():
                data[label].append(df)
            if mp.logical_conjunction or logical_conjunction:
                if logical_conjunction != mp.logical_conjunction:
                    raise TypeError(
                        "Can only merge metapartitions belonging to the same logical partition."
                    )
                else:
                    logical_conjunction = mp.logical_conjunction

        new_data = {}
        for label in data:
            if len(data[label]) == 1:
                new_data[label] = data[label][0]
            else:
                for ix, idf in enumerate(data[label]):
                    new_label = "{}_{}".format(label, ix)
                    new_data[new_label] = idf

        new_label = MetaPartition._merge_labels(metapartitions, label_merger)
        new_ds_meta = MetaPartition._merge_metadata(metapartitions, metadata_merger)

        new_mp = MetaPartition(
            label=new_label,
            data=new_data,
            dataset_metadata=new_ds_meta,
            metadata_version=new_metadata_version,
            logical_conjunction=logical_conjunction,
        )

        return new_mp

    @staticmethod
    def concat_metapartitions(metapartitions, label_merger=None, metadata_merger=None):
        LOGGER.debug("Concatenating metapartitions")
        data = defaultdict(list)
        schema = defaultdict(list)
        new_metadata_version = -1

        for mp in metapartitions:
            new_metadata_version = max(new_metadata_version, mp.metadata_version)
            for table in mp.data:
                data[table].append(mp.data[table])
                schema[table].append(mp.table_meta[table])
            # Don't care about the partition_keys. If we try to merge
            # MetaPartitions without alignment the schemas won't match.
            partition_keys = mp.partition_keys

        new_data = {}
        new_schema = {}
        for table in data:
            if len(data[table]) == 1:
                new_data[table] = data[table][0]
            else:
                categoricals = [
                    col
                    for col, dtype in data[table][0].items()
                    if pd.api.types.is_categorical_dtype(dtype)
                ]
                data[table] = align_categories(data[table], categoricals)
                new_data[table] = pd.concat(data[table])
            new_schema[table] = validate_compatible(schema[table])

        new_label = MetaPartition._merge_labels(metapartitions, label_merger)
        new_ds_meta = MetaPartition._merge_metadata(metapartitions, metadata_merger)

        new_mp = MetaPartition(
            label=new_label,
            data=new_data,
            dataset_metadata=new_ds_meta,
            metadata_version=new_metadata_version,
            table_meta=new_schema,
            partition_keys=partition_keys,
        )

        return new_mp

    @_apply_to_list
    def delete_from_store(
        self, dataset_uuid: Any, store: StoreInput
    ) -> "MetaPartition":
        store = ensure_store(store)
        # Delete data first
        for file_key in self.files.values():
            store.delete(file_key)
        return self.copy(files={}, data={}, metadata={})

    def get_parquet_metadata(self, store: StoreInput, table_name: str) -> pd.DataFrame:
        """
        Retrieve the parquet metadata for the MetaPartition.
        Especially relevant for calculating dataset statistics.

        Parameters
        ----------
        store
            A factory function providing a KeyValueStore
        table_name
            Name of the kartothek table for which the statistics should be retrieved

        Returns
        -------
        pd.DataFrame
            A DataFrame with relevant parquet metadata
        """
        if not isinstance(table_name, str):
            raise TypeError("Expecting a string for parameter `table_name`.")

        store = ensure_store(store)

        data = {}
        if table_name in self.files:
            with store.open(self.files[table_name]) as fd:  # type: ignore
                pq_metadata = pa.parquet.ParquetFile(fd).metadata

            data = {
                "partition_label": self.label,
                "serialized_size": pq_metadata.serialized_size,
                "number_rows_total": pq_metadata.num_rows,
                "number_row_groups": pq_metadata.num_row_groups,
                "row_group_id": [],
                "number_rows_per_row_group": [],
                "row_group_compressed_size": [],
                "row_group_uncompressed_size": [],
            }
            for rg_ix in range(pq_metadata.num_row_groups):
                rg = pq_metadata.row_group(rg_ix)
                data["row_group_id"].append(rg_ix)
                data["number_rows_per_row_group"].append(rg.num_rows)
                data["row_group_compressed_size"].append(rg.total_byte_size)
                data["row_group_uncompressed_size"].append(
                    sum(
                        rg.column(col_ix).total_uncompressed_size
                        for col_ix in range(rg.num_columns)
                    )
                )

        df = pd.DataFrame(data=data, columns=_METADATA_SCHEMA.keys())
        df = df.astype(_METADATA_SCHEMA)
        return df
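

# Hedged usage sketch of the in-memory ``MetaPartition`` API defined above: build a
# partition from a DataFrame, repartition it by a column and rebuild a secondary
# index. The table name, label and data are hypothetical, and store-backed methods
# (``store_dataframes``, ``load_dataframes``) are omitted since they require a
# KeyValueStore.
def _example_metapartition_usage():
    mp = MetaPartition(
        label="partition_label",
        data={"table": pd.DataFrame({"P": [1, 1, 2], "X": ["a", "b", "c"]})},
    )

    # Repartition by column ``P``; the new labels encode the partition values.
    repartitioned = mp.partition_on(["P"])
    labels = [sub_mp.label for sub_mp in repartitioned]
    # ["P=1/partition_label", "P=2/partition_label"]

    # Rebuild a secondary index over column ``X`` for every sub-partition.
    indexed = repartitioned.build_indices(["X"])
    return labels, indexed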


def _unique_label(label_list):
    label = os.path.commonprefix(label_list)
    if len(label) == 0:
        label = "_".join(label_list)
    while len(label) > 0 and not label[-1].isalnum():
        label = label[:-1]
    return label


def partition_labels_from_mps(mps: List[MetaPartition]) -> List[str]:
    """
    Get a list of partition labels, flattening any nested meta partitions in
    the input and ignoring sentinels.
    """
    partition_labels = []
    for mp in mps:
        if len(mp) > 1:
            for nested_mp in mp:
                if not nested_mp.is_sentinel:
                    partition_labels.append(nested_mp.label)
        else:
            if not mp.is_sentinel:
                partition_labels.append(mp.label)
    return partition_labels
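

# Hedged sketch of ``partition_labels_from_mps``: nested MetaPartitions are
# flattened and sentinel (label-less) partitions are skipped. The labels and data
# below are hypothetical.
def _example_partition_labels_from_mps():
    nested = MetaPartition(label="a", data={"table": pd.DataFrame({"x": [1]})})
    nested = nested.add_metapartition(
        MetaPartition(label="b", data={"table": pd.DataFrame({"x": [2]})})
    )
    sentinel = MetaPartition(label=None)
    return partition_labels_from_mps([nested, sentinel])  # ["a", "b"]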


@deprecate_parameters_if_set(
    DEPRECATION_WARNING_REMOVE_PARAMETER,
    "expected_secondary_indices",
)
def parse_input_to_metapartition(
    obj: MetaPartitionInput,
    metadata_version: Optional[int] = None,
    expected_secondary_indices: Optional[InferredIndices] = False,
) -> MetaPartition:
    """
    Parses given user input and returns a MetaPartition.

    The format specification supports multiple input modes as follows:

    1. Mode - Dictionary with partition information

        In this case, a dictionary is supplied where the keys describe the partition.

            * **label** - (optional) Unique partition label. If None is given, a UUID \
                        is generated using :func:`kartothek.core.uuid.gen_uuid`.
            * **data** - A dict or list of tuples. The keys represent the table name \
                        and the values are the actual payload data as a pandas.DataFrame.
            * **indices** - Deprecated, see the keyword argument `secondary_indices` to
                            create indices. A dictionary to describe the dataset indices. All \
                            partition level indices are finally merged using \
                            :func:`kartothek.io_components.metapartition.MetaPartition.merge_indices` \
                            into a single dataset index.

        Examples::

            # A partition with explicit label, no metadata, one table and index information
            input_obj = {
                'label': 'partition_label',
                'data': [('table', pd.DataFrame([{'column_1':values_1, 'column_2':values_2}]))],
                'indices': {
                    "column_1": {
                        value: ['partition_label']
                    }
                }
            }
            # If no label is given, a UUID will be generated using
            # :func:`kartothek.core.uuid.gen_uuid`
            simple_input = {
                'data': [('table', pd.DataFrame())],
            }

    2. Mode - `pandas.DataFrame`

        If only a DataFrame is provided, a UUID is generated and the dataframe is stored
        for the table name `SINGLE_TABLE`.

    3. Mode - :class:`~kartothek.io_components.metapartition.MetaPartition`

        If a MetaPartition is passed directly, it is simply passed through.

    4. Mode - List of tuples

        The first item represents the table name and the second is the actual payload
        data as a pandas.DataFrame.

        Example::

            # A partition with no explicit label, no metadata and one table
            input_obj = [('table', pd.DataFrame())]

    Nested MetaPartitions:

        The input may also be provided as a list to ease batch processing. The returned
        MetaPartition will be nested and each list element represents a single physical
        partition. For details on nested MetaPartitions, see
        :class:`~kartothek.io_components.metapartition.MetaPartition`.

    Parameters
    ----------
    obj
    metadata_version
        The kartothek dataset specification version
    expected_secondary_indices
        Iterable of strings containing expected columns on which indices are created.
        An empty iterable indicates no indices are expected. The default is `False`,
        which indicates that no checking will be done (`None` behaves the same way).
        This is only used in mode "Dictionary with partition information".

    Raises
    ------
    ValueError
        In case the given input is not understood

    Returns
    -------
    MetaPartition
    """
    if obj is None:
        obj = []
    if isinstance(obj, list):
        if len(obj) == 0:
            return MetaPartition(label=None, metadata_version=metadata_version)
        first_element = obj[0]
        if isinstance(first_element, tuple):
            data = {"data": [df for df in obj]}
            return parse_input_to_metapartition(
                obj=data,
                metadata_version=metadata_version,
                expected_secondary_indices=expected_secondary_indices,
            )
        mp = parse_input_to_metapartition(
            obj=first_element,
            metadata_version=metadata_version,
            expected_secondary_indices=expected_secondary_indices,
        )
        for mp_in in obj[1:]:
            mp = mp.add_metapartition(
                parse_input_to_metapartition(
                    obj=mp_in,
                    metadata_version=metadata_version,
                    expected_secondary_indices=expected_secondary_indices,
                )
            )
    elif isinstance(obj, dict):
        if not obj.get("data"):
            data = obj
        elif isinstance(obj["data"], list):
            data = dict(obj["data"])
        else:
            data = obj["data"]

        indices = obj.get("indices", {})
        indices = {k: v for k, v in indices.items() if v}
        _ensure_valid_indices(
            mp_indices=indices, secondary_indices=expected_secondary_indices, data=data
        )

        mp = MetaPartition(
            # TODO: Deterministic hash for the input?
            label=obj.get("label", gen_uuid()),
            data=data,
            indices=indices,
            metadata_version=metadata_version,
        )
    elif isinstance(obj, pd.DataFrame):
        mp = MetaPartition(
            label=gen_uuid(),
            data={SINGLE_TABLE: obj},
            metadata_version=metadata_version,
        )
    elif isinstance(obj, MetaPartition):
        return obj
    else:
        raise ValueError(
            f"Unexpected type during parsing encountered: ({type(obj)}, {obj})"
        )
    return mp
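

# Hedged sketch of the input modes accepted by ``parse_input_to_metapartition``.
# The table name, label and DataFrame below are hypothetical; each call returns a
# ``MetaPartition``.
def _example_parse_input_to_metapartition():
    df = pd.DataFrame({"x": [1, 2]})

    # Mode 1: dictionary with an explicit label and a list of (table, df) tuples.
    mp_from_dict = parse_input_to_metapartition(
        {"label": "partition_label", "data": [("table", df)]}
    )

    # Mode 2: a bare DataFrame is stored under the ``SINGLE_TABLE`` table name
    # with a generated UUID as its label.
    mp_from_df = parse_input_to_metapartition(df)

    # Mode 4: a list of (table, df) tuples without an explicit label.
    mp_from_tuples = parse_input_to_metapartition([("table", df)])

    return mp_from_dict, mp_from_df, mp_from_tuples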