Source code for kartothek.io_components.cube.query

"""
Common code to build query functions/pipelines.
"""
import itertools
from functools import reduce

import numpy as np

from kartothek.api.consistency import check_datasets
from kartothek.api.discover import discover_datasets
from kartothek.core.common_metadata import empty_dataframe_from_schema
from kartothek.core.cube.conditions import Conjunction
from kartothek.core.index import ExplicitSecondaryIndex
from kartothek.io_components.cube.query._group import (
    QueryGroup,
    load_group,
    quick_concat,
)
from kartothek.io_components.cube.query._intention import (
    QueryIntention,
    determine_intention,
)
from kartothek.io_components.cube.query._regroup import regroup
from kartothek.io_components.metapartition import SINGLE_TABLE
from kartothek.utils.ktk_adapters import get_dataset_columns

__all__ = ("QueryGroup", "QueryIntention", "load_group", "plan_query", "quick_concat")


def _get_indexed_columns(datasets):
    """
    Get columns that were indexed by Kartothek.

    Parameters
    ----------
    datasets: Dict[str, kartothek.core.dataset.DatasetMetadata]
        Available datasets.

    Returns
    -------
    indexed_columns: Dict[str, Set[str]]
        Indexed columns per ktk_cube dataset ID.
    """
    result = {}
    for ktk_cube_dataset_id, ds in datasets.items():
        result[ktk_cube_dataset_id] = set(ds.indices.keys())
    return result
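

# Illustrative sketch only (not part of the original module): the expected shape of the
# mapping returned by ``_get_indexed_columns`` for two hypothetical ktk_cube dataset IDs
# ("seed" and "enrich") with made-up index column names.
def _example_indexed_columns():
    # each ktk_cube dataset ID maps to the set of its index column names
    return {"seed": {"country", "day"}, "enrich": {"country"}}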


def _load_required_explicit_indices(datasets, intention, store):
    """
    Load indices that are required for query evaluation.

    .. important::
        Primary/partition indices must already be loaded at this point!

    Parameters
    ----------
    datasets: Dict[str, kartothek.core.dataset.DatasetMetadata]
        Available datasets.
    intention: kartothek.io_components.cube.query._intention.QueryIntention
        Query intention.
    store: simplekv.KeyValueStore
        Store to query from.

    Returns
    -------
    datasets: Dict[str, kartothek.core.dataset.DatasetMetadata]
        Available datasets, w/ indices loaded.
    """
    # figure out which columns are required for query planning / regrouping
    requires_columns = reduce(
        set.union,
        (
            cond.columns
            for cond in itertools.chain(
                intention.conditions_pre.values(), intention.conditions_post.values()
            )
        ),
        set(),
    ) | set(intention.partition_by)

    # load all indices that describe these columns
    datasets_result = {}
    for ktk_cube_dataset_id, ds in datasets.items():
        indices = {
            column: index.load(store)
            if (
                isinstance(index, ExplicitSecondaryIndex)
                and (column in requires_columns)
            )
            else index
            for column, index in ds.indices.items()
        }
        ds = ds.copy(indices=indices)
        datasets_result[ktk_cube_dataset_id] = ds

    return datasets_result
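

# Illustrative sketch (not part of the original module): how ``requires_columns`` above is
# assembled. Conditions are represented here by plain column sets instead of real
# ``Conjunction`` objects; the dataset IDs and column names are made up.
def _example_requires_columns():
    conditions_pre = {"enrich": {"country"}}  # hypothetical pre-condition columns
    conditions_post = {"seed": {"avg_price"}}  # hypothetical post-condition columns
    partition_by = ["day"]
    requires_columns = reduce(
        set.union,
        itertools.chain(conditions_pre.values(), conditions_post.values()),
        set(),
    ) | set(partition_by)
    return requires_columns  # {"country", "avg_price", "day"}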


def _determine_restrictive_dataset_ids(cube, datasets, intention):
    """
    Determine which datasets are restrictive.

    These are datasets which contain non-dimension, non-partition columns to which the user wishes to apply
    restrictions (via conditions or via partition-by).

    Parameters
    ----------
    cube: Cube
        Cube specification.
    datasets: Dict[str, kartothek.core.dataset.DatasetMetadata]
        Available datasets.
    intention: kartothek.io_components.cube.query._intention.QueryIntention
        Query intention.

    Returns
    -------
    restrictive_dataset_ids: Set[str]
        Set of restrictive datasets (by ktk_cube dataset ID).
    """
    result = set()
    for ktk_cube_dataset_id, dataset in datasets.items():
        if ktk_cube_dataset_id == cube.seed_dataset:
            continue

        mask = (
            set(intention.partition_by)
            | intention.conditions_pre.get(ktk_cube_dataset_id, Conjunction([])).columns
            | intention.conditions_post.get(
                ktk_cube_dataset_id, Conjunction([])
            ).columns
        ) - (set(cube.dimension_columns) | set(cube.partition_columns))
        overlap = mask & get_dataset_columns(dataset)
        if overlap:
            result.add(ktk_cube_dataset_id)

    return result
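

# Illustrative sketch (not part of the original module): the core set logic of
# ``_determine_restrictive_dataset_ids`` for a single non-seed dataset, with made-up
# column names standing in for the cube specification and the query intention.
def _example_is_restrictive():
    dimension_columns = {"city"}
    partition_columns = {"day"}
    partition_by = {"country"}
    condition_columns = {"avg_temp"}
    dataset_columns = {"city", "day", "country", "avg_temp"}

    # restrictions on anything that is neither a dimension nor a partition column ...
    mask = (partition_by | condition_columns) - (dimension_columns | partition_columns)
    # ... make the dataset restrictive if it actually provides such a column
    return bool(mask & dataset_columns)  # True: "country" and "avg_temp" restrict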


def _determine_load_columns(cube, datasets, intention):
    """
    Determine which columns to load from given datasets.

    Parameters
    ----------
    cube: Cube
        Cube specification.
    datasets: Dict[str, kartothek.core.dataset.DatasetMetadata]
        Available datasets.
    intention: kartothek.io_components.cube.query._intention.QueryIntention
        Query intention.

    Returns
    -------
    load_columns: Dict[str, Set[str]]
        Columns to load.
    """
    result = {}
    for ktk_cube_dataset_id, ds in datasets.items():
        is_seed = ktk_cube_dataset_id == cube.seed_dataset
        ds_cols = get_dataset_columns(ds)
        dimensionality = ds_cols & set(cube.dimension_columns)
        is_projection = not dimensionality.issubset(set(intention.dimension_columns))

        mask = (
            set(intention.output_columns)
            | set(intention.dimension_columns)
            | intention.conditions_post.get(
                ktk_cube_dataset_id, Conjunction([])
            ).columns
        )
        if not is_seed:
            # optimize the load routine by only restoring partition columns for the seed dataset
            mask -= set(cube.partition_columns)

        candidates = ds_cols & mask
        payload = candidates - set(cube.partition_columns) - set(cube.dimension_columns)
        payload_requested = len(payload) > 0

        if is_seed or payload_requested:
            if is_projection and payload_requested:
                raise ValueError(
                    (
                        'Cannot project dataset "{ktk_cube_dataset_id}" with dimensionality [{dimensionality}] to '
                        "[{dimension_columns}] while keeping the following payload intact: {payload}"
                    ).format(
                        ktk_cube_dataset_id=ktk_cube_dataset_id,
                        dimensionality=", ".join(sorted(dimensionality)),
                        dimension_columns=", ".join(
                            sorted(intention.dimension_columns)
                        ),
                        payload=", ".join(sorted(payload)),
                    )
                )

            result[ktk_cube_dataset_id] = candidates
    return result
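

# Illustrative sketch (not part of the original module): the candidate/payload split used
# in ``_determine_load_columns`` for one hypothetical non-seed dataset; post-conditions
# are omitted for brevity.
def _example_load_column_split():
    dimension_columns = {"city"}
    partition_columns = {"day"}
    output_columns = {"city", "day", "avg_temp"}
    dataset_columns = {"city", "day", "avg_temp"}

    mask = output_columns | dimension_columns
    mask -= partition_columns  # non-seed datasets do not need to restore partition columns
    candidates = dataset_columns & mask  # {"city", "avg_temp"}
    payload = candidates - partition_columns - dimension_columns  # {"avg_temp"}
    return candidates, payload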


def _filter_relevant_datasets(datasets, load_columns):
    """
    Filter datasets so that only those that actually load columns are left.

    Parameters
    ----------
    datasets: Dict[str, kartothek.core.dataset.DatasetMetadata]
        Datasets to filter.
    load_columns: Dict[str, Set[str]]
        Columns to load.

    Returns
    -------
    datasets: Dict[str, kartothek.core.dataset.DatasetMetadata]
        Filtered datasets.
    """
    which = set(load_columns.keys())
    return {
        ktk_cube_dataset_id: ds
        for ktk_cube_dataset_id, ds in datasets.items()
        if ktk_cube_dataset_id in which
    }


def _reduce_empty_dtype_sizes(df):
    """
    Try to find smaller dtypes for empty DF.

    Currently, the following conversions are implemented:

    - signed integers to ``int8``
    - unsigned integers to ``uint8``
    - floats to ``float32``

    Parameters
    ----------
    df: pandas.DataFrame
        Empty DataFrame, will be modified.

    Returns
    -------
    df: pandas.DataFrame
        Empty DataFrame w/ smaller types.
    """

    def _reduce_dtype(dtype):
        if np.issubdtype(dtype, np.signedinteger):
            return np.int8
        elif np.issubdtype(dtype, np.unsignedinteger):
            return np.uint8
        elif np.issubdtype(dtype, np.floating):
            return np.float32
        else:
            return dtype

    return df.astype({col: _reduce_dtype(df[col].dtype) for col in df.columns})
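

# Illustrative sketch (not part of the original module): what ``_reduce_empty_dtype_sizes``
# does to an empty DataFrame. pandas is imported locally because this module does not
# import it at the top level.
def _example_reduce_dtypes():
    import pandas as pd

    df = pd.DataFrame({"a": pd.Series(dtype="int64"), "b": pd.Series(dtype="float64")})
    reduced = _reduce_empty_dtype_sizes(df)
    # reduced.dtypes: a -> int8, b -> float32
    return reduced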


def plan_query(
    conditions,
    cube,
    datasets,
    dimension_columns,
    partition_by,
    payload_columns,
    store,
):
    """
    Plan cube query execution.

    .. important::
        If the intention does not contain a partition-by, this will partition by the cube partition columns to
        speed up the query on parallel backends. In that case, the backend must concat and check the resulting
        DataFrames before passing them to the user.

    Parameters
    ----------
    conditions: Union[None, Condition, Iterable[Condition], Conjunction]
        Conditions that should be applied.
    cube: Cube
        Cube specification.
    datasets: Union[None, Iterable[str], Dict[str, kartothek.core.dataset.DatasetMetadata]]
        Datasets to query, must all be part of the cube.
    dimension_columns: Optional[Iterable[str]]
        Dimension columns of the query, may result in projection.
    partition_by: Optional[Iterable[str]]
        By which columns logical partitions should be formed.
    payload_columns: Optional[Iterable[str]]
        Which columns apart from ``dimension_columns`` and ``partition_by`` should be returned.
    store: Union[simplekv.KeyValueStore, Callable[[], simplekv.KeyValueStore]]
        Store to query from.

    Returns
    -------
    intent: QueryIntention
        Query intention.
    empty_df: pandas.DataFrame
        Empty DataFrame representing the output types.
    groups: Tuple[QueryGroup, ...]
        Tuple of query groups. May be empty.
    """
    if callable(store):
        store = store()

    if not isinstance(datasets, dict):
        datasets = discover_datasets(
            cube=cube, store=store, filter_ktk_cube_dataset_ids=datasets
        )
    else:
        datasets = check_datasets(datasets, cube)

    datasets = {
        ktk_cube_dataset_id: ds.load_partition_indices()
        for ktk_cube_dataset_id, ds in datasets.items()
    }
    indexed_columns = _get_indexed_columns(datasets)

    intention = determine_intention(
        cube=cube,
        datasets=datasets,
        dimension_columns=dimension_columns,
        partition_by=partition_by,
        conditions=conditions,
        payload_columns=payload_columns,
        indexed_columns=indexed_columns,
    )

    datasets = _load_required_explicit_indices(datasets, intention, store)

    restrictive_dataset_ids = _determine_restrictive_dataset_ids(
        cube=cube, datasets=datasets, intention=intention
    )

    load_columns = _determine_load_columns(
        cube=cube, datasets=datasets, intention=intention
    )

    datasets = _filter_relevant_datasets(datasets=datasets, load_columns=load_columns)

    empty_df = {
        ktk_cube_dataset_id: _reduce_empty_dtype_sizes(
            empty_dataframe_from_schema(
                schema=ds.table_meta[SINGLE_TABLE],
                columns=sorted(
                    get_dataset_columns(ds) & set(load_columns[ktk_cube_dataset_id])
                ),
            )
        )
        for ktk_cube_dataset_id, ds in datasets.items()
    }

    empty_df_single = empty_df[cube.seed_dataset].copy()
    for k, df in empty_df.items():
        if k == cube.seed_dataset:
            continue
        if empty_df_single is None:
            empty_df_single = df.copy()
        else:
            empty_df_single = empty_df_single.merge(df)
    empty_df_single = empty_df_single[list(intention.output_columns)]

    groups = regroup(
        intention,
        cube=cube,
        datasets=datasets,
        empty_df=empty_df,
        indexed_columns=indexed_columns,
        load_columns=load_columns,
        restrictive_dataset_ids=restrictive_dataset_ids,
    )
    return intention, empty_df_single, groups
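

# A hedged usage sketch (not part of the original module). It assumes ``cube`` is an
# existing kartothek ``Cube`` and ``store_factory`` a zero-argument callable returning a
# simplekv store; both are placeholders supplied by the caller.
def _example_plan_query(cube, store_factory):
    intention, empty_df, groups = plan_query(
        conditions=None,          # no filtering
        cube=cube,
        datasets=None,            # discover all datasets belonging to the cube
        dimension_columns=None,   # keep the cube's dimensionality
        partition_by=None,        # no explicit partition-by (see the docstring note above)
        payload_columns=None,     # defaults handled by determine_intention
        store=store_factory,
    )
    return intention, empty_df, groups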