Source code for kartothek.io_components.cube.copy

from __future__ import absolute_import

from copy import copy
from typing import Callable, Dict, Iterable, Optional, Union

from simplekv import KeyValueStore

from kartothek.api.discover import check_datasets, discover_datasets_unchecked
from kartothek.core.cube.cube import Cube
from kartothek.core.dataset import DatasetMetadata
from kartothek.utils.ktk_adapters import get_dataset_keys

__all__ = (
    "get_copy_keys",
    "get_datasets_to_copy",
)


[docs]def get_datasets_to_copy( cube: Cube, src_store: Union[Callable[[], KeyValueStore], KeyValueStore], tgt_store: Union[Callable[[], KeyValueStore], KeyValueStore], overwrite: bool, datasets: Optional[Union[Iterable[str], Dict[str, DatasetMetadata]]] = None, ) -> Dict[str, DatasetMetadata]: """ Determine all dataset names of a given cube that should be copied and apply addtional consistency checks. Copying only a specific set of datasets is possible by providing a list of dataset names via the parameter `datasets`. Parameters ---------- cube: Cube specification. src_store: Source KV store. tgt_store: Target KV store. overwrite: If possibly existing datasets in the target store should be overwritten. datasets: Datasets to copy, must all be part of the cube. May be either the result of :func:`~kartothek.api.discover.discover_datasets`, an iterable of Ktk_cube dataset ID or ``None`` (in which case entire cube will be copied). Returns ------- all_datasets: Dict[str, DatasetMetadata] All datasets that should be copied. """ if not isinstance(datasets, dict): new_datasets = discover_datasets_unchecked( uuid_prefix=cube.uuid_prefix, store=src_store, filter_ktk_cube_dataset_ids=datasets, ) else: new_datasets = datasets if datasets is None: if not new_datasets: raise RuntimeError("{} not found in source store".format(cube)) else: unknown_datasets = set(datasets) - set(new_datasets) if unknown_datasets: raise RuntimeError( "{cube}, datasets {datasets} do not exist in source store".format( cube=cube, datasets=unknown_datasets ) ) existing_datasets = discover_datasets_unchecked(cube.uuid_prefix, tgt_store) if not overwrite: for ktk_cube_dataset_id in sorted(new_datasets.keys()): if ktk_cube_dataset_id in existing_datasets: raise RuntimeError( 'Dataset "{uuid}" exists in target store but overwrite was set to False'.format( uuid=new_datasets[ktk_cube_dataset_id].uuid ) ) all_datasets = copy(existing_datasets) all_datasets.update(new_datasets) check_datasets(all_datasets, cube) return new_datasets
[docs]def get_copy_keys( cube: Cube, src_store: Union[Callable[[], KeyValueStore], KeyValueStore], tgt_store: Union[Callable[[], KeyValueStore], KeyValueStore], overwrite: bool, datasets: Optional[Union[Iterable[str], Dict[str, DatasetMetadata]]] = None, ): """ Get and check keys that should be copied from one store to another. Parameters ---------- cube: Cube specification. src_store: Source KV store. tgt_store: Target KV store. overwrite: If possibly existing datasets in the target store should be overwritten. datasets: Datasets to copy, must all be part of the cube. May be either the result of :func:`~kartothek.api.discover.discover_datasets`, an iterable of Ktk_cube dataset ID or ``None`` (in which case entire cube will be copied). Returns ------- keys: Set[str] Set of keys to copy. Raises ------ RuntimeError: In case the copy would not pass successfully or if there is no cube in ``src_store``. """ new_datasets = get_datasets_to_copy( cube=cube, src_store=src_store, tgt_store=tgt_store, overwrite=overwrite, datasets=datasets, ) keys = set() for ktk_cube_dataset_id in sorted(new_datasets.keys()): ds = new_datasets[ktk_cube_dataset_id] keys |= get_dataset_keys(ds) return keys