Source code for kartothek.io_components.cube.remove
from functools import reduce
from kartothek.core.cube.conditions import Conjunction
from kartothek.core.cube.constants import KTK_CUBE_METADATA_VERSION
from kartothek.io_components.metapartition import MetaPartition
from kartothek.utils.converters import converter_str_set_optional
from kartothek.utils.ktk_adapters import get_partition_dataframe
__all__ = ("prepare_metapartitions_for_removal_action",)
def prepare_metapartitions_for_removal_action(
    cube, store, conditions, ktk_cube_dataset_ids, existing_datasets
):
    """
    Build, per dataset, an empty MetaPartition and the delete scope for a removal.

    Nothing is written here. The returned MetaPartitions must still be stored via
    ``mp.store_dataframes(...)`` and attached to their datasets using a kartothek
    update method, together with the accompanying ``delete_scope``.

    Parameters
    ----------
    cube: kartothek.core.cube.cube.Cube
        Cube spec. Only ``cube.partition_columns`` is read directly in this function;
        the cube is also handed to ``get_partition_dataframe``.
    store: Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]
        Store. Accepted for interface compatibility; not used by this function's body.
    conditions: Union[None, Condition, Iterable[Condition], Conjunction]
        Conditions selecting what should be removed, optional. Defaults to "entire cube".
        They are wrapped into a ``Conjunction`` and may only refer to the cube's
        partition columns.
    ktk_cube_dataset_ids: Optional[Union[Iterable[str], str]]
        Ktk_cube dataset IDs to apply the remove action to, optional. Defaults to all
        keys of ``existing_datasets``.
    existing_datasets: Dict[str, kartothek.core.dataset.DatasetMetadata]
        Existing datasets, keyed by ktk_cube dataset ID.

    Returns
    -------
    metapartitions: Dict[str, Tuple[kartothek.core.dataset.DatasetMetadata,
    kartothek.io_components.metapartition.MetaPartition, List[Dict[str, Any]]]]
        For every selected dataset ID: the dataset as returned by
        ``load_partition_indices()``, the empty MetaPartition that should be written
        and used to update the kartothek dataset, and the ``delete_scope`` for
        kartothek.

    Raises
    ------
    ValueError
        If a condition refers to a column that is not one of ``cube.partition_columns``,
        or if ``ktk_cube_dataset_ids`` contains an ID missing from ``existing_datasets``.
    """
    conditions = Conjunction(conditions)
    conditions_split = conditions.split_by_column()

    # Removal is only expressible in terms of the cube's partition columns.
    non_partition_columns = set(conditions_split.keys()).difference(
        cube.partition_columns
    )
    if non_partition_columns:
        raise ValueError(
            "Can only remove partitions with conditions concerning cubes physical partition columns."
        )

    known_dataset_ids = set(existing_datasets.keys())
    ktk_cube_dataset_ids = converter_str_set_optional(ktk_cube_dataset_ids)
    if ktk_cube_dataset_ids is None:
        # No explicit selection: operate on every existing dataset.
        ktk_cube_dataset_ids = known_dataset_ids
    else:
        unknown_dataset_ids = ktk_cube_dataset_ids - known_dataset_ids
        if unknown_dataset_ids:
            raise ValueError(
                "Unknown ktk_cube_dataset_ids: {}".format(
                    ", ".join(sorted(unknown_dataset_ids))
                )
            )

    metapartitions = {}
    for dataset_id in ktk_cube_dataset_ids:
        dataset = existing_datasets[dataset_id].load_partition_indices()
        empty_mp = _prepare_mp_empty(dataset)

        if dataset.partition_keys:
            # NOTE(review): get_partition_dataframe presumably yields the partition
            # column values per partition -- confirm against its implementation.
            partition_values = get_partition_dataframe(
                dataset=dataset, cube=cube
            ).drop_duplicates()

            # AND together only those per-column conditions whose column is present
            # in this dataset's partition dataframe; start from an empty conjunction.
            applicable = Conjunction([])
            for column, column_condition in conditions_split.items():
                if column in partition_values.columns:
                    applicable = applicable & column_condition

            remaining = applicable.filter_df(partition_values)
            delete_scope = remaining.to_dict(orient="records")
        else:
            # no partition keys --> delete all
            delete_scope = [{}]

        metapartitions[dataset_id] = (dataset, empty_mp, delete_scope)

    return metapartitions
def _prepare_mp_empty(dataset):
    """
    Create a data-less MetaPartition that mirrors the given dataset's partition keys.

    Parameters
    ----------
    dataset: kartothek.core.dataset.DatasetMetadata
        Dataset to build the empty MetaPartition for; only ``partition_keys`` is read.

    Returns
    -------
    mp: kartothek.io_components.metapartition.MetaPartition
        MetaPartition with ``label=None`` and the ktk_cube metadata version. It must
        still be added to the Dataset using a kartothek update method.
    """
    mp_kwargs = {
        "label": None,
        "metadata_version": KTK_CUBE_METADATA_VERSION,
        "partition_keys": dataset.partition_keys,
    }
    return MetaPartition(**mp_kwargs)