Source code for kartothek.utils.predicate_converter

"""
Helper module to convert kartothek dataset load predicates into cube conditions.
"""

from typing import Any, List, Sequence, Tuple

import pandas as pd
import pyarrow as pa

from kartothek.core.cube.conditions import Condition


[docs]def write_predicate_as_cube_condition(predicate: Tuple[str, str, Any]) -> Condition: """ Rewrites a single io.dask.dataset 'read_dataset_as_ddf' predicate condition as cube condition. Remark: This function is restricted by "Condition.from_string" which does not allow for IsInCondition and InIntervalCondition and will throw an error if conditions of those types are passed. Parameters ---------- predicate: list list containing single predicate definition Returns ------- condition: Condition cube condition containing the predicate definition """ condition_string = None parameter_format_dict = {} if len(predicate) != 3: raise ValueError("Please use predicates consisting of exactly 3 entries") if type(predicate[2]) == int: condition_string = f"{predicate[0]} {predicate[1]} {str(predicate[2])}" parameter_format_dict[predicate[0]] = pa.int64() if type(predicate[2]) == str: condition_string = f"{predicate[0]} {predicate[1]} {predicate[2]}" parameter_format_dict[predicate[0]] = pa.string() if type(predicate[2]) == pd._libs.tslibs.timestamps.Timestamp: condition_string = ( f"{predicate[0]} {predicate[1]} {predicate[2].strftime('%Y-%m-%d')}" ) parameter_format_dict[predicate[0]] = pa.timestamp("s") if type(predicate[2]) == bool: condition_string = f"{predicate[0]} {predicate[1]} {str(predicate[2])}" parameter_format_dict[predicate[0]] = pa.bool_() if type(predicate[2]) == float: condition_string = f"{predicate[0]} {predicate[1]} {str(predicate[2])}" parameter_format_dict[predicate[0]] = pa.float64() if condition_string is not None: condition = Condition.from_string(condition_string, parameter_format_dict) else: raise TypeError( "Please only enter predicates for parameter values of the following type:" " str, int, float, bool or pandas timestamp, " ) return condition
[docs]def convert_predicates_to_cube_conditions( predicates: List[List[Tuple[str, str, Any]]], ) -> Sequence[Condition]: """ Converts a io.dask.dataset 'read_dataset_as_ddf' predicate to a cube condition Parameters ---------- predicates: list list containing a list of single predicates Returns ------- condition: Condition cube condition containing the combined predicate definitions """ condition: Any = () if len(predicates) > 1: raise ValueError( "Cube conditions cannot handle 'or' operators, therefore, " "please pass a predicate list with one element." ) for predicate in predicates[0]: condition = condition + (write_predicate_as_cube_condition(predicate),) return condition