Source code for kartothek.io_components.write

from collections import defaultdict
from functools import partial
from typing import Dict, Optional, Sequence, Union, cast

import pandas as pd

from kartothek.core import naming
from kartothek.core.common_metadata import (
    read_schema_metadata,
    store_schema_metadata,
    validate_compatible,
    validate_shared_columns,
)
from kartothek.core.dataset import DatasetMetadataBuilder
from kartothek.core.index import ExplicitSecondaryIndex, IndexBase, PartitionIndex
from kartothek.core.partition import Partition
from kartothek.core.typing import StoreFactory, StoreInput
from kartothek.core.utils import ensure_store
from kartothek.io_components.metapartition import (
    SINGLE_TABLE,
    MetaPartition,
    MetaPartitionInput,
    parse_input_to_metapartition,
    partition_labels_from_mps,
)
from kartothek.io_components.utils import (
    InferredIndices,
    combine_metadata,
    extract_duplicates,
    sort_values_categorical,
)
from kartothek.serialization import DataFrameSerializer
from kartothek.utils.migration_helpers import (
    DEPRECATION_WARNING_REMOVE_PARAMETER,
    deprecate_parameters,
    deprecate_parameters_if_set,
    get_deprecation_warning_parameter_non_optional,
    get_parameter_replaced_by_deprecation_warning,
)

SINGLE_CATEGORY = SINGLE_TABLE


@deprecate_parameters(
    get_deprecation_warning_parameter_non_optional(
        deprecated_in="5.3", changed_in="6.0"
    ),
    "secondary_indices",
    "sort_partitions_by",
    "partition_on",
    "dataset_table_name",
)
def write_partition(
    partition_df: MetaPartitionInput,
    secondary_indices: Optional[InferredIndices],
    sort_partitions_by: Optional[Union[str, Sequence[str]]],
    dataset_uuid: str,
    partition_on: Optional[Union[str, Sequence[str]]],
    store_factory: StoreFactory,
    df_serializer: Optional[DataFrameSerializer],
    metadata_version: int,
    dataset_table_name: Optional[str] = None,
) -> MetaPartition:
    """
    Write a dataframe to store, performing all necessary preprocessing tasks
    like partitioning, bucketing (NotImplemented), indexing, etc. in the correct
    order.
    """
    store = ensure_store(store_factory)
    parse_input: MetaPartitionInput
    if isinstance(partition_df, pd.DataFrame) and dataset_table_name:
        parse_input = [{"data": {dataset_table_name: partition_df}}]
    else:
        parse_input = partition_df
    # delete reference to enable release after partition_on; before index build
    del partition_df

    # I don't have access to the group values
    mps = parse_input_to_metapartition(
        parse_input,
        metadata_version=metadata_version,
        expected_secondary_indices=secondary_indices,
    )
    if sort_partitions_by:
        mps = mps.apply(partial(sort_values_categorical, columns=sort_partitions_by))
    if partition_on:
        mps = mps.partition_on(partition_on)
    if secondary_indices:
        mps = mps.build_indices(secondary_indices)
    return mps.store_dataframes(
        store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer
    )
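
Usage sketch (not part of the module source): a minimal, hedged example of calling write_partition directly. The in-memory store URL, dataset uuid, and table name are illustrative; metadata_version=4 is kartothek's default metadata version.

import pandas as pd
import storefact

# one shared in-memory store instance; write_partition expects a store factory (callable)
store = storefact.get_store_from_url("hmemory://")
df = pd.DataFrame({"color": ["red", "blue"], "value": [2, 1]})

mp = write_partition(
    partition_df=df,
    secondary_indices=["value"],  # build an explicit secondary index on "value"
    sort_partitions_by=None,
    dataset_uuid="example_uuid",
    partition_on=["color"],  # split the dataframe into one partition per "color"
    store_factory=lambda: store,
    df_serializer=None,  # fall back to the default serializer
    metadata_version=4,
    dataset_table_name="table",
)
# mp is a MetaPartition whose data files have already been written to the store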
def persist_indices(
    store: StoreInput, dataset_uuid: str, indices: Dict[str, IndexBase]
) -> Dict[str, str]:
    store = ensure_store(store)
    output_filenames = {}
    for column, index in indices.items():
        # backwards compat
        if isinstance(index, dict):
            legacy_storage_key = "{dataset_uuid}.{column}{suffix}".format(
                dataset_uuid=dataset_uuid,
                column=column,
                suffix=naming.EXTERNAL_INDEX_SUFFIX,
            )
            index = ExplicitSecondaryIndex(
                column=column, index_dct=index, index_storage_key=legacy_storage_key
            )
        elif isinstance(index, PartitionIndex):
            continue
        index = cast(ExplicitSecondaryIndex, index)
        output_filenames[column] = index.store(store=store, dataset_uuid=dataset_uuid)
    return output_filenames
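
Illustrative sketch of persisting a single explicit index by hand; the store URL, dataset uuid, column name, and partition labels below are made up.

import storefact

from kartothek.core.index import ExplicitSecondaryIndex

store = storefact.get_store_from_url("hmemory://")
index = ExplicitSecondaryIndex(
    column="color", index_dct={"red": ["part_1"], "blue": ["part_2"]}
)
filenames = persist_indices(
    store=store, dataset_uuid="example_uuid", indices={"color": index}
)
# filenames maps "color" to the storage key under which the serialized index was put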
@deprecate_parameters_if_set(
    get_parameter_replaced_by_deprecation_warning(
        replaced_by="schemas", deprecated_in="5.3", changed_in="6.0"
    ),
    "partition_list",
)
def persist_common_metadata(partition_list, update_dataset, store, dataset_uuid):
    # hash the schemas for quick equality check with possible false negatives
    # (e.g. other pandas version or null schemas)
    tm_dct = defaultdict(set)
    for mp in partition_list:
        for tab, tm in mp.table_meta.items():
            tm_dct[tab].add(tm)

    if update_dataset:
        if set(tm_dct.keys()) and set(update_dataset.tables) != set(tm_dct.keys()):
            raise ValueError(
                (
                    "Input partitions for update have different tables than dataset:\n"
                    "Input partition tables: {}\n"
                    "Tables of existing dataset: {}"
                ).format(set(tm_dct.keys()), update_dataset.tables)
            )
        for table in update_dataset.tables:
            tm_dct[table].add(
                read_schema_metadata(
                    dataset_uuid=dataset_uuid, store=store, table=table
                )
            )

    result = {}

    # sort tables and schemas to have reproducible error messages
    for table in sorted(tm_dct.keys()):
        schemas = sorted(tm_dct[table], key=lambda s: sorted(s.origin))
        try:
            result[table] = validate_compatible(schemas)
        except ValueError as e:
            raise ValueError(
                "Schemas for table '{table}' of dataset '{dataset_uuid}' are not compatible!\n\n{e}".format(
                    table=table, dataset_uuid=dataset_uuid, e=e
                )
            )

    validate_shared_columns(list(result.values()))

    for table, schema in result.items():
        store_schema_metadata(
            schema=schema, dataset_uuid=dataset_uuid, store=store, table=table
        )
    return result
def store_dataset_from_partitions(
    partition_list,
    store: StoreInput,
    dataset_uuid,
    dataset_metadata=None,
    metadata_merger=None,
    update_dataset=None,
    remove_partitions=None,
    metadata_storage_format=naming.DEFAULT_METADATA_STORAGE_FORMAT,
):
    store = ensure_store(store)

    if update_dataset:
        dataset_builder = DatasetMetadataBuilder.from_dataset(update_dataset)
        metadata_version = dataset_builder.metadata_version
    else:
        mp = next(iter(partition_list), None)
        if mp is None:
            raise ValueError(
                "Cannot store empty datasets, partition_list must not be empty if in store mode."
            )
        metadata_version = mp.metadata_version
        dataset_builder = DatasetMetadataBuilder(
            uuid=dataset_uuid,
            metadata_version=metadata_version,
            partition_keys=mp.partition_keys,
        )

    dataset_builder.explicit_partitions = True

    dataset_builder.table_meta = persist_common_metadata(
        partition_list, update_dataset, store, dataset_uuid
    )

    # We can only check for non unique partition labels here and if they occur we will
    # fail hard. The resulting dataset may be corrupted or file may be left in the store
    # without dataset metadata
    partition_labels = partition_labels_from_mps(partition_list)
    non_unique_labels = extract_duplicates(partition_labels)

    if non_unique_labels:
        raise ValueError(
            "The labels {} are duplicated. Dataset metadata was not written.".format(
                ", ".join(non_unique_labels)
            )
        )

    if remove_partitions is None:
        remove_partitions = []

    if metadata_merger is None:
        metadata_merger = combine_metadata

    dataset_builder = update_metadata(
        dataset_builder, metadata_merger, partition_list, dataset_metadata
    )
    dataset_builder = update_partitions(
        dataset_builder, partition_list, remove_partitions
    )
    dataset_builder = update_indices(
        dataset_builder, store, partition_list, remove_partitions
    )

    if metadata_storage_format.lower() == "json":
        store.put(*dataset_builder.to_json())
    elif metadata_storage_format.lower() == "msgpack":
        store.put(*dataset_builder.to_msgpack())
    else:
        raise ValueError(
            "Unknown metadata storage format encountered: {}".format(
                metadata_storage_format
            )
        )
    dataset = dataset_builder.to_dataset()
    return dataset
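
Usage sketch (illustrative names, in-memory store): the MetaPartitions returned by write_partition already reference the files they wrote, so committing them as a dataset is a single call.

import pandas as pd
import storefact

store = storefact.get_store_from_url("hmemory://")
df = pd.DataFrame({"color": ["red", "blue"], "value": [1, 2]})

mp = write_partition(
    partition_df=df,
    secondary_indices=None,
    sort_partitions_by=None,
    dataset_uuid="example_uuid",
    partition_on=None,
    store_factory=lambda: store,
    df_serializer=None,
    metadata_version=4,
    dataset_table_name="table",
)
dataset = store_dataset_from_partitions(
    partition_list=[mp], store=store, dataset_uuid="example_uuid"
)
# dataset is the DatasetMetadata object describing the freshly committed dataset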
@deprecate_parameters_if_set(
    DEPRECATION_WARNING_REMOVE_PARAMETER,
    "add_partitions",
)
def update_metadata(dataset_builder, metadata_merger, add_partitions, dataset_metadata):
    metadata_list = [dataset_builder.metadata]
    metadata_list += [mp.dataset_metadata for mp in add_partitions]
    new_dataset_metadata = metadata_merger(metadata_list)

    dataset_metadata = dataset_metadata or {}

    if callable(dataset_metadata):
        dataset_metadata = dataset_metadata()

    new_dataset_metadata.update(dataset_metadata)
    for key, value in new_dataset_metadata.items():
        dataset_builder.add_metadata(key, value)
    return dataset_builder
def update_partitions(dataset_builder, add_partitions, remove_partitions):
    for mp in add_partitions:
        for sub_mp_dct in mp.metapartitions:
            # label is None in case of an empty partition
            if sub_mp_dct["label"] is not None:
                partition = Partition(
                    label=sub_mp_dct["label"], files=sub_mp_dct["files"]
                )
                dataset_builder.add_partition(sub_mp_dct["label"], partition)

    for partition_name in remove_partitions:
        del dataset_builder.partitions[partition_name]

    return dataset_builder
def update_indices(dataset_builder, store, add_partitions, remove_partitions):
    dataset_indices = dataset_builder.indices
    partition_indices = MetaPartition.merge_indices(add_partitions)

    if dataset_indices:  # dataset already exists and will be updated
        if remove_partitions:
            for column, dataset_index in dataset_indices.items():
                dataset_indices[column] = dataset_index.remove_partitions(
                    remove_partitions, inplace=True
                )

        for column, index in partition_indices.items():
            dataset_indices[column] = dataset_indices[column].update(
                index, inplace=True
            )
    else:  # dataset index will be created first time from partitions
        dataset_indices = partition_indices

    # Store indices
    index_filenames = persist_indices(
        store=store, dataset_uuid=dataset_builder.uuid, indices=dataset_indices
    )
    for column, filename in index_filenames.items():
        dataset_builder.add_external_index(column, filename)

    return dataset_builder
def raise_if_dataset_exists(dataset_uuid, store):
    try:
        store_instance = ensure_store(store)

        for form in ["msgpack", "json"]:
            key = naming.metadata_key_from_uuid(uuid=dataset_uuid, format=form)
            if key in store_instance:
                raise RuntimeError(
                    "Dataset `{}` already exists and overwrite is not permitted!".format(
                        dataset_uuid
                    )
                )
    except KeyError:
        pass
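
A short hedged sketch (store URL and dataset uuid are illustrative): the function returns None when neither metadata key exists and raises otherwise.

import storefact

store = storefact.get_store_from_url("hmemory://")
raise_if_dataset_exists(dataset_uuid="example_uuid", store=store)  # nothing stored yet: passes
# after a dataset "example_uuid" has been written, the same call raises RuntimeError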