"""
The condition sublanguage.
"""
import copy
import itertools
import re
import typing
from collections import defaultdict
import attr
import pandas as pd
from kartothek.serialization import filter_df_from_predicates
from kartothek.utils.converters import (
converter_str,
converter_tuple,
get_str_to_python_converter,
)
# Names exported by ``from <module> import *``; entries are kept sorted (case-sensitive).
__all__ = (
    "C",
    "Condition",
    "Conjunction",
    "EqualityCondition",
    "GreaterEqualCondition",
    "GreaterThanCondition",
    "InIntervalCondition",
    "InequalityCondition",
    "IsInCondition",
    "LessEqualCondition",
    "LessThanCondition",
    "SimpleCondition",
    "VirtualColumn",
)
def _validator_value(instance, attribute, value):
    """
    Validator (attrs calling convention) for a single comparison value.

    Parameters
    ----------
    instance
        Object being validated; its ``column`` attribute is quoted in the error message.
    attribute
        Attribute being validated (unused).
    value
        Candidate value the column should be compared to.

    Raises
    ------
    ValueError: If ``value`` is NULL according to :func:`pandas.isnull`.
    TypeError: If ``value`` is a :class:`VirtualColumn`, a :class:`Condition` or a :class:`Conjunction`.
    """
    if pd.isnull(value):
        message = 'Cannot use NULL-value to compare w/ column "{}"'.format(
            instance.column
        )
        raise ValueError(message)

    # Checked in order: the first matching entry determines the error message.
    rejected = (
        (VirtualColumn, "Cannot compare two columns."),
        ((Condition, Conjunction), "Cannot use nested conditions."),
    )
    for forbidden_types, message in rejected:
        if isinstance(value, forbidden_types):
            raise TypeError(message)
def _validator_valuelist(instance, attribute, value):
    """
    Validator (attrs calling convention) for an iterable of comparison values.

    Every element of ``value`` is checked with :func:`_validator_value`; the first
    offending element raises.
    """
    for element in value:
        _validator_value(instance, attribute, element)
def _validator_condlist(instance, attribute, value):
    """
    Validator (attrs calling convention) ensuring all elements are conditions.

    Raises
    ------
    TypeError: If any element of ``value`` is not a :class:`Condition`.
    """
    only_conditions = all(isinstance(element, Condition) for element in value)
    if not only_conditions:
        raise TypeError("Can only build conjunction out of conditions.")
def _converter_condlist(obj) -> tuple:
    """
    Normalize the ``conditions`` argument of a conjunction.

    ``None`` becomes an empty tuple, a :class:`Conjunction` contributes its own
    ``conditions``, and anything else is passed through ``converter_tuple``.
    """
    if obj is None:
        return ()
    if isinstance(obj, Conjunction):
        return obj.conditions
    return converter_tuple(obj)
