Source code for kartothek.core.dataset

import copy
import logging
import re
from collections import OrderedDict, defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, Union

import deprecation
import pandas as pd
import pyarrow as pa
import simplejson

import kartothek.core._time
from kartothek.core import naming
from kartothek.core._compat import load_json
from kartothek.core._mixins import CopyMixin
from kartothek.core._zmsgpack import packb, unpackb
from kartothek.core.common_metadata import SchemaWrapper, read_schema_metadata
from kartothek.core.docs import default_docs
from kartothek.core.index import (
    ExplicitSecondaryIndex,
    IndexBase,
    PartitionIndex,
    filter_indices,
)
from kartothek.core.naming import EXTERNAL_INDEX_SUFFIX, PARQUET_FILE_SUFFIX
from kartothek.core.partition import Partition
from kartothek.core.typing import StoreInput
from kartothek.core.urlencode import decode_key, quote_indices
from kartothek.core.utils import ensure_store, verify_metadata_version
from kartothek.serialization import PredicatesType, columns_in_predicates
from kartothek.utils.migration_helpers import (
    DEPRECATION_WARNING_REMOVE_PARAMETER,
    deprecate_parameters_if_set,
    get_deprecation_warning_remove_parameter_multi_table,
    get_generic_function_deprecation_waring,
)

_logger = logging.getLogger(__name__)

TableMetaType = Dict[str, SchemaWrapper]

__all__ = ("DatasetMetadata", "DatasetMetadataBase")


def _validate_uuid(uuid: str) -> bool:
    return re.match(r"[a-zA-Z0-9+\-_]+$", uuid) is not None


def to_ordinary_dict(dct: Dict) -> Dict:
    new_dct = {}
    for key, value in dct.items():
        if isinstance(value, dict):
            new_dct[key] = to_ordinary_dict(value)
        else:
            new_dct[key] = value
    return new_dct


T = TypeVar("T", bound="DatasetMetadataBase")


