# kartothek/io_components/cube/query/_intention.py

"""
Methods to figure out the actual user intention during the query process.
"""
import itertools
import typing
from functools import reduce

import attr
import pandas as pd
import pyarrow as pa

from kartothek.core.cube.conditions import Conjunction
from kartothek.serialization._parquet import _normalize_value
from kartothek.utils.converters import converter_str_set, converter_str_tupleset
from kartothek.utils.ktk_adapters import get_dataset_columns, get_dataset_schema

__all__ = ("QueryIntention", "determine_intention")


def _process_dimension_columns(dimension_columns, cube):
    """
    Process and check given dimension columns.

    Parameters
    ----------
    dimension_columns: Optional[Iterable[str]]
        Dimension columns of the query, may result in projection.
    cube: Cube
        Cube specification.

    Returns
    -------
    dimension_columns: Tuple[str, ...]
        Real dimension columns.
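
    Example
    -------
    A rough sketch, assuming a hypothetical cube with
    ``dimension_columns=("x", "y")``::

        _process_dimension_columns(None, cube)   # -> cube.dimension_columns, i.e. ("x", "y")
        _process_dimension_columns(["x"], cube)  # projection onto "x" only
        _process_dimension_columns(["z"], cube)  # ValueError, "z" is not a cube dimension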
    """
    if dimension_columns is None:
        return cube.dimension_columns
    else:
        dimension_columns = converter_str_tupleset(dimension_columns)
        missing = set(dimension_columns) - set(cube.dimension_columns)
        if missing:
            raise ValueError(
                "Following dimension columns were requested but are missing from the cube: {missing}".format(
                    missing=", ".join(sorted(missing))
                )
            )
        if len(dimension_columns) == 0:
            raise ValueError("Dimension columns cannot be empty.")
        return dimension_columns


def _process_partition_by(partition_by, cube, all_available_columns, indexed_columns):
    """
    Process and check given partition-by columns.

    Parameters
    ----------
    partition_by: Optional[Iterable[str]]
        By which column logical partitions should be formed.
    cube: Cube
        Cube specification.
    all_available_columns: Set[str]
        All columns that are available for query.
    indexed_columns: Dict[str, Set[str]]
        Indexed columns per ktk_cube dataset ID.

    Returns
    -------
    partition_by: Tuple[str, ...]
        Real partition-by columns, may be empty.
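
    Example
    -------
    A rough sketch with hypothetical arguments; partition-by columns must be
    available in the cube and covered by a secondary index::

        _process_partition_by(None, cube, available, indexed)        # -> ()
        _process_partition_by(["country"], cube, available, indexed)
        # ValueError if "country" is not available or not indexed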
    """
    if partition_by is None:
        return ()
    else:
        partition_by = converter_str_tupleset(partition_by)
        partition_by_set = set(partition_by)

        missing_available = partition_by_set - all_available_columns
        if missing_available:
            raise ValueError(
                "Following partition-by columns were requested but are missing from the cube: {missing}".format(
                    missing=", ".join(sorted(missing_available))
                )
            )

        missing_indexed = partition_by_set - reduce(
            set.union, indexed_columns.values(), set()
        )
        if missing_indexed:
            raise ValueError(
                "Following partition-by columns are not indexed and cannot be used: {missing}".format(
                    missing=", ".join(sorted(missing_indexed))
                )
            )

        return partition_by


def _test_condition_types(conditions, datasets):
    """
    Process and check given query conditions.

    Parameters
    ----------
    conditions: Conjunction
        Conditions that should be applied.
    datasets: Dict[str, kartothek.core.dataset.DatasetMetadata]
        Datasets that are present.

    Raises
    ------
    TypeError
        If a condition value does not match the pyarrow type of the respective column.
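
    Example
    -------
    A rough sketch, assuming a hypothetical column ``country`` that is stored
    as a string column in one of the datasets::

        from kartothek.core.cube.conditions import C

        _test_condition_types(Conjunction(C("country") == "DE"), datasets)  # passes
        _test_condition_types(Conjunction(C("country") == 1), datasets)     # TypeError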
    """
    for single_condition in conditions.conditions:
        test_predicate = single_condition.predicate_part
        for literal in test_predicate:
            col, op, val = literal
            if op != "in":
                val = [val]

            for ktk_cube_dataset_id in sorted(datasets.keys()):
                dataset = datasets[ktk_cube_dataset_id]
                meta = get_dataset_schema(dataset)
                if col not in meta.names:
                    continue
                pa_type = meta.field(col).type

                if pa.types.is_null(pa_type):
                    # ignore all-NULL columns
                    # TODO: the query planner / regrouper could use that to emit 0 partitions
                    continue

                for v in val:
                    try:
                        _normalize_value(v, pa_type)

                        # special check for numpy signed vs unsigned integers
                        if hasattr(v, "dtype"):
                            dtype = v.dtype
                            if (
                                pd.api.types.is_unsigned_integer_dtype(dtype)
                                and pa.types.is_signed_integer(pa_type)
                            ) or (
                                pd.api.types.is_signed_integer_dtype(dtype)
                                and pa.types.is_unsigned_integer(pa_type)
                            ):
                                # proper exception message will be constructed below
                                raise TypeError()
                    except Exception:
                        raise TypeError(
                            (
                                "Condition `{single_condition}` has wrong type. Expected `{pa_type} ({pd_type})` but "
                                "got `{value} ({py_type})`"
                            ).format(
                                single_condition=single_condition,
                                pa_type=pa_type,
                                pd_type=pa_type.to_pandas_dtype().__name__,
                                value=v,
                                py_type=type(v).__name__,
                            )
                        )


def _process_conditions(
    conditions, cube, datasets, all_available_columns, indexed_columns
):
    """
    Process and check given query conditions.

    Parameters
    ----------
    conditions: Union[None, Condition, Iterable[Condition], Conjunction]
        Conditions that should be applied.
    cube: Cube
        Cube specification.
    datasets: Dict[str, kartothek.core.dataset.DatasetMetadata]
        Datasets that are present.
    all_available_columns: Set[str]
        All columns that are available for query.
    indexed_columns: Dict[str, Set[str]]
        Indexed columns per ktk_cube dataset ID.

    Returns
    -------
    conditions_pre: Dict[str, kartothek.core.cube.conditions.Conjunction]
        Conditions to be applied based on the index data alone.
    conditions_post: Dict[str, kartothek.core.cube.conditions.Conjunction]
        Conditions to be applied during the load process.

    Raises
    ------
    ValueError
        If a condition column is not available in the cube.
    TypeError
        If a condition value does not match the pyarrow type of the respective column.
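
    Example
    -------
    A rough sketch with hypothetical columns: conditions on indexed columns
    (here ``country``) are routed to the ``conditions_pre`` entry of the datasets
    that index them, conditions on physically stored non-partition columns (here
    ``value``) to ``conditions_post``; a column may appear in both::

        from kartothek.core.cube.conditions import C

        pre, post = _process_conditions(
            (C("country") == "DE") & (C("value") > 0.0),
            cube,
            datasets,
            all_available_columns,
            indexed_columns,
        )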
    """
    conditions = Conjunction(conditions)

    condition_columns = conditions.columns
    missing = condition_columns - all_available_columns
    if missing:
        raise ValueError(
            "Following condition columns are required but are missing from the cube: {missing}".format(
                missing=", ".join(sorted(missing))
            )
        )
    _test_condition_types(conditions, datasets)

    conditions_split = conditions.split_by_column()

    conditions_pre = {}
    for ktk_cube_dataset_id, ds in datasets.items():
        candidate_cols = indexed_columns[ktk_cube_dataset_id]
        if not candidate_cols:
            continue

        filtered = [
            conj for col, conj in conditions_split.items() if col in candidate_cols
        ]
        if not filtered:
            continue

        conditions_pre[ktk_cube_dataset_id] = reduce(Conjunction.from_two, filtered)

    conditions_post = {}
    for ktk_cube_dataset_id, ds in datasets.items():
        candidate_cols = (get_dataset_columns(ds) & condition_columns) - set(
            cube.partition_columns
        )
        if not candidate_cols:
            continue

        filtered = [
            conj for col, conj in conditions_split.items() if col in candidate_cols
        ]
        if not filtered:
            continue

        conditions_post[ktk_cube_dataset_id] = reduce(Conjunction.from_two, filtered)

    return conditions_pre, conditions_post


def _process_payload(payload_columns, all_available_columns, cube):
    """
    Process and check given payload columns.

    Parameters
    ----------
    payload_columns: Optional[Iterable[str]]
        Which columns apart from ``dimension_columns`` and ``partition_by`` should be returned from the query.
    all_available_columns: Set[str]
        All columns that are available for query.
    cube: Cube
        Cube specification.

    Returns
    -------
    payload_columns: Set[str]
        Payload columns to be returned from the query.
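
    Example
    -------
    A rough sketch with hypothetical column names::

        _process_payload(None, {"p", "x", "v1"}, cube)    # all available columns
        _process_payload(["v1"], {"p", "x", "v1"}, cube)  # just "v1"
        _process_payload(["v2"], {"p", "x", "v1"}, cube)  # ValueError, "v2" not available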
    """
    if payload_columns is None:
        return all_available_columns
    else:
        payload_columns = converter_str_set(payload_columns)
        missing = payload_columns - all_available_columns
        if missing:
            raise ValueError(
                "Cannot find the following requested payload columns: {missing}".format(
                    missing=", ".join(sorted(missing))
                )
            )
        return payload_columns


def determine_intention(
    cube,
    datasets,
    dimension_columns,
    partition_by,
    conditions,
    payload_columns,
    indexed_columns,
):
    """
    Determine and check the user's intention during the query process.

    Parameters
    ----------
    cube: Cube
        Cube specification.
    datasets: Dict[str, kartothek.core.dataset.DatasetMetadata]
        Datasets that are present.
    dimension_columns: Optional[Iterable[str]]
        Dimension columns of the query, may result in projection.
    partition_by: Optional[Iterable[str]]
        By which column logical partitions should be formed.
    conditions: Union[None, Condition, Iterable[Condition], Conjunction]
        Conditions that should be applied.
    payload_columns: Optional[Iterable[str]]
        Which columns apart from ``dimension_columns`` and ``partition_by`` should be returned from the query.
    indexed_columns: Dict[str, Set[str]]
        Indexed columns per ktk_cube dataset ID.

    Returns
    -------
    intention: QueryIntention
        Checked and filled in intention of the user.
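
    Example
    -------
    A rough sketch of a full query resolution, assuming ``datasets`` and
    ``indexed_columns`` as produced by the cube discovery/planning code and a
    hypothetical condition column ``country``::

        from kartothek.core.cube.conditions import C

        intention = determine_intention(
            cube=cube,
            datasets=datasets,
            dimension_columns=None,           # keep all cube dimensions
            partition_by=None,                # no logical partitioning
            conditions=C("country") == "DE",
            payload_columns=None,             # return all available columns
            indexed_columns=indexed_columns,
        )
        intention.output_columns  # sorted tuple of all columns passed back to the user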
    """
    all_available_columns = set(
        itertools.chain.from_iterable(
            [get_dataset_columns(ds) for ds in datasets.values()]
        )
    )

    dimension_columns = _process_dimension_columns(
        dimension_columns=dimension_columns, cube=cube
    )
    partition_by = _process_partition_by(
        partition_by=partition_by,
        cube=cube,
        all_available_columns=all_available_columns,
        indexed_columns=indexed_columns,
    )

    conditions_pre, conditions_post = _process_conditions(
        conditions=conditions,
        cube=cube,
        datasets=datasets,
        all_available_columns=all_available_columns,
        indexed_columns=indexed_columns,
    )

    payload_columns = _process_payload(
        payload_columns=payload_columns,
        all_available_columns=all_available_columns,
        cube=cube,
    )
    output_columns = tuple(
        sorted(
            set(partition_by)
            | set(dimension_columns)
            | set(payload_columns)
            | set(cube.partition_columns)
        )
    )

    return QueryIntention(
        dimension_columns=dimension_columns,
        partition_by=partition_by,
        conditions_pre=conditions_pre,
        conditions_post=conditions_post,
        output_columns=output_columns,
    )


@attr.s(frozen=True)
class QueryIntention:
    """
    Checked user intention during the query process.

    Parameters
    ----------
    dimension_columns: Tuple[str, ...]
        Real dimension columns.
    partition_by: Tuple[str, ...]
        Real partition-by columns, may be empty.
    conditions_pre: Dict[str, kartothek.core.cube.conditions.Conjunction]
        Conditions to be applied based on the index data alone.
    conditions_post: Dict[str, kartothek.core.cube.conditions.Conjunction]
        Conditions to be applied during the load process.
    output_columns: Tuple[str, ...]
        Output columns to be passed back to the user, in correct order.
    """

    dimension_columns = attr.ib(type=typing.Tuple[str, ...])
    partition_by = attr.ib(type=typing.Tuple[str, ...])
    conditions_pre = attr.ib(type=typing.Dict[str, Conjunction])
    conditions_post = attr.ib(type=typing.Dict[str, Conjunction])
    output_columns = attr.ib(type=typing.Tuple[str, ...])
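

# A rough sketch of a fully resolved intention, assuming a hypothetical cube with
# dimension columns ("x", "y"), partition column "p" and payload column "v1":
#
#     QueryIntention(
#         dimension_columns=("x", "y"),
#         partition_by=(),
#         conditions_pre={},
#         conditions_post={},
#         output_columns=("p", "v1", "x", "y"),
#     )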