@attr.s(frozen=True, eq=False)
class VirtualColumn:
    """
    Virtual column that can be used to easily construct conditions.

    The following operations are supported:

    +---------------+--------------------------------------+-----------------------------------+
    | Operation     | Python Example                       | Result Class                      |
    +===============+======================================+===================================+
    | Equal         | ``C("a") == 42``                     | :py:class:`EqualityCondition`     |
    +---------------+--------------------------------------+-----------------------------------+
    | Not Equal     | ``C("a") != 42``                     | :py:class:`InequalityCondition`   |
    +---------------+--------------------------------------+-----------------------------------+
    | Less Than     | ``C("a") < 42``                      | :py:class:`LessThanCondition`     |
    +---------------+--------------------------------------+-----------------------------------+
    | Less Equal    | ``C("a") <= 42``                     | :py:class:`LessEqualCondition`    |
    +---------------+--------------------------------------+-----------------------------------+
    | Greater Than  | ``C("a") > 42``                      | :py:class:`GreaterThanCondition`  |
    +---------------+--------------------------------------+-----------------------------------+
    | Greater Equal | ``C("a") >= 42``                     | :py:class:`GreaterEqualCondition` |
    +---------------+--------------------------------------+-----------------------------------+
    | Is In         | ``C("a").isin([1, 2])``              | :py:class:`IsInCondition`         |
    +---------------+--------------------------------------+-----------------------------------+
    | In Interval   | ``C("a").in_interval(0, 100)``       | :py:class:`InIntervalCondition`   |
    +---------------+--------------------------------------+-----------------------------------+

    Parameters
    ----------
    name: str
        Column name.
    """

    name = attr.ib(converter=converter_str, type=str)

    # NOTE: the comparison operators do not compare columns with each other; they
    # build condition objects. That is why attrs' generated equality is disabled
    # (``eq=False``) on this class.
    def __eq__(self, other):
        return EqualityCondition(self.name, other)

    def __ne__(self, other):
        return InequalityCondition(self.name, other)

    def __lt__(self, other):
        return LessThanCondition(self.name, other)

    def __le__(self, other):
        return LessEqualCondition(self.name, other)

    def __gt__(self, other):
        return GreaterThanCondition(self.name, other)

    def __ge__(self, other):
        return GreaterEqualCondition(self.name, other)

    def isin(self, other):
        """
        Build a condition requiring the column value to be one of the given values.

        Parameters
        ----------
        other: Iterable[Any]
            Allowed values.

        Returns
        -------
        condition: IsInCondition
            Condition on this column.
        """
        return IsInCondition(self.name, other)

    def in_interval(self, start=None, stop=None):
        """
        Build a condition requiring the column value to lie in ``[start, stop)``.

        Parameters
        ----------
        start: Any
            Inclusive start of the interval, optional.
        stop: Any
            Exclusive stop of the interval, optional.

        Returns
        -------
        condition: InIntervalCondition
            Condition on this column.
        """
        return InIntervalCondition(self.name, start, stop)
# Short alias so that conditions can be written compactly, e.g. ``C("a") == 42``.
C = VirtualColumn
@attr.s(frozen=True)
class Condition:
    """
    An abstract condition on a column.

    Multiple conditions may be combined using ``&``::

        (C('a') == 1) & (C('b') == 2)

    Parameters
    ----------
    column: str
        Column name.
    """

    column = attr.ib(converter=converter_str, type=str)

    def __bool__(self):
        # A chained comparison such as ``A <= column < B`` makes Python call ``bool()``
        # on the first condition; raising here prevents that from passing silently.
        raise TypeError(
            "Cannot check if a condition is non-zero.\n"
            "Hint: Did you just tried something like `bool(condition)` or `A <= column < B`?"
        )

    __nonzero__ = __bool__  # Python 2

    def __and__(self, other):
        return Conjunction.from_two(self, other)

    def filter_df(self, df):
        """
        Filter given DataFrame w/ condition.

        Parameters
        ----------
        df: pandas.DataFrame
            DataFrame to evaluate on, must contain required column.

        Returns
        -------
        result: pandas.DataFrame
            Part of the DataFrame for which the condition holds.
        """
        # Delegate to the conjunction implementation using a one-element conjunction.
        return Conjunction([self]).filter_df(df)

    @staticmethod
    def from_string(s, all_types):
        """
        Parse string as condition object.

        The accepted form is ``<column> <operator> <value>``, optionally wrapped in
        round brackets. Supported operators are ``==``, ``=`` (same as ``==``), ``<=``,
        ``<``, ``>=``, ``>`` and ``!=``.

        Parameters
        ----------
        s: str
            String to parse.
        all_types: Dict[str, pyarrow.DataType]
            Mapping from all known columns to pyarrow types.

        Returns
        -------
        condition: Condition
            Parsed condition.

        Raises
        ------
        ValueError: If condition cannot be parsed.
        """
        m = re.match(
            pattern=r"""
                ^                       # anchor
                \s*                     # optional space
                \(?                     # optional open bracket
                \s*                     # optional space
                ([^!<>=\s]+)            # column name
                \s*                     # optional space
                (==|=|<=|<|>=|>|!=)     # operator
                ([^)=]+)                # value
                \)?                     # optional closing bracket
                \s*                     # optional space
                $                       # anchor
            """,
            string=s,
            flags=re.VERBOSE,
        )
        if not m:
            raise ValueError('Cannot parse condition "{s}"'.format(s=s))

        col, op, var = m.groups()
        col_obj = C(col)

        pa_type = all_types.get(col)
        if pa_type is None:
            raise ValueError(
                'Unknown column "{col}" in condition "{s}"'.format(col=col, s=s)
            )

        # Convert the textual value to a Python object matching the column type.
        var_f = get_str_to_python_converter(pa_type)
        var_obj = var_f(var.strip())

        if op in ("==", "="):
            return col_obj == var_obj
        elif op == "<=":
            return col_obj <= var_obj
        elif op == "<":
            return col_obj < var_obj
        elif op == ">=":
            return col_obj >= var_obj
        elif op == ">":
            return col_obj > var_obj
        elif op == "!=":
            return col_obj != var_obj
        else:
            # The regular expression above only admits the operators handled here.
            raise RuntimeError("unreachable")
