from collections import defaultdict
from functools import partial
from typing import Dict, Optional, Sequence, Union, cast
import pandas as pd
from kartothek.core import naming
from kartothek.core.common_metadata import (
read_schema_metadata,
store_schema_metadata,
validate_compatible,
validate_shared_columns,
)
from kartothek.core.dataset import DatasetMetadataBuilder
from kartothek.core.index import ExplicitSecondaryIndex, IndexBase, PartitionIndex
from kartothek.core.partition import Partition
from kartothek.core.typing import StoreFactory, StoreInput
from kartothek.core.utils import ensure_store
from kartothek.io_components.metapartition import (
SINGLE_TABLE,
MetaPartition,
MetaPartitionInput,
parse_input_to_metapartition,
partition_labels_from_mps,
)
from kartothek.io_components.utils import (
InferredIndices,
combine_metadata,
extract_duplicates,
sort_values_categorical,
)
from kartothek.serialization import DataFrameSerializer
from kartothek.utils.migration_helpers import (
DEPRECATION_WARNING_REMOVE_PARAMETER,
deprecate_parameters,
deprecate_parameters_if_set,
get_deprecation_warning_parameter_non_optional,
get_parameter_replaced_by_deprecation_warning,
)
SINGLE_CATEGORY = SINGLE_TABLE


@deprecate_parameters(
get_deprecation_warning_parameter_non_optional(
deprecated_in="5.3", changed_in="6.0"
),
"secondary_indices",
"sort_partitions_by",
"partition_on",
"dataset_table_name",
)
def write_partition(
partition_df: MetaPartitionInput,
secondary_indices: Optional[InferredIndices],
sort_partitions_by: Optional[Union[str, Sequence[str]]],
dataset_uuid: str,
partition_on: Optional[Union[str, Sequence[str]]],
store_factory: StoreFactory,
df_serializer: Optional[DataFrameSerializer],
metadata_version: int,
dataset_table_name: Optional[str] = None,
) -> MetaPartition:
"""
    Write a dataframe to the store, performing all necessary preprocessing steps
    (partitioning, bucketing (not implemented yet), index building, etc.) in the
    correct order.
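
    Example (illustrative sketch, assuming a ``storefact`` store factory and
    metadata version 4; adapt paths and names to your environment)::

        from functools import partial

        import pandas as pd
        from storefact import get_store_from_url

        # hypothetical local store path; any kartothek-compatible store factory works
        store_factory = partial(get_store_from_url, "hfs:///tmp/example")
        df = pd.DataFrame({"part": ["a", "a", "b"], "value": [1, 2, 3]})
        mp = write_partition(
            partition_df=df,
            secondary_indices=None,
            sort_partitions_by=None,
            dataset_uuid="example_dataset",
            partition_on=["part"],
            store_factory=store_factory,
            df_serializer=None,
            metadata_version=4,
            dataset_table_name="table",
        )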
"""
store = ensure_store(store_factory)
parse_input: MetaPartitionInput
if isinstance(partition_df, pd.DataFrame) and dataset_table_name:
parse_input = [{"data": {dataset_table_name: partition_df}}]
else:
parse_input = partition_df
# delete reference to enable release after partition_on; before index build
del partition_df
# I don't have access to the group values
mps = parse_input_to_metapartition(
parse_input,
metadata_version=metadata_version,
expected_secondary_indices=secondary_indices,
)
if sort_partitions_by:
mps = mps.apply(partial(sort_values_categorical, columns=sort_partitions_by))
if partition_on:
mps = mps.partition_on(partition_on)
if secondary_indices:
mps = mps.build_indices(secondary_indices)
return mps.store_dataframes(
store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer
)


def persist_indices(
store: StoreInput, dataset_uuid: str, indices: Dict[str, IndexBase]
) -> Dict[str, str]:
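    """
    Persist all given secondary indices to the store.

    ``PartitionIndex`` objects are skipped since they are encoded implicitly in
    the partition labels. Plain ``dict`` indices are converted to
    :class:`~kartothek.core.index.ExplicitSecondaryIndex` for backwards
    compatibility before being stored.

    Returns a mapping from column name to the storage key of the written index.
    """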
store = ensure_store(store)
output_filenames = {}
for column, index in indices.items():
# backwards compat
if isinstance(index, dict):
legacy_storage_key = "{dataset_uuid}.{column}{suffix}".format(
dataset_uuid=dataset_uuid,
column=column,
suffix=naming.EXTERNAL_INDEX_SUFFIX,
)
index = ExplicitSecondaryIndex(
column=column, index_dct=index, index_storage_key=legacy_storage_key
)
elif isinstance(index, PartitionIndex):
continue
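        # Only explicit secondary indices remain at this point; persist each one
        # and record the storage key it was written to.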
index = cast(ExplicitSecondaryIndex, index)
output_filenames[column] = index.store(store=store, dataset_uuid=dataset_uuid)
return output_filenames


def store_dataset_from_partitions(
partition_list,
store: StoreInput,
dataset_uuid,
dataset_metadata=None,
metadata_merger=None,
update_dataset=None,
remove_partitions=None,
metadata_storage_format=naming.DEFAULT_METADATA_STORAGE_FORMAT,
):
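    """
    Assemble and persist the dataset metadata for a list of written metapartitions.

    If ``update_dataset`` is given, the existing dataset metadata is extended,
    otherwise a new dataset is created from the first metapartition's metadata
    version and partition keys. Partition labels must be unique, the common table
    schemas are persisted, indices are merged and stored, and finally the dataset
    metadata is written as ``json`` or ``msgpack`` depending on
    ``metadata_storage_format``.

    Example (illustrative, continuing the ``write_partition`` sketch above)::

        dataset = store_dataset_from_partitions(
            partition_list=[mp],
            store=store_factory,
            dataset_uuid="example_dataset",
        )
    """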
store = ensure_store(store)
if update_dataset:
dataset_builder = DatasetMetadataBuilder.from_dataset(update_dataset)
metadata_version = dataset_builder.metadata_version
else:
mp = next(iter(partition_list), None)
if mp is None:
raise ValueError(
"Cannot store empty datasets, partition_list must not be empty if in store mode."
)
metadata_version = mp.metadata_version
dataset_builder = DatasetMetadataBuilder(
uuid=dataset_uuid,
metadata_version=metadata_version,
partition_keys=mp.partition_keys,
)
dataset_builder.explicit_partitions = True
dataset_builder.table_meta = persist_common_metadata(
partition_list, update_dataset, store, dataset_uuid
)
    # Non-unique partition labels can only be detected at this point. If duplicates
    # occur we fail hard; by then files may already have been written to the store
    # without dataset metadata, leaving behind a corrupted dataset or orphaned files.
partition_labels = partition_labels_from_mps(partition_list)
non_unique_labels = extract_duplicates(partition_labels)
if non_unique_labels:
raise ValueError(
"The labels {} are duplicated. Dataset metadata was not written.".format(
", ".join(non_unique_labels)
)
)
if remove_partitions is None:
remove_partitions = []
if metadata_merger is None:
metadata_merger = combine_metadata
dataset_builder = update_metadata(
dataset_builder, metadata_merger, partition_list, dataset_metadata
)
dataset_builder = update_partitions(
dataset_builder, partition_list, remove_partitions
)
dataset_builder = update_indices(
dataset_builder, store, partition_list, remove_partitions
)
if metadata_storage_format.lower() == "json":
store.put(*dataset_builder.to_json())
elif metadata_storage_format.lower() == "msgpack":
store.put(*dataset_builder.to_msgpack())
else:
raise ValueError(
"Unknown metadata storage format encountered: {}".format(
metadata_storage_format
)
)
dataset = dataset_builder.to_dataset()
return dataset


def update_partitions(dataset_builder, add_partitions, remove_partitions):
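    """
    Register the partitions of all metapartitions in ``add_partitions`` with the
    dataset builder and drop every label listed in ``remove_partitions``.
    Empty metapartitions (label ``None``) are skipped.
    """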
for mp in add_partitions:
for sub_mp_dct in mp.metapartitions:
# label is None in case of an empty partition
if sub_mp_dct["label"] is not None:
partition = Partition(
label=sub_mp_dct["label"], files=sub_mp_dct["files"]
)
dataset_builder.add_partition(sub_mp_dct["label"], partition)
for partition_name in remove_partitions:
del dataset_builder.partitions[partition_name]
return dataset_builder


def update_indices(dataset_builder, store, add_partitions, remove_partitions):
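    """
    Merge the secondary indices of ``add_partitions`` into the dataset's existing
    indices (dropping entries for ``remove_partitions`` first), persist the merged
    indices to the store and register the written index files with the builder.
    """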
dataset_indices = dataset_builder.indices
partition_indices = MetaPartition.merge_indices(add_partitions)
if dataset_indices: # dataset already exists and will be updated
if remove_partitions:
for column, dataset_index in dataset_indices.items():
dataset_indices[column] = dataset_index.remove_partitions(
remove_partitions, inplace=True
)
for column, index in partition_indices.items():
dataset_indices[column] = dataset_indices[column].update(
index, inplace=True
)
else: # dataset index will be created first time from partitions
dataset_indices = partition_indices
# Store indices
index_filenames = persist_indices(
store=store, dataset_uuid=dataset_builder.uuid, indices=dataset_indices
)
for column, filename in index_filenames.items():
dataset_builder.add_external_index(column, filename)
return dataset_builder


def raise_if_dataset_exists(dataset_uuid, store):
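    """
    Raise a ``RuntimeError`` if dataset metadata for ``dataset_uuid`` (in either
    msgpack or json format) already exists in the store.
    """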
try:
store_instance = ensure_store(store)
for form in ["msgpack", "json"]:
key = naming.metadata_key_from_uuid(uuid=dataset_uuid, format=form)
if key in store_instance:
                raise RuntimeError(
                    f"Dataset `{dataset_uuid}` already exists "
                    "and overwrite is not permitted!"
                )
except KeyError:
pass