[docs]class DatasetMetadataBase(CopyMixin): @deprecate_parameters_if_set( get_deprecation_warning_remove_parameter_multi_table( deprecated_in="5.3", removed_in="6.0" ), "table_meta", ) def __init__( self, uuid: str, partitions: Optional[Dict[str, Partition]] = None, metadata: Optional[Dict] = None, indices: Optional[Dict[str, IndexBase]] = None, metadata_version: int = naming.DEFAULT_METADATA_VERSION, explicit_partitions: bool = True, partition_keys: Optional[List[str]] = None, table_meta: Optional[Dict[str, SchemaWrapper]] = None, ): if not _validate_uuid(uuid): raise ValueError("UUID contains illegal character") self.metadata_version = metadata_version self.uuid = uuid self.partitions = partitions if partitions else {} self.metadata = metadata if metadata else {} self.indices = indices if indices else {} # explicit partitions means that the partitions are defined in the # metadata.json file (in contrast to implicit partitions that are # derived from the partition key names) self.explicit_partitions = explicit_partitions self.partition_keys = partition_keys or [] self._table_meta = table_meta if table_meta else {} _add_creation_time(self) super(DatasetMetadataBase, self).__init__() def __eq__(self, other: Any) -> bool: # Enforce dict comparison at the places where we only # care about content, not order. if self.uuid != other.uuid: return False if to_ordinary_dict(self.partitions) != to_ordinary_dict(other.partitions): return False if to_ordinary_dict(self.metadata) != to_ordinary_dict(other.metadata): return False if self.indices != other.indices: return False if self.explicit_partitions != other.explicit_partitions: return False if self.partition_keys != other.partition_keys: return False if self.table_meta != other.table_meta: return False return True @property def table_meta(self) -> Dict[str, SchemaWrapper]: return self._table_meta @table_meta.setter def table_meta(self, value): self._table_meta = value @property def schema(self) -> SchemaWrapper: if len(self.tables) > 1: raise AttributeError( "Attribute schema can only be accessed for a single tabled dataset" ) return self._table_meta[self.tables[0]] @property def primary_indices_loaded(self) -> bool: if not self.partition_keys: return False for pkey in self.partition_keys: if pkey not in self.indices: return False return True @property def tables(self) -> List[str]: if self.table_meta: return list(self.table_meta.keys()) elif self.partitions: return [tab for tab in list(self.partitions.values())[0].files] else: return [] @property def index_columns(self) -> Set[str]: return set(self.indices.keys()).union(self.partition_keys) @property def secondary_indices(self) -> Dict[str, ExplicitSecondaryIndex]: return { col: ind for col, ind in self.indices.items() if isinstance(ind, ExplicitSecondaryIndex) }
[docs] @staticmethod def exists(uuid: str, store: StoreInput) -> bool: """ Check if a dataset exists in a storage Parameters ---------- uuid UUID of the dataset. store Object that implements the .get method for file/object loading. """ store = ensure_store(store) key = naming.metadata_key_from_uuid(uuid) if key in store: return True key = naming.metadata_key_from_uuid(uuid, format="msgpack") return key in store
[docs] @staticmethod def storage_keys(uuid: str, store: StoreInput) -> List[str]: """ Retrieve all keys that belong to the given dataset. Parameters ---------- uuid UUID of the dataset. store Object that implements the .iter_keys method for key retrieval loading. """ store = ensure_store(store) start_markers = ["{}.".format(uuid), "{}/".format(uuid)] return list( sorted( k for k in store.iter_keys(uuid) if any(k.startswith(marker) for marker in start_markers) ) )
[docs] def to_dict(self) -> Dict: dct = OrderedDict( [ (naming.METADATA_VERSION_KEY, self.metadata_version), (naming.UUID_KEY, self.uuid), ] ) if self.indices: dct["indices"] = { k: v.to_dict() if v.loaded else v.index_storage_key if isinstance(v, ExplicitSecondaryIndex) else {} for k, v in self.indices.items() } if self.metadata: dct["metadata"] = self.metadata if self.partitions or self.explicit_partitions: dct["partitions"] = { label: partition.to_dict() for label, partition in self.partitions.items() } if self.partition_keys is not None: dct["partition_keys"] = self.partition_keys # don't preserve table_meta, since there is no JSON-compatible way (yet) return dct
[docs] def to_json(self) -> bytes: return simplejson.dumps(self.to_dict()).encode("utf-8")
[docs] def to_msgpack(self) -> bytes: return packb(self.to_dict())
[docs] def load_index(self: T, column: str, store: StoreInput) -> T: """ Load an index into memory. Note: External indices need to be preloaded before they can be queried. Parameters ---------- column Name of the column for which the index should be loaded. store Object that implements the .get method for file/object loading. Returns ------- dataset_metadata: :class:`~kartothek.core.dataset.DatasetMetadata` Mutated metadata object with the loaded index. """ if self.partition_keys and column in self.partition_keys: return self.load_partition_indices() if column not in self.indices: raise KeyError("No index specified for column '{}'".format(column)) index = self.indices[column] if index.loaded or not isinstance(index, ExplicitSecondaryIndex): return self loaded_index = index.load(store=store) if not self.explicit_partitions: col_loaded_index = filter_indices( {column: loaded_index}, self.partitions.keys() ) else: col_loaded_index = {column: loaded_index} indices = dict(self.indices, **col_loaded_index) return self.copy(indices=indices)
[docs] @deprecate_parameters_if_set( DEPRECATION_WARNING_REMOVE_PARAMETER, "load_partition_indices", ) def load_all_indices( self: T, store: StoreInput, load_partition_indices: bool = True ) -> T: """ Load all registered indices into memory. Note: External indices need to be preloaded before they can be queried. Parameters ---------- store Object that implements the .get method for file/object loading. load_partition_indices Flag if filename indices should be loaded. Default is True. Returns ------- dataset_metadata: :class:`~kartothek.core.dataset.DatasetMetadata` Mutated metadata object with the loaded indices. """ indices = { column: index.load(store) if isinstance(index, ExplicitSecondaryIndex) else index for column, index in self.indices.items() } ds = self.copy(indices=indices) if load_partition_indices: ds = ds.load_partition_indices() return ds
[docs] def query(self, indices: List[IndexBase] = None, **kwargs) -> List[str]: """ Query the dataset for partitions that contain specific values. Lookup is performed using the embedded and loaded external indices. Additional indices need to operate on the same partitions that the dataset contains, otherwise an empty list will be returned (the query method only restricts the set of partition keys using the indices). Parameters ---------- indices: List of optional additional indices. **kwargs: Map of columns and values. Returns ------- List[str] List of keys of partitions that contain the queries values in the respective columns. """ candidate_set = set(self.partitions.keys()) additional_indices = indices if indices else {} combined_indices = dict( self.indices, **{index.column: index for index in additional_indices} ) for column, value in kwargs.items(): if column in combined_indices: candidate_set &= set(combined_indices[column].query(value)) return list(candidate_set)
[docs] @deprecation.deprecated( deprecated_in="5.3", removed_in="6.0", details=get_generic_function_deprecation_waring( function_name="load_partition_indices" ), ) def load_partition_indices(self: T) -> T: """ Load all filename encoded indices into RAM. File encoded indices can be extracted from datasets with partitions stored in a format like .. code:: `dataset_uuid/table/IndexCol=IndexValue/SecondIndexCol=Value/partition_label.parquet` Which results in an in-memory index holding the information .. code:: { "IndexCol": { IndexValue: ["partition_label"] }, "SecondIndexCol": { Value: ["partition_label"] } } """ if self.primary_indices_loaded: return self indices = _construct_dynamic_index_from_partitions( partitions=self.partitions, table_meta=self.table_meta, default_dtype=pa.string() if self.metadata_version == 3 else None, partition_keys=self.partition_keys, ) combined_indices = self.indices.copy() combined_indices.update(indices) return self.copy(indices=combined_indices)
[docs] @default_docs def get_indices_as_dataframe( self, columns: Optional[List[str]] = None, date_as_object: bool = True, predicates: PredicatesType = None, ): """ Converts the dataset indices to a pandas dataframe and filter relevant indices by `predicates`. For a dataset with indices on columns `column_a` and `column_b` and three partitions, the dataset output may look like .. code:: column_a column_b part_1 1 A part_2 2 B part_3 3 None Parameters ---------- """ if not self.primary_indices_loaded and columns != []: # self.load_partition_indices is not inplace dm = self.load_partition_indices() else: dm = self if columns is None: columns = sorted(dm.indices.keys()) if columns == []: return pd.DataFrame(index=dm.partitions) if predicates: predicate_columns = columns_in_predicates(predicates) columns_to_scan = sorted( (predicate_columns & self.indices.keys()) | set(columns) ) dfs = ( dm._evaluate_conjunction( columns=columns_to_scan, predicates=[conjunction], date_as_object=date_as_object, ) for conjunction in predicates ) df = pd.concat(dfs) index_name = df.index.name df = ( df.loc[:, columns].reset_index().drop_duplicates().set_index(index_name) ) else: df = dm._evaluate_conjunction( columns=columns, predicates=None, date_as_object=date_as_object, ) return df
def _evaluate_conjunction( self, columns: List[str], predicates: PredicatesType, date_as_object: bool ) -> pd.DataFrame: """ Evaluate all predicates related to `columns` to "AND". Parameters ---------- columns: A list of all columns, including query and index columns. predicates: Optional list of predicates, like [[('x', '>', 0), ...], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query. Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are all combined with a conjunction (AND) into a larger predicate. The most outer list then combines all predicates with a disjunction (OR). By this, we should be able to express all kinds of predicates that are possible using boolean logic. Available operators are: `==`, `!=`, `<=`, `>=`, `<`, `>` and `in`. dates_as_object: bool Load pyarrow.date{32,64} columns as ``object`` columns in Pandas instead of using ``np.datetime64`` to preserve their type. While this improves type-safety, this comes at a performance cost. Returns ------- pd.DataFrame: df_result A DataFrame containing all indices for which `predicates` holds true. """ non_index_columns = set(columns) - self.indices.keys() if non_index_columns: if non_index_columns & set(self.partition_keys): raise RuntimeError( "Partition indices not loaded. Please call `DatasetMetadata.load_partition_indices` first." ) raise ValueError( "Unknown index columns: {}".format(", ".join(sorted(non_index_columns))) ) dfs = [] for col in columns: df = pd.DataFrame( self.indices[col].as_flat_series( partitions_as_index=True, date_as_object=date_as_object, predicates=predicates, ) ) dfs.append(df) # dfs contains one df per index column. Each df stores indices filtered by `predicates` for each column. # Performing an inner join on these dfs yields the resulting "AND" evaluation for all of these predicates. # We start joining with the smallest dataframe, therefore the sorting. dfs_sorted = sorted(dfs, key=len) df_result = dfs_sorted.pop(0) for df in dfs_sorted: df_result = df_result.merge( df, left_index=True, right_index=True, copy=False ) return df_result
[docs]class DatasetMetadata(DatasetMetadataBase): """ Containing holding all metadata of the dataset. """ def __repr__(self): return ( "DatasetMetadata(uuid={uuid}, " "tables={tables}, " "partition_keys={partition_keys}, " "metadata_version={metadata_version}, " "indices={indices}, " "explicit_partitions={explicit_partitions})" ).format( uuid=self.uuid, tables=self.tables, partition_keys=self.partition_keys, metadata_version=self.metadata_version, indices=list(self.indices.keys()), explicit_partitions=self.explicit_partitions, )
[docs] @staticmethod def load_from_buffer( buf, store: StoreInput, format: str = "json" ) -> "DatasetMetadata": """ Load a dataset from a (string) buffer. Parameters ---------- buf: Input to be parsed. store: Object that implements the .get method for file/object loading. Returns ------- DatasetMetadata: Parsed metadata. """ if format == "json": metadata = load_json(buf) elif format == "msgpack": metadata = unpackb(buf) return DatasetMetadata.load_from_dict(metadata, store)
[docs] @staticmethod def load_from_store( uuid: str, store: StoreInput, load_schema: bool = True, load_all_indices: bool = False, ) -> "DatasetMetadata": """ Load a dataset from a storage Parameters ---------- uuid UUID of the dataset. store Object that implements the .get method for file/object loading. load_schema Load table schema load_all_indices Load all registered indices into memory. Returns ------- dataset_metadata: :class:`~kartothek.core.dataset.DatasetMetadata` Parsed metadata. """ key1 = naming.metadata_key_from_uuid(uuid) store = ensure_store(store) try: value = store.get(key1) metadata = load_json(value) except KeyError: key2 = naming.metadata_key_from_uuid(uuid, format="msgpack") try: value = store.get(key2) metadata = unpackb(value) except KeyError: raise KeyError( "Dataset does not exist. Tried {} and {}".format(key1, key2) ) ds = DatasetMetadata.load_from_dict(metadata, store, load_schema=load_schema) if load_all_indices: ds = ds.load_all_indices(store) return ds
[docs] @staticmethod def load_from_dict( dct: Dict, store: StoreInput, load_schema: bool = True ) -> "DatasetMetadata": """ Load dataset metadata from a dictionary and resolve any external includes. Parameters ---------- dct store Object that implements the .get method for file/object loading. load_schema Load table schema """ # Use copy here to get an OrderedDict metadata = copy.copy(dct) if "metadata" not in metadata: metadata["metadata"] = OrderedDict() metadata_version = dct[naming.METADATA_VERSION_KEY] dataset_uuid = dct[naming.UUID_KEY] explicit_partitions = "partitions" in metadata storage_keys = None if not explicit_partitions: storage_keys = DatasetMetadata.storage_keys(dataset_uuid, store) partitions = _load_partitions_from_filenames( store=store, storage_keys=storage_keys, metadata_version=metadata_version, ) metadata["partitions"] = partitions if metadata["partitions"]: tables = [tab for tab in list(metadata["partitions"].values())[0]["files"]] else: table_set = set() if storage_keys is None: storage_keys = DatasetMetadata.storage_keys(dataset_uuid, store) for key in storage_keys: if key.endswith(naming.TABLE_METADATA_FILE): table_set.add(key.split("/")[1]) tables = list(table_set) table_meta = {} if load_schema: for table in tables: table_meta[table] = read_schema_metadata( dataset_uuid=dataset_uuid, store=store, table=table ) metadata["table_meta"] = table_meta if "partition_keys" not in metadata: metadata["partition_keys"] = _get_partition_keys_from_partitions( metadata["partitions"] ) return DatasetMetadata.from_dict( metadata, explicit_partitions=explicit_partitions )
[docs] @staticmethod def from_buffer(buf: str, format: str = "json", explicit_partitions: bool = True): if format == "json": metadata = load_json(buf) else: metadata = unpackb(buf) return DatasetMetadata.from_dict( metadata, explicit_partitions=explicit_partitions )
[docs] @staticmethod def from_dict(dct: Dict, explicit_partitions: bool = True): """ Load dataset metadata from a dictionary. This must have no external references. Otherwise use ``load_from_dict`` to have them resolved automatically. """ # Use the builder class for reconstruction to have a single point for metadata version changes builder = DatasetMetadataBuilder( uuid=dct[naming.UUID_KEY], metadata_version=dct[naming.METADATA_VERSION_KEY], explicit_partitions=explicit_partitions, partition_keys=dct.get("partition_keys", None), table_meta=dct.get("table_meta", None), ) for key, value in dct.get("metadata", {}).items(): builder.add_metadata(key, value) for partition_label, part_dct in dct.get("partitions", {}).items(): builder.add_partition( partition_label, Partition.from_dict(partition_label, part_dct) ) for column, index_dct in dct.get("indices", {}).items(): if isinstance(index_dct, IndexBase): builder.add_embedded_index(column, index_dct) else: builder.add_embedded_index( column, ExplicitSecondaryIndex.from_v2(column, index_dct) ) return builder.to_dataset()
@deprecate_parameters_if_set( get_deprecation_warning_remove_parameter_multi_table( deprecated_in="5.3", removed_in="6.0" ), "table_meta", ) def _get_type_from_meta( table_meta: Optional[Dict[str, SchemaWrapper]], column: str, default: Optional[pa.DataType], ) -> pa.DataType: # use first schema that provides type information, since write path should ensure that types are normalized and # equal if table_meta is not None: for schema in table_meta.values(): if column not in schema.names: continue idx = schema.get_field_index(column) return schema[idx].type if default is not None: return default raise ValueError( 'Cannot find type information for partition column "{}"'.format(column) ) @deprecate_parameters_if_set( get_deprecation_warning_remove_parameter_multi_table( deprecated_in="5.3", removed_in="6.0" ), "table_meta", ) def _empty_partition_indices( partition_keys: List[str], table_meta: TableMetaType, default_dtype: pa.DataType ): indices = {} for col in partition_keys: arrow_type = _get_type_from_meta(table_meta, col, default_dtype) indices[col] = PartitionIndex(column=col, index_dct={}, dtype=arrow_type) return indices @deprecate_parameters_if_set( get_deprecation_warning_remove_parameter_multi_table( deprecated_in="5.3", removed_in="6.0" ), "table_meta", ) def _construct_dynamic_index_from_partitions( partitions: Dict[str, Partition], table_meta: TableMetaType, default_dtype: pa.DataType, partition_keys: List[str], ) -> Dict[str, PartitionIndex]: if len(partitions) == 0: return _empty_partition_indices(partition_keys, table_meta, default_dtype) def _get_files(part): if isinstance(part, dict): return part["files"] else: return part.files # We exploit the fact that all tables are partitioned equally. first_partition = next( iter(partitions.values()) ) # partitions is NOT empty here, see check above first_partition_files = _get_files(first_partition) if not first_partition_files: return _empty_partition_indices(partition_keys, table_meta, default_dtype) key_table = next(iter(first_partition_files.keys())) storage_keys = ( (key, _get_files(part)[key_table]) for key, part in partitions.items() ) _key_indices: Dict[str, Dict[str, Set[str]]] = defaultdict(_get_empty_index) depth_indices = None for partition_label, key in storage_keys: _, _, indices, file_ = decode_key(key) if ( file_ is not None and key.endswith(PARQUET_FILE_SUFFIX) and not key.endswith(EXTERNAL_INDEX_SUFFIX) ): depth_indices = _check_index_depth(indices, depth_indices) for column, value in indices: _key_indices[column][value].add(partition_label) new_indices = {} for col, index_dct in _key_indices.items(): arrow_type = _get_type_from_meta(table_meta, col, default_dtype) # convert defaultdicts into dicts new_indices[col] = PartitionIndex( column=col, index_dct={k1: list(v1) for k1, v1 in index_dct.items()}, dtype=arrow_type, ) return new_indices def _get_partition_label(indices, filename, metadata_version): return "/".join( quote_indices(indices) + [filename.replace(PARQUET_FILE_SUFFIX, "")] ) def _check_index_depth(indices, depth_indices): if depth_indices is not None and len(indices) != depth_indices: raise RuntimeError( "Unknown file structure encountered. " "Depth of filename indices is not equal for all partitions." ) return len(indices) def _get_partition_keys_from_partitions(partitions): if len(partitions): part = next(iter(partitions.values())) files_dct = part["files"] if files_dct: key = next(iter(files_dct.values())) _, _, indices, _ = decode_key(key) if indices: return [tup[0] for tup in indices] return None def _load_partitions_from_filenames(store, storage_keys, metadata_version): partitions = defaultdict(_get_empty_partition) depth_indices = None for key in storage_keys: dataset_uuid, table, indices, file_ = decode_key(key) if file_ is not None and file_.endswith(PARQUET_FILE_SUFFIX): # valid key example: # <uuid>/<table>/<column_0>=<value_0>/.../<column_n>=<value_n>/part_label.parquet depth_indices = _check_index_depth(indices, depth_indices) partition_label = _get_partition_label(indices, file_, metadata_version) partitions[partition_label]["files"][table] = key return partitions def _get_empty_partition(): return {"files": {}, "metadata": {}} def _get_empty_index(): return defaultdict(set) def create_partition_key( dataset_uuid: str, table: str, index_values: List[Tuple[str, str]], filename: str = "data", ): """ Create partition key for a kartothek partition Parameters ---------- dataset_uuid table index_values filename Example: create_partition_key('my-uuid', 'testtable', [('index1', 'value1'), ('index2', 'value2')]) returns 'my-uuid/testtable/index1=value1/index2=value2/data' """ key_components = [dataset_uuid, table] index_path = quote_indices(index_values) key_components.extend(index_path) key_components.append(filename) key = "/".join(key_components) return key class DatasetMetadataBuilder(CopyMixin): """ Incrementally build up a dataset. In constrast to a :class:`kartothek.core.dataset.DatasetMetadata` instance, this object is mutable and may not be a full dataset (e.g. partitions don't need to be fully materialised). """ @deprecate_parameters_if_set( get_deprecation_warning_remove_parameter_multi_table( deprecated_in="5.3", removed_in="6.0" ), "table_meta", ) def __init__( self, uuid: str, metadata_version=naming.DEFAULT_METADATA_VERSION, explicit_partitions=True, partition_keys=None, table_meta=None, ): verify_metadata_version(metadata_version) self.uuid = uuid self.metadata: Dict = OrderedDict() self.indices: Dict[str, IndexBase] = OrderedDict() self.metadata_version = metadata_version self.partitions: Dict[str, Partition] = OrderedDict() self.partition_keys = partition_keys self.table_meta = table_meta self.explicit_partitions = explicit_partitions _add_creation_time(self) super(DatasetMetadataBuilder, self).__init__() @staticmethod def from_dataset(dataset): dataset = copy.deepcopy(dataset) ds_builder = DatasetMetadataBuilder( uuid=dataset.uuid, metadata_version=dataset.metadata_version, explicit_partitions=dataset.explicit_partitions, partition_keys=dataset.partition_keys, table_meta=dataset.table_meta, ) ds_builder.metadata = dataset.metadata ds_builder.indices = dataset.indices ds_builder.partitions = dataset.partitions ds_builder.tables = dataset.tables return ds_builder def modify_uuid(self, target_uuid: str): """ Modify the dataset uuid and depending metadata: - paths to partitioning files - path to index files Parameters ---------- target_uuid: str Modified dataset UUID. Returns ------- DatasetMetadataBuilder modified builder object """ # modify file names in partition metadata modified_partitions = {} for p_key, p in self.partitions.items(): pdict = p.to_dict() for table_key, table_file in pdict["files"].items(): if table_file.startswith(f"{self.uuid}/"): pdict["files"][table_key] = table_file.replace( self.uuid, target_uuid, 1 ) modified_partitions[p_key] = Partition.from_dict(p_key, pdict) self.partitions = modified_partitions for i_key, i in self.indices.items(): if ( isinstance(i, ExplicitSecondaryIndex) and i.index_storage_key is not None ): i.index_storage_key = i.index_storage_key.replace( self.uuid, target_uuid, 1 ) self.uuid = target_uuid return self def add_partition(self, name, partition): """ Add an (embedded) Partition. Parameters ---------- name: str Identifier of the partition. partition: :class:`kartothek.core.partition.Partition` The partition to add. """ self.partitions[name] = partition return self # TODO: maybe remove def add_embedded_index(self, column, index): """ Embed an index into the metadata. Parameters ---------- column: str Name of the indexed column index: kartothek.core.index.IndexBase The actual index object """ if column != index.column: # TODO Deprecate the column argument and take the column name directly from the index. raise RuntimeError( "The supplied index is not compatible with the supplied index." ) self.indices[column] = index def add_external_index(self, column, filename=None): """ Add a reference to an external index. Parameters ---------- column: str Name of the indexed column Returns ------- storage_key: str The location where the external index should be stored. """ if filename is None: filename = "{uuid}.{column_name}".format(uuid=self.uuid, column_name=column) filename += naming.EXTERNAL_INDEX_SUFFIX self.indices[column] = ExplicitSecondaryIndex( column, index_storage_key=filename ) return filename def add_metadata(self, key, value): """ Add arbitrary key->value metadata. Parameters ---------- key: str value: str """ self.metadata[key] = value def to_dict(self): """ Render the dataset to a dict. Returns ------- """ factory = type(self.metadata) dct = factory( [ (naming.METADATA_VERSION_KEY, self.metadata_version), (naming.UUID_KEY, self.uuid), ] ) if self.indices: dct["indices"] = {} for column, index in self.indices.items(): if isinstance(index, str): dct["indices"][column] = index elif index.loaded: dct["indices"][column] = index.to_dict() else: dct["indices"][column] = index.index_storage_key if self.metadata: dct["metadata"] = self.metadata if self.explicit_partitions: dct["partitions"] = factory() for label, partition in self.partitions.items(): part_dict = partition.to_dict() dct["partitions"][label] = part_dict if self.partition_keys is not None: dct["partition_keys"] = self.partition_keys # don't preserve table_meta, since there is no JSON-compatible way (yet) return dct def to_json(self): """ Render the dataset to JSON. Returns ------- storage_key: str The path where this metadata should be placed in the storage. dataset_json: str The rendered JSON for this dataset. """ return ( naming.metadata_key_from_uuid(self.uuid), simplejson.dumps(self.to_dict()).encode("utf-8"), ) def to_msgpack(self) -> Tuple[str, bytes]: """ Render the dataset to msgpack. Returns ------- storage_key: str The path where this metadata should be placed in the storage. dataset_json: str The rendered JSON for this dataset. """ return ( naming.metadata_key_from_uuid(self.uuid, format="msgpack"), packb(self.to_dict()), ) def to_dataset(self) -> DatasetMetadata: return DatasetMetadata( uuid=self.uuid, partitions=self.partitions, metadata=self.metadata, indices=self.indices, metadata_version=self.metadata_version, explicit_partitions=self.explicit_partitions, partition_keys=self.partition_keys, table_meta=self.table_meta, ) def _add_creation_time( dataset_object: Union[DatasetMetadataBase, DatasetMetadataBuilder] ): if "creation_time" not in dataset_object.metadata: creation_time = kartothek.core._time.datetime_utcnow().isoformat() dataset_object.metadata["creation_time"] = creation_time