#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This module contains functionality for persisting/serialising DataFrames.
Available constants
**PredicatesType** - A type describing the format of predicates which is a list of ConjunctionType
**ConjunctionType** - A type describing a single Conjunction which is a list of literals
**LiteralType** - A type for a single literal
**LiteralValue** - A type indicating the value of a predicate literal
:meta public:
"""
from typing import Dict, Iterable, List, Optional, Set, Tuple, TypeVar
import numpy as np
import pandas as pd
from pandas.api.types import is_list_like
from simplekv import KeyValueStore
from kartothek.serialization._util import _check_contains_null
from ._util import ensure_unicode_string_type
LiteralValue = TypeVar("LiteralValue")
LiteralType = Tuple[str, str, LiteralValue]
ConjunctionType = List[LiteralType]
# Optional is part of the actual type since predicate=None
# is a sentinel for: All values
PredicatesType = Optional[List[ConjunctionType]]
class DataFrameSerializer:
    """
    Abstract class that supports serializing DataFrames to/from
    simplekv stores.

    :meta public:
    """

    # Registry mapping file suffix (e.g. ".parquet") to a serializer instance.
    # Filled via ``register_serializer`` and shared by all subclasses.
    _serializers: Dict[str, "DataFrameSerializer"] = {}
    # Serializers that restore exactly the dtypes they stored set this to True.
    type_stable = False

    def __ne__(self, other):
        # Subclasses provide __eq__; derive inequality from it.
        return not (self == other)

    @classmethod
    def register_serializer(cls, suffix, serializer):
        """Register ``serializer`` as the handler for keys ending in ``suffix``."""
        cls._serializers[suffix] = serializer

    @classmethod
    def restore_dataframe(
        cls,
        store: KeyValueStore,
        key: str,
        filter_query: Optional[str] = None,
        columns: Optional[Iterable[str]] = None,
        predicate_pushdown_to_io: bool = True,
        categories: Optional[Iterable[str]] = None,
        predicates: PredicatesType = None,
        date_as_object: bool = False,
    ) -> pd.DataFrame:
        """
        Load a DataFrame from the specified store. The key is also used to
        detect the used format.

        Parameters
        ----------
        store
            store engine
        key
            Key that specifies a path where object should be
            retrieved from the store resource.
        filter_query
            Optional query to filter the DataFrame. Must adhere to the specification
            of pandas.DataFrame.query.
        columns
            Only read in listed columns. When set to None, the full file
            will be read in.
        predicate_pushdown_to_io
            Push predicates through to the I/O layer, default True. Disable
            this if you see problems with predicate pushdown for the given
            file even if the file format supports it. Note that this option
            only hides problems in the store layer that need to be addressed
            there.
        categories
            Columns that should be loaded as categoricals.
        predicates
            Optional list of predicates, like [[('x', '>', 0), ...], that are used
            to filter the resulting DataFrame, possibly using predicate pushdown,
            if supported by the file format.
            This parameter is not compatible with filter_query.

            Predicates are expressed in disjunctive normal form (DNF). This means
            that the innermost tuple describes a single column predicate. These
            inner predicates are all combined with a conjunction (AND) into a
            larger predicate. The most outer list then combines all predicates
            with a disjunction (OR). By this, we should be able to express all
            kinds of predicates that are possible using boolean logic.
        date_as_object
            Retrieve all date columns as an object column holding datetime.date objects
            instead of pd.Timestamp. Note that this option only works for type-stable
            serializers, e.g. ``ParquetSerializer``.

        Raises
        ------
        ValueError
            If both ``filter_query`` and ``predicates`` are given, or if no
            registered serializer matches the key's suffix.
        """
        if filter_query and predicates:
            raise ValueError("Can only specify one of filter_query and predicates")

        for suffix, serializer in cls._serializers.items():
            if key.endswith(suffix):
                df = serializer.restore_dataframe(
                    store,
                    key,
                    filter_query,
                    columns,
                    predicate_pushdown_to_io=predicate_pushdown_to_io,
                    categories=categories,
                    predicates=predicates,
                    date_as_object=date_as_object,
                )
                # Normalize column labels to unicode strings for downstream code.
                df.columns = df.columns.map(ensure_unicode_string_type)
                return df

        # No serialiser matched the key's suffix.
        raise ValueError(
            "The specified file format for '{}' is not supported".format(key)
        )

    def store(self, store: KeyValueStore, key_prefix: str, df: pd.DataFrame) -> str:
        """
        Persist a DataFrame to the specified store.

        The used store format (e.g. Parquet) will be appended to the key.

        Parameters
        ----------
        store: simplekv.KeyValueStore
            store engine
        key_prefix: str
            Key prefix that specifies a path where object should be
            stored on the store resource. The used file format will be
            appended to the key.
        df: pandas.DataFrame or pyarrow.Table
            DataFrame that shall be persisted

        Returns
        -------
        str
            The actual key where the DataFrame is stored.
        """
        raise NotImplementedError("Abstract method called.")
def filter_df(df, filter_query=None):
    """
    General implementation of query filtering.

    Serialisation formats such as Parquet that support predicate push-down
    may pre-filter in their own implementations.
    """
    # Skip the (comparatively expensive) query evaluation when there is
    # nothing to filter or no rows to filter from.
    if filter_query is not None and len(df) > 0:
        return df.query(filter_query)
    return df
def check_predicates(predicates: PredicatesType) -> None:
    """
    Check if predicates are well-formed.

    A well-formed predicate list is a non-empty list of conjunctions, each of
    which is a non-empty list of ``(column, op, value)`` 3-tuples.

    Raises
    ------
    ValueError
        If the structure is malformed, or a null value is combined with an
        ordering operator.
    NotImplementedError
        If a value is (or contains) a null-terminated binary string.
    """
    if predicates is None:
        return

    if len(predicates) == 0:
        raise ValueError("Empty predicates")

    for conjunction_idx, conjunction in enumerate(predicates):
        if not isinstance(conjunction, list):
            raise ValueError(
                f"Invalid predicates: Conjunction {conjunction_idx} should be a "
                f"list of 3-tuples, got object of type {type(conjunction)} instead."
            )
        if len(conjunction) == 0:
            raise ValueError(
                f"Invalid predicates: Conjunction {conjunction_idx} is empty"
            )
        for clause_idx, clause in enumerate(conjunction):
            # BUG FIX: the original condition ``not isinstance(clause, tuple)
            # and len(clause) == 3`` only rejected non-tuples that happened to
            # have length 3; tuples of the wrong arity (and other malformed
            # clauses) slipped through validation.
            if not isinstance(clause, tuple) or len(clause) != 3:
                raise ValueError(
                    f"Invalid predicates: Clause {clause_idx} in conjunction {conjunction_idx} "
                    f"should be a 3-tuple, got object of type {type(clause)} instead"
                )
            _, op, val = clause
            # Null-terminated binary strings are rejected outright since they
            # are not supported as predicate values downstream.
            if (
                isinstance(val, list)
                and any(_check_contains_null(v) for v in val)
                or _check_contains_null(val)
            ):
                raise NotImplementedError(
                    "Null-terminated binary strings are not supported as predicate values."
                )
            # Comparisons against null are only well-defined for (in)equality.
            if (
                pd.api.types.is_scalar(val)
                and pd.isnull(val)
                and op not in ["==", "!="]
            ):
                raise ValueError(
                    f"Invalid predicates: Clause {clause_idx} in conjunction {conjunction_idx} "
                    f"with null value and operator {op}. Only operators supporting null values "
                    "are '==', '!=' and 'in'."
                )
def filter_predicates_by_column(
    predicates: PredicatesType, columns: List[str]
) -> Optional[PredicatesType]:
    """
    Take a predicate list and drop every literal which does not reference one
    of the given columns.

    .. ipython:: python

        from kartothek.serialization import filter_predicates_by_column

        predicates = [[("A", "==", 1), ("B", "<", 5)], [("C", "==", 4)]]
        filter_predicates_by_column(predicates, ["A"])

    Parameters
    ----------
    predicates:
        A list of predicates to be filtered
    columns:
        A list of all columns allowed in the output
    """
    if predicates is None:
        return None
    check_predicates(predicates)

    filtered = []
    for conjunction in predicates:
        kept = [(col, op, val) for col, op, val in conjunction if col in columns]
        # Drop conjunctions that end up empty after column filtering.
        if kept:
            filtered.append(kept)
    return filtered if filtered else None
def columns_in_predicates(predicates: PredicatesType) -> Set[str]:
    """
    Determine all columns which are mentioned in the list of predicates.

    Parameters
    ----------
    predicates:
        The predicates to be scanned.
    """
    if predicates is None:
        return set()
    check_predicates(predicates)
    # Collect the column (first element) of every literal across all conjunctions.
    return {col for conjunction in predicates for col, _, _ in conjunction}
def filter_df_from_predicates(
    df: pd.DataFrame,
    predicates: Optional[PredicatesType],
    strict_date_types: bool = False,
) -> pd.DataFrame:
    """
    Filter a `pandas.DataFrame` based on predicates in disjunctive normal form.

    Parameters
    ----------
    df
        The pandas DataFrame to be filtered
    predicates
        Predicates in disjunctive normal form (DNF). For a thorough documentation, see
        :class:`DataFrameSerializer.restore_dataframe`
        If None, the df is returned unmodified
    strict_date_types
        If False (default), cast all datelike values to datetime64 for comparison.

    See Also
    --------
    * :ref:`predicate_pushdown`
    """
    if predicates is None:
        return df

    # OR over conjunctions: start with nothing selected.
    selected = np.zeros(len(df), dtype=bool)
    for conjunction in predicates:
        # AND within a conjunction: start with everything selected and let
        # each literal narrow the mask in place.
        conj_mask = np.ones(len(df), dtype=bool)
        for column, op, value in conjunction:
            col = ensure_unicode_string_type(column)
            filter_array_like(
                df[col].values,
                op,
                value,
                conj_mask,
                conj_mask,
                strict_date_types=strict_date_types,
                column_name=col,
            )
        selected |= conj_mask
    return df[selected]
def _handle_categorical_data(array_like, require_ordered):
if require_ordered and pd.api.types.is_categorical_dtype(array_like):
if isinstance(array_like, pd.Categorical):
categorical = array_like
else:
categorical = array_like.cat
array_value_type = categorical.categories.dtype
if categorical.categories.is_monotonic:
array_like = categorical.as_ordered()
else:
array_like = categorical.reorder_categories(
categorical.categories.sort_values(), ordered=True
)
else:
array_value_type = array_like.dtype
return array_like, array_value_type
def _handle_null_arrays(array_like, value_dtype):
# NULL types might not be preserved well, so try to cast floats (pandas default type) to the value type
# Determine the type using the `kind` interface since this is common for a numpy array, pandas series and pandas extension arrays
if array_like.dtype.kind == "f" and np.isnan(array_like).all():
if array_like.dtype.kind != value_dtype.kind:
array_like = array_like.astype(value_dtype)
return array_like, array_like.dtype
def _handle_timelike_values(array_value_type, value, value_dtype, strict_date_types):
if is_list_like(value):
value = [pd.Timestamp(val).to_datetime64() for val in value]
else:
value = pd.Timestamp(value).to_datetime64()
value_dtype = pd.Series(value).dtype
return value, value_dtype
def _ensure_type_stability(
    array_like, value, strict_date_types, require_ordered, column_name=None
):
    """
    Make the predicate value and the column array comparable without ambiguity.

    Compatibility is decided on the numpy ``kind`` of both dtypes; the value
    is interpreted as a scalar for this purpose. For scalars which do not have
    a proper python representation, strictness is relaxed as long as there is
    a valid and unambiguous interpretation of a comparison operation. The
    accepted cross-kind combinations are:

    * unsigned integer (u) <> integer (i)
    * zero-terminated bytes (S) <> Python Object (O)
    * Unicode string (U) <> Python Object (O)

    Parameters
    ----------
    strict_date_types: bool
        If False, assume that datetime.date and datetime.datetime are
        compatible types. In this case, the value is cast appropriately
    require_ordered: bool
        Indicate if the operator to be evaluated will require a notion of
        ordering. In the case of pd.Categorical we will then assume a
        lexicographical ordering and cast the pd.CategoricalDtype accordingly
    column_name: str, optional
        Name of the column where `array_like` originates from, used for nicer
        error messages.
    """
    value_dtype = pd.Series([value] if not is_list_like(value) else value).dtype
    array_like, array_value_type = _handle_categorical_data(array_like, require_ordered)
    array_like, array_value_type = _handle_null_arrays(array_like, value_dtype)

    # Pairs of (value kind, array kind) that are comparable despite differing.
    allowed_kind_pairs = {
        ("u", "i"),  # UINT and INT
        ("i", "u"),
        ("O", "S"),  # various string kinds
        ("O", "U"),
        ("b", "O"),  # bool w/ Nones
    }
    if not strict_date_types:
        # objects (datetime.date) and datetime64
        allowed_kind_pairs.add(("O", "M"))

    kind_pair = (value_dtype.kind, array_value_type.kind)
    if kind_pair[0] != kind_pair[1] and kind_pair not in allowed_kind_pairs:
        column_name = "<unknown>" if column_name is None else column_name
        raise TypeError(
            f"Unexpected type for predicate: Column {column_name!r} has pandas "
            f"type '{array_value_type}', but predicate value {value!r} has "
            f"pandas type '{value_dtype}' (Python type '{type(value)}')."
        )
    if "M" in kind_pair:
        value, value_dtype = _handle_timelike_values(
            array_value_type, value, value_dtype, strict_date_types
        )
    return array_like, value
def filter_array_like(
    array_like,
    op: str,
    value,
    mask=None,
    out=None,
    strict_date_types: bool = False,
    column_name: Optional[str] = None,
):
    """
    Filter an array-like object using operations defined in the predicates

    Parameters
    ----------
    array_like
        The array like object to be filtered
        See also `pandas.api.types.is_array_like`
    op
        The comparison operator; one of ``==``, ``!=``, ``<=``, ``>=``,
        ``<``, ``>`` or ``in``.
    value
        The value to compare against; a list-like for ``in``, a scalar
        otherwise.
    mask
        A boolean array like object which will be combined with the result
        of this evaluation using a logical AND. If an array with all True is
        given, it will be the same result as if left empty
    out
        An array into which the result is stored. If provided, it must have a shape
        that the inputs broadcast to. If not provided or None, a freshly-allocated
        array is returned.
    strict_date_types
        If False (default), cast all datelike values to datetime64 for comparison.
    column_name
        Name of the column where `array_like` originates from, used for nicer
        error messages.
    See Also
    --------
    * :ref:`predicate_pushdown`
    """
    if mask is None:
        mask = np.ones(len(array_like), dtype=bool)
    if out is None:
        out = np.zeros(len(array_like), dtype=bool)
    # In the case of an empty list, don't bother with evaluating types, etc.
    # The result is all-False (nothing matches an empty value list).
    if is_list_like(value) and len(value) == 0:
        false_arr = np.zeros(len(array_like), dtype=bool)
        np.logical_and(false_arr, mask, out=out)
        return out
    # Ordering comparisons need a notion of order; this drives the categorical
    # handling inside _ensure_type_stability.
    require_ordered = "<" in op or ">" in op
    array_like, value = _ensure_type_stability(
        array_like, value, strict_date_types, require_ordered, column_name
    )
    with np.errstate(invalid="ignore"):
        if op == "==":
            # A null predicate value matches exactly the nulls in the array.
            if pd.isnull(value):
                np.logical_and(pd.isnull(array_like), mask, out=out)
            else:
                np.logical_and(array_like == value, mask, out=out)
        elif op == "!=":
            if pd.isnull(value):
                np.logical_and(~pd.isnull(array_like), mask, out=out)
            else:
                np.logical_and(array_like != value, mask, out=out)
        elif op == "<=":
            np.logical_and(array_like <= value, mask, out=out)
        elif op == ">=":
            np.logical_and(array_like >= value, mask, out=out)
        elif op == "<":
            np.logical_and(array_like < value, mask, out=out)
        elif op == ">":
            np.logical_and(array_like > value, mask, out=out)
        elif op == "in":
            value = np.asarray(value)
            # Null entries in the value list are handled separately below.
            nullmask = pd.isnull(value)
            if value.dtype.kind in ("U", "S", "O"):
                # See GH358
                # If the values include duplicates, this would blow up with the
                # join below, rendering the mask useless
                unique_vals = np.unique(value[~nullmask])
                value_ser = pd.Series(unique_vals, name="value")
                arr_ser = pd.Series(array_like, name="array").to_frame()
                # Left-join the array against the unique values; rows whose
                # joined "value" is non-null had a match.
                matching_idx = (
                    ~arr_ser.merge(
                        value_ser, left_on="array", right_on="value", how="left"
                    )
                    .value.isna()
                    .values
                )
            else:
                matching_idx = (
                    np.isin(array_like, value)
                    if len(value) > 0
                    else np.zeros(len(array_like), dtype=bool)
                )
            if any(nullmask):
                # A null in the value list matches the nulls in the array.
                matching_idx |= pd.isnull(array_like)
            np.logical_and(
                matching_idx, mask, out=out,
            )
        else:
            raise NotImplementedError("op not supported")
    return out