Source code for kartothek.io_components.merge

import logging
from typing import Callable, Generator, List, Union

import deprecation

from kartothek.core.dataset import DatasetMetadata
from kartothek.core.typing import StoreInput
from kartothek.core.utils import ensure_store
from kartothek.io_components.metapartition import MetaPartition
from kartothek.utils.migration_helpers import get_generic_function_deprecation_waring

LOGGER = logging.getLogger(__name__)

try:
    from typing_extensions import Literal  # type: ignore
except ImportError:
    from typing import Literal  # type: ignore


@deprecation.deprecated(
    deprecated_in="5.3",
    removed_in="6.0",
    details=get_generic_function_deprecation_waring(function_name="align_datasets"),
)
def align_datasets(
    left_dataset_uuid: str,
    right_dataset_uuid: str,
    store: StoreInput,
    match_how: Union[Literal["exact", "prefix", "all", "left"], Callable] = "exact",
) -> Generator[List[MetaPartition], None, None]:
    """
    Determine dataset partition alignment.

    Parameters
    ----------
    left_dataset_uuid
        UUID of the left dataset.
    right_dataset_uuid
        UUID of the right dataset.
    store
        Store input holding both datasets; normalized via ``ensure_store``.
    match_how
        How partition labels are matched: ``"exact"`` (labels must be equal),
        ``"prefix"`` (labels of the dataset with the shorter labels must be
        prefixes of the other dataset's labels), ``"all"``/``"left"`` (every
        partition of one dataset is aligned with every partition of the
        other), or a predicate ``callable(left_label, right_label) -> bool``.

    Yields
    ------
    List
        Each list starts with a ``MetaPartition`` of one dataset, followed by
        all partitions of the other dataset that matched it.
    """
    store = ensure_store(store)
    left_dataset = DatasetMetadata.load_from_store(uuid=left_dataset_uuid, store=store)
    right_dataset = DatasetMetadata.load_from_store(
        uuid=right_dataset_uuid, store=store
    )

    metadata_version = left_dataset.metadata_version

    # Loop over the dataset with fewer partitions, treating its keys as
    # partition label prefixes
    if (
        callable(match_how)
        or match_how == "left"
        or (
            match_how == "prefix"
            and len(list(left_dataset.partitions.keys())[0])
            < len(list(right_dataset.partitions.keys())[0])
        )
    ):
        first_dataset = left_dataset
        second_dataset = right_dataset
    else:
        first_dataset = right_dataset
        second_dataset = left_dataset
    # The del statements are here to reduce confusion below
    del left_dataset
    del right_dataset

    # For every partition in the 'small' dataset, at least one partition match
    # needs to be found in the larger dataset.
    available_partitions = list(second_dataset.partitions.items())
    partition_stack = available_partitions[:]

    # TODO: write a test which protects against the following scenario!!
    # Sort the partition labels by length, starting with the longest labels.
    # This prevents spurious prefix matches between similar partitions, e.g.
    # cluster_100 and cluster_1. This, of course, works only as long as the
    # inner loop removes elements which were matched already (here improperly
    # called stack).
    for l_1 in sorted(first_dataset.partitions, key=len, reverse=True):
        p_1 = first_dataset.partitions[l_1]
        res = [
            MetaPartition.from_partition(
                partition=p_1, metadata_version=metadata_version
            )
        ]
        for parts in available_partitions:
            l_2, p_2 = parts
            if callable(match_how) and not match_how(l_1, l_2):
                continue
            if match_how == "exact" and l_1 != l_2:
                continue
            elif match_how == "prefix" and not l_2.startswith(l_1):
                LOGGER.debug("rejecting (%s, %s)", l_1, l_2)
                continue

            LOGGER.debug(
                "Found alignment between partitions (%s, %s) and (%s, %s)",
                first_dataset.uuid,
                p_1.label,
                second_dataset.uuid,
                p_2.label,
            )
            res.append(
                MetaPartition.from_partition(
                    partition=p_2, metadata_version=metadata_version
                )
            )

            # In exact or prefix matching schemes, only one partition
            # alignment is expected; in that case reduce the size of the
            # inner loop.
            if match_how in ["exact", "prefix"]:
                partition_stack.remove((l_2, p_2))
        # Need to copy, otherwise remove will alter the loop iterator
        available_partitions = partition_stack[:]
        if len(res) == 1:
            raise RuntimeError(
                "No matching partition for {} in dataset {} "
                "found".format(p_1, first_dataset)
            )
        yield res
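
A minimal usage sketch of the generator above, assuming two datasets named
"dataset_a" and "dataset_b" already exist in a storefact-backed store; the
UUIDs and the store URL are illustrative assumptions, not values from this
module. Any ``StoreInput`` accepted by ``ensure_store`` works the same way.

from storefact import get_store_from_url

from kartothek.io_components.merge import align_datasets

# A local filesystem store resolved via storefact, purely for illustration.
store = get_store_from_url("hfs:///tmp/kartothek_data")

# Pair up partitions whose labels are identical. Each yielded list starts
# with a MetaPartition from one dataset, followed by every partition of the
# other dataset that matched it.
for aligned in align_datasets(
    left_dataset_uuid="dataset_a",  # hypothetical dataset UUIDs
    right_dataset_uuid="dataset_b",
    store=store,
    match_how="exact",
):
    print([mp.label for mp in aligned])

# A callable matcher receives the two partition labels and returns a bool,
# e.g. aligning partitions that share the same first label component:
for aligned in align_datasets(
    left_dataset_uuid="dataset_a",
    right_dataset_uuid="dataset_b",
    store=store,
    match_how=lambda l, r: l.split("/")[0] == r.split("/")[0],
):
    print([mp.label for mp in aligned])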