@attr.s(frozen=True)
class SimpleCondition(Condition):
    """
    A simple condition that only emits a single predicate part. Must be subclassed.

    Subclasses have to provide the class attribute ``OP`` holding the operator string
    that is used for the string representation and the predicate.

    Parameters
    ----------
    column: str
        Column name.
    value: Any
        To which value the column should be compared to.
    """

    value = attr.ib(validator=[_validator_value])

    # Simple conditions always restrict their column.
    active = True

    def __str__(self):
        return "{column} {op} {value}".format(
            column=self.column, op=self.OP, value=self.value
        )

    @property
    def predicate_part(self):
        """
        Part of the inner list for Kartothek predicate pushdown.
        """
        return [(self.column, self.OP, self.value)]
@attr.s(frozen=True)
class EqualityCondition(SimpleCondition):
    """
    Condition on column equality.

    Parameters
    ----------
    column: str
        Column name.
    value: Any
        To which value the column should be compared to.
    """

    OP = "=="
@attr.s(frozen=True)
class InequalityCondition(SimpleCondition):
    """
    Condition on column inequality.

    Parameters
    ----------
    column: str
        Column name.
    value: Any
        To which value the column should be compared to.
    """

    OP = "!="
@attr.s(frozen=True)
class LessThanCondition(SimpleCondition):
    """
    Condition that describes that a column should be strictly less than the given value.

    Parameters
    ----------
    column: str
        Column name.
    value: Any
        To which value the column should be compared to.
    """

    OP = "<"
@attr.s(frozen=True)
class LessEqualCondition(SimpleCondition):
    """
    Condition that describes that a column should be less or equal to the given value.

    Parameters
    ----------
    column: str
        Column name.
    value: Any
        To which value the column should be compared to.
    """

    OP = "<="
@attr.s(frozen=True)
class GreaterThanCondition(SimpleCondition):
    """
    Condition that describes that a column should be strictly greater than the given value.

    Parameters
    ----------
    column: str
        Column name.
    value: Any
        To which value the column should be compared to.
    """

    OP = ">"
@attr.s(frozen=True)
class GreaterEqualCondition(SimpleCondition):
    """
    Condition that describes that a column should be greater or equal to the given value.

    Parameters
    ----------
    column: str
        Column name.
    value: Any
        To which value the column should be compared to.
    """

    OP = ">="
@attr.s(frozen=True)
class IsInCondition(SimpleCondition):
    """
    Condition that describes that values in a column should be within the given list.

    Parameters
    ----------
    column: str
        Column name.
    value: Tuple[Any, ...]
        Tuple to check for.
    """

    OP = "in"

    # Overrides ``SimpleCondition.value``: the input is converted to a tuple and every
    # element (instead of the container itself) is validated.
    value = attr.ib(
        converter=converter_tuple,
        type=typing.Tuple[typing.Any, ...],
        validator=[_validator_valuelist],
    )
