"""
Tooling to quickly discover datasets in a given blob store.
"""
import logging
from typing import Dict, Iterable, Optional, Set, Tuple, Union

from kartothek.api.consistency import check_datasets
from kartothek.core.cube.constants import (
KTK_CUBE_METADATA_DIMENSION_COLUMNS,
KTK_CUBE_METADATA_KEY_IS_SEED,
KTK_CUBE_METADATA_PARTITION_COLUMNS,
KTK_CUBE_METADATA_SUPPRESS_INDEX_ON,
KTK_CUBE_UUID_SEPERATOR,
)
from kartothek.core.cube.cube import Cube
from kartothek.core.dataset import DatasetMetadata
from kartothek.core.naming import (
METADATA_BASE_SUFFIX,
METADATA_FORMAT_JSON,
METADATA_FORMAT_MSGPACK,
)
from kartothek.core.typing import StoreInput
from kartothek.core.utils import ensure_store
from kartothek.utils.converters import converter_str_set_optional

__all__ = (
"discover_cube",
"discover_datasets",
"discover_datasets_unchecked",
"discover_ktk_cube_dataset_ids",
)

_logger = logging.getLogger(__name__)


def _discover_dataset_meta_files(prefix: str, store: StoreInput) -> Set[str]:
"""
    Get meta file names for all datasets.

    Parameters
    ----------
    prefix
        The key prefix to search under.
    store
        KV store.

    Returns
    -------
    names: Set[str]
        The meta file names.
"""
store = ensure_store(store)
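    # a dataset's UUID is its metadata key with the metadata suffix (base
    # suffix plus JSON or msgpack extension) stripped off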
names = {
name[: -len(METADATA_BASE_SUFFIX + suffix)]
for name in store.iter_prefixes(delimiter="/", prefix=prefix)
for suffix in [METADATA_FORMAT_JSON, METADATA_FORMAT_MSGPACK]
if name.endswith(METADATA_BASE_SUFFIX + suffix)
}
return names


def discover_ktk_cube_dataset_ids(uuid_prefix: str, store: StoreInput) -> Set[str]:
"""
    Get ktk_cube dataset ids for all datasets.

    Parameters
    ----------
    uuid_prefix
        Dataset UUID prefix.
    store
        KV store.

    Returns
    -------
    names: Set[str]
        The ktk_cube dataset ids.
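
    Examples
    --------
    A minimal sketch, assuming a cube named ``"my_cube"`` was previously
    written to the store; the ``storefact`` in-memory store URL and all
    dataset names are illustrative::

        from storefact import get_store_from_url

        store = get_store_from_url("hmem://")
        # ... a cube must have been written to ``store`` beforehand ...
        discover_ktk_cube_dataset_ids("my_cube", store)
        # e.g. {"enrich", "seed"}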
"""
prefix = uuid_prefix + KTK_CUBE_UUID_SEPERATOR
names = _discover_dataset_meta_files(prefix, store)
    return {name[len(prefix) :] for name in names}


def discover_datasets_unchecked(
uuid_prefix: str,
store: StoreInput,
filter_ktk_cube_dataset_ids: Optional[Union[str, Iterable[str]]] = None,
) -> Dict[str, DatasetMetadata]:
"""
    Get all known datasets that may belong to a given cube without applying any checks.

    .. warning::
        The results are not checked for validity. Found datasets may be incompatible with the given cube. Use
        :func:`~kartothek.api.consistency.check_datasets` to check the results, or use
        :func:`~kartothek.api.discover.discover_datasets` in the first place.

    Parameters
    ----------
    uuid_prefix
        Dataset UUID prefix.
    store
        KV store.
    filter_ktk_cube_dataset_ids
        Optional selection of datasets to include.

    Returns
    -------
    datasets: Dict[str, DatasetMetadata]
        All discovered datasets. Empty dict if no dataset is found.
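
    Examples
    --------
    A minimal sketch, assuming datasets for the UUID prefix ``"my_cube"``
    already exist in the store; the store URL and dataset names are
    illustrative::

        from storefact import get_store_from_url

        store = get_store_from_url("hmem://")
        datasets = discover_datasets_unchecked("my_cube", store)
        # restrict discovery to a known subset of datasets
        seed_only = discover_datasets_unchecked(
            "my_cube", store, filter_ktk_cube_dataset_ids="seed"
        )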
"""
store = ensure_store(store)
filter_ktk_cube_dataset_ids = converter_str_set_optional(
filter_ktk_cube_dataset_ids
)
prefix = uuid_prefix + KTK_CUBE_UUID_SEPERATOR
names = _discover_dataset_meta_files(prefix, store)
if filter_ktk_cube_dataset_ids is not None:
names = {
name for name in names if name[len(prefix) :] in filter_ktk_cube_dataset_ids
}
result = {}
    # sorted iteration for deterministic error messages in case DatasetMetadata.load_from_store fails
for name in sorted(names):
try:
result[name[len(prefix) :]] = DatasetMetadata.load_from_store(
uuid=name, store=store, load_schema=True, load_all_indices=False
)
        except KeyError as e:
            _logger.warning('Ignoring dataset "%s" due to KeyError: %s', name, e)
return result


def discover_datasets(
cube: Cube,
store: StoreInput,
filter_ktk_cube_dataset_ids: Optional[Union[str, Iterable[str]]] = None,
) -> Dict[str, DatasetMetadata]:
"""
    Get all known datasets that belong to a given cube.

    Parameters
    ----------
    cube
        Cube specification.
    store
        KV store.
    filter_ktk_cube_dataset_ids
        Optional selection of datasets to include.

    Returns
    -------
    datasets: Dict[str, DatasetMetadata]
        All discovered datasets.

    Raises
    ------
    ValueError
        If a requested dataset could not be found or if the discovered datasets are inconsistent with the given cube.
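
    Examples
    --------
    A minimal sketch, assuming the store already contains a consistent cube;
    the cube specification and store URL are illustrative::

        from storefact import get_store_from_url

        from kartothek.core.cube.cube import Cube

        cube = Cube(
            dimension_columns=["x"],
            partition_columns=["p"],
            uuid_prefix="my_cube",
        )
        store = get_store_from_url("hmem://")
        datasets = discover_datasets(cube, store)  # raises ValueError on inconsistency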
"""
filter_ktk_cube_dataset_ids = converter_str_set_optional(
filter_ktk_cube_dataset_ids
)
result = discover_datasets_unchecked(
cube.uuid_prefix, store, filter_ktk_cube_dataset_ids
)
    if filter_ktk_cube_dataset_ids is not None:
        # the filter was normalized to a set of strings above, so the ids can
        # be compared directly
        missing = set(filter_ktk_cube_dataset_ids) - set(result.keys())
if missing:
raise ValueError(
"Could not find the following requested datasets: {missing}".format(
missing=", ".join(sorted(missing))
)
)
check_datasets(result, cube)
return result


def discover_cube(
uuid_prefix: str,
store: StoreInput,
filter_ktk_cube_dataset_ids: Optional[Union[str, Iterable[str]]] = None,
) -> Tuple[Cube, Dict[str, DatasetMetadata]]:
"""
    Recover cube information from store.

    Parameters
    ----------
    uuid_prefix
        Dataset UUID prefix.
    store
        KV store.
    filter_ktk_cube_dataset_ids
        Optional selection of datasets to include.

    Returns
    -------
    cube: Cube
        Cube specification.
    datasets: Dict[str, DatasetMetadata]
        All discovered datasets.

    Raises
    ------
    ValueError
        If no seed dataset (or more than one) was found, or if the dimension columns could not be recovered.
    NotImplementedError
        If an old cube uses a timestamp column other than ``KLEE_TS``.
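
    Examples
    --------
    A minimal sketch, assuming a cube was previously written under the UUID
    prefix ``"my_cube"``; the store URL and names are illustrative::

        from storefact import get_store_from_url

        store = get_store_from_url("hmem://")
        cube, datasets = discover_cube("my_cube", store)
        cube.seed_dataset  # name of the recovered seed dataset
        sorted(datasets)   # discovered ktk_cube dataset ids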
"""
datasets = discover_datasets_unchecked(
uuid_prefix, store, filter_ktk_cube_dataset_ids
)
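    # a cube's seed dataset is flagged in its metadata; fall back to the
    # legacy "klee_is_seed" key for cubes written by older klee versions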
seed_candidates = {
ktk_cube_dataset_id
for ktk_cube_dataset_id, ds in datasets.items()
if ds.metadata.get(
KTK_CUBE_METADATA_KEY_IS_SEED, ds.metadata.get("klee_is_seed", False)
)
}
if len(seed_candidates) == 0:
raise ValueError(
'Could not find seed dataset for cube "{uuid_prefix}".'.format(
uuid_prefix=uuid_prefix
)
)
elif len(seed_candidates) > 1:
raise ValueError(
'Found multiple possible seed datasets for cube "{uuid_prefix}": {seed_candidates}'.format(
uuid_prefix=uuid_prefix,
seed_candidates=", ".join(sorted(seed_candidates)),
)
)
    seed_dataset = next(iter(seed_candidates))
seed_ds = datasets[seed_dataset]
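    # dimension columns live in the seed dataset's metadata; again fall back
    # to the legacy "klee_dimension_columns" key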
dimension_columns = seed_ds.metadata.get(
KTK_CUBE_METADATA_DIMENSION_COLUMNS,
seed_ds.metadata.get("klee_dimension_columns"),
)
if dimension_columns is None:
raise ValueError(
'Could not recover dimension columns from seed dataset ("{seed_dataset}") of cube "{uuid_prefix}".'.format(
seed_dataset=seed_dataset, uuid_prefix=uuid_prefix
)
)
# datasets written with new kartothek versions (after merge of PR#7747)
# always set KTK_CUBE_METADATA_PARTITION_COLUMNS and "klee_timestamp_column" in the metadata.
# Older versions of ktk_cube do not write these; instead, these columns are inferred from
# the actual partitioning: partition_columns are all but the last partition key
#
# TODO: once we're sure we have re-written all kartothek cubes, the code
# in the branch `if partition_columns is None` below can be removed.
#
# read the now unused timestamp column just to make sure we can still read older cubes.
#
# TODO: once all cubes are re-created and don't use timestamp column anymore, remove the timestamp column handling
# entirely
partition_columns = seed_ds.metadata.get(
KTK_CUBE_METADATA_PARTITION_COLUMNS,
seed_ds.metadata.get("klee_partition_columns"),
)
timestamp_column = seed_ds.metadata.get("klee_timestamp_column")
if partition_columns is None:
# infer the partition columns and timestamp column from the actual partitioning:
partition_keys = seed_ds.partition_keys
if len(partition_keys) == 0:
            raise ValueError(
                'Seed dataset ("{seed_dataset}") has no partition keys.'.format(
                    seed_dataset=seed_dataset
                )
            )
elif len(partition_keys) < 2:
raise ValueError(
(
'Seed dataset ("{seed_dataset}") has only a single partition key ({partition_key}) '
"but should have at least 2."
).format(seed_dataset=seed_dataset, partition_key=partition_keys[0])
)
partition_columns = partition_keys[:-1]
timestamp_column = partition_keys[-1]
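    # any index column that is not a dimension, partition, or timestamp column
    # becomes a cube index column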
index_columns = set()
for ds in datasets.values():
index_columns |= set(ds.indices.keys()) - (
set(dimension_columns) | set(partition_columns) | {timestamp_column}
)
# we only support the default timestamp column in the compat code
if (timestamp_column is not None) and (timestamp_column != "KLEE_TS"):
raise NotImplementedError(
f"Can only read old cubes if the timestamp column is 'KLEE_TS', but '{timestamp_column}' was detected."
)
cube = Cube(
uuid_prefix=uuid_prefix,
dimension_columns=dimension_columns,
partition_columns=partition_columns,
index_columns=index_columns,
seed_dataset=seed_dataset,
suppress_index_on=seed_ds.metadata.get(KTK_CUBE_METADATA_SUPPRESS_INDEX_ON),
)
datasets = check_datasets(datasets, cube)
return cube, datasets