@attr.s(frozen=True)
class InIntervalCondition(Condition):
    """
    Condition expressing that values of a column should be in a given interval.

    Parameters
    ----------
    column: str
        Column name.
    start: Any
        Inclusive start of the interval, optional.
    stop: Any
        Exclusive stop of the interval, optional.
    """

    # ``None`` means "unbounded" on that side and is therefore exempt from validation.
    start = attr.ib(
        default=None, validator=[attr.validators.optional(_validator_value)]
    )
    stop = attr.ib(default=None, validator=[attr.validators.optional(_validator_value)])

    def __str__(self):
        return "{column}.in_interval({start}, {stop})".format(
            column=self.column, start=self.start, stop=self.stop
        )

    @property
    def predicate_part(self):
        """
        Part of the inner list for Kartothek predicate pushdown.

        Contains one entry per given bound, i.e. it is empty if neither ``start`` nor
        ``stop`` is set.
        """
        result = []
        if self.start is not None:
            result.append((self.column, ">=", self.start))
        if self.stop is not None:
            result.append((self.column, "<", self.stop))
        return result

    @property
    def active(self):
        """
        Whether at least one of the bounds is set.
        """
        return (self.start is not None) or (self.stop is not None)
@attr.s(frozen=True)
class Conjunction:
    """
    Conjunction of multiple :class:`Condition` objects.

    Parameters
    ----------
    conditions: Tuple[Condition, ...]
        Tuple of conditions that must all be satisfied at the same time. Can address multiple columns.
    """

    conditions = attr.ib(
        converter=_converter_condlist,
        type=typing.Tuple[Condition, ...],
        validator=[_validator_condlist],
    )

    @classmethod
    def from_two(
        cls,
        left: typing.Union[Condition, "Conjunction"],
        right: typing.Union[Condition, "Conjunction"],
    ) -> "Conjunction":
        """
        Create conjunction from two elements.

        Parameters
        ----------
        left
            Left part.
        right
            Right part.

        Returns
        -------
        conjunction: Conjunction
            Conjunction of the two given parts.
        """
        conditions: typing.List[Condition] = []
        for obj in (left, right):
            if isinstance(obj, Conjunction):
                # Flatten nested conjunctions so the result is a flat list of conditions.
                conditions.extend(obj.conditions)
            else:
                conditions.append(obj)
        return cls(conditions)

    def __and__(self, other):
        return Conjunction.from_two(self, other)

    def __str__(self):
        return " & ".join("({})".format(cond) for cond in self.conditions)

    @property
    def columns(self):
        """
        Columns that are checked by this conjunction.
        """
        return {cond.column for cond in self.conditions if cond.active}

    @property
    def predicate(self):
        """
        Predicate to be consumed by Kartothek and DataFrame serializer.

        ``None`` if the conditions do not contribute any predicate part.
        """
        result = list(
            itertools.chain.from_iterable(
                cond.predicate_part for cond in self.conditions
            )
        )
        if result:
            return result
        else:
            return None

    def split_by_column(self):
        """
        Split conjunction by column.

        Non-active conditions will be dropped.

        Returns
        -------
        split: Dict[str, Conjunction]
            Conjunctions by affected column.
        """
        parts = defaultdict(list)
        for cond in self.conditions:
            if cond.active:
                parts[cond.column].append(cond)
        return {column: Conjunction(part) for column, part in parts.items()}

    def filter_df(self, df):
        """
        Filter given DataFrame w/ conjunction.

        NULL-values will always be treated as non-matching.

        Parameters
        ----------
        df: pandas.DataFrame
            DataFrame to evaluate on, must contain required column.

        Returns
        -------
        result: pandas.DataFrame
            Part of the DataFrame for which the conjunction holds.
        """
        # Drop rows that are NULL in any column checked by an active condition.
        df = df.loc[df[list(self.columns)].notnull().all(axis=1)]

        predicate = self.predicate
        if predicate is None:
            # kartothek does not support empty predicate lists
            return df
        else:
            return filter_df_from_predicates(df, [predicate])

    def to_jsonarray(self):
        """
        Converts conjunction to a list that can be used for JSON/YAML serialization.

        .. important::

            Not all value types that can be used within conditions are JSON-serializable (e.g. ``datetime`` objects).
            The user is responsible of ensuring that these values can pass functions like ``json.dump`` or has to
            implement proper error handling.

        Returns
        -------
        jsonarray: List[Dict[str, Any]]
            JSON-compatible array.

        Example
        -------
        >>> import json
        >>> from kartothek.core.cube.conditions import C
        >>> conjunction = (
        ...     (C("x") > 1)
        ...     & (C("y").isin(["foo", "bar"]))
        ... )
        >>> array = conjunction.to_jsonarray()
        >>> print(json.dumps(array, indent=True, sort_keys=True))
        [
         {
          "column": "x",
          "type": "GreaterThanCondition",
          "value": 1
         },
         {
          "column": "y",
          "type": "IsInCondition",
          "value": [
           "foo",
           "bar"
          ]
         }
        ]

        See Also
        --------
        from_jsonarray: Converts array back into a conjunction.
        """
        jsonarray = []
        for cond in self.conditions:
            d = attr.asdict(cond)
            # The class name is stored so that ``from_jsonarray`` can pick the right class.
            d["type"] = type(cond).__name__
            jsonarray.append(d)
        return jsonarray

    @staticmethod
    def from_jsonarray(array):
        """
        Recover conjunction from JSON-compatible array.

        Parameters
        ----------
        array: List[Dict[str, Any]]
            JSON-compatible array.

        Returns
        -------
        conjunction: Conjunction
            Recovered conjunction.

        Raises
        ------
        TypeError: If a wrong or unknown condition type was passed.
        ValueError: If ``"type"`` attribute within a condition is missing.

        See Also
        --------
        to_jsonarray: Creates array, illustrates format.
        """
        if not isinstance(array, list):
            raise TypeError("jsonarray must be a list")

        # find all possible classes
        all_classes = {}
        seen = set()
        todo = [Condition]
        # ``todo`` is deliberately extended while being iterated: this walks the whole
        # subclass tree of ``Condition`` breadth-first.
        for c in todo:
            if c in seen:
                continue
            seen.add(c)

            sub = c.__subclasses__()
            if sub:
                # not a leaf
                todo.extend(sub)
            else:
                # leaf == found a class
                all_classes[c.__name__] = c

        # deserialize all conditions
        conditions = []
        for element in array:
            if not isinstance(element, dict):
                raise TypeError("Condition in jsonarray must be a dict")

            # Work on a copy so that the caller's data is not modified by ``pop``.
            element = copy.deepcopy(element)
            if "type" not in element:
                raise ValueError("Missing type value for condition")

            t = element.pop("type")
            if t not in all_classes:
                raise TypeError(f"Unknown condition class '{t}'")

            # Remaining keys are passed as constructor arguments of the condition class.
            conditions.append(all_classes[t](**element))

        return Conjunction(conditions)

    @staticmethod
    def from_string(s, all_types):
        """
        Parse string as conjunction object.

        The string is split at ``&`` and every part is parsed using
        :meth:`Condition.from_string`. An empty or whitespace-only string results in an
        empty conjunction.

        .. important::

            This is intended to be used for human interaction (e.g. CLIs). Do not use this for serializing and
            deserializing conditions, since this does not support all conditions and is not guaranteed to be
            roundtrip-safe. For the purpose of serialization, better use :meth:`to_jsonarray` and
            :meth:`from_jsonarray`.

        Parameters
        ----------
        s: str
            String to parse.
        all_types: Dict[str, pyarrow.DataType]
            Mapping from all known columns to pyarrow types.

        Returns
        -------
        conjunction: Conjunction
            Parsed conjunction.

        Raises
        ------
        ValueError: If condition cannot be parsed.
        """
        s = s.strip()
        if s:
            return Conjunction(
                [Condition.from_string(sub, all_types) for sub in s.split("&")]
            )
        else:
            return Conjunction([])