kartothek.serialization package¶
Submodules¶
Module contents¶
class kartothek.serialization.CsvSerializer(compress=True)[source]¶
Bases: kartothek.serialization._generic.DataFrameSerializer
static restore_dataframe(store: simplekv.KeyValueStore, key: str, filter_query: Optional[str] = None, columns: Optional[Iterable[str]] = None, predicate_pushdown_to_io: Optional[Any] = None, categories: Optional[Iterable[str]] = None, predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]] = None, date_as_object: Optional[Any] = None, **kwargs)[source]¶
Load a DataFrame from the specified store. The key is also used to detect the used format.
- Parameters
store – store engine
key – Key that specifies a path where object should be retrieved from the store resource.
filter_query – Optional query to filter the DataFrame. Must adhere to the specification of pandas.DataFrame.query.
columns – Only read in listed columns. When set to None, the full file will be read in.
predicate_pushdown_to_io – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the store layer that need to be addressed there.
categories – Columns that should be loaded as categoricals.
predicates –
Optional list of predicates, like [[('x', '>', 0), ...]], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.
Predicates are expressed in disjunctive normal form (DNF). This means that each innermost tuple describes a single column predicate. These inner predicates are combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This way, all kinds of predicates that are possible using boolean logic can be expressed.
date_as_object – Retrieve all date columns as an object column holding datetime.date objects instead of pd.Timestamp. Note that this option only works for type-stable serializers, e.g. ParquetSerializer.
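The DNF semantics described above can be sketched in pure Python. This is an illustration only, not kartothek's actual implementation, which filters whole columns and can push predicates down to the file format:

```python
import operator

# Map predicate operator strings to Python comparison functions.
# (Sketch: the real library supports additional operators.)
OPS = {"==": operator.eq, "!=": operator.ne, "<": operator.lt,
       "<=": operator.le, ">": operator.gt, ">=": operator.ge}

def matches(row, predicates):
    # Outer list: OR over conjunctions; inner list: AND over column predicates.
    return any(
        all(OPS[op](row[col], value) for col, op, value in conjunction)
        for conjunction in predicates
    )

rows = [{"x": 1, "y": "a"}, {"x": -1, "y": "b"}]
# "(x > 0 AND y == 'a') OR (y == 'b')"
predicates = [[("x", ">", 0), ("y", "==", "a")], [("y", "==", "b")]]
filtered = [r for r in rows if matches(r, predicates)]
# Both rows match: the first via the first conjunction, the second via the second.
```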
store(store, key_prefix, df)[source]¶
Persist a DataFrame to the specified store.
The used store format (e.g. Parquet) will be appended to the key.
- Parameters
store (simplekv.KeyValueStore) – store engine
key_prefix (str) – Key prefix that specifies a path where object should be stored on the store resource. The used file format will be appended to the key.
df (pandas.DataFrame or pyarrow.Table) – DataFrame that shall be persisted
- Returns
The actual key where the DataFrame is stored.
- Return type
str
class
kartothek.serialization.
DataFrameSerializer
[source]¶ Bases:
object
Abstract class that supports serializing DataFrames to/from simplekv stores.
classmethod restore_dataframe(store: simplekv.KeyValueStore, key: str, filter_query: Optional[str] = None, columns: Optional[Iterable[str]] = None, predicate_pushdown_to_io: bool = True, categories: Optional[Iterable[str]] = None, predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]] = None, date_as_object: bool = False) → pandas.core.frame.DataFrame[source]¶
Load a DataFrame from the specified store. The key is also used to detect the used format.
- Parameters
store – store engine
key – Key that specifies a path where object should be retrieved from the store resource.
filter_query – Optional query to filter the DataFrame. Must adhere to the specification of pandas.DataFrame.query.
columns – Only read in listed columns. When set to None, the full file will be read in.
predicate_pushdown_to_io – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the store layer that need to be addressed there.
categories – Columns that should be loaded as categoricals.
predicates –
Optional list of predicates, like [[('x', '>', 0), ...]], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.
Predicates are expressed in disjunctive normal form (DNF). This means that each innermost tuple describes a single column predicate. These inner predicates are combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This way, all kinds of predicates that are possible using boolean logic can be expressed.
date_as_object – Retrieve all date columns as an object column holding datetime.date objects instead of pd.Timestamp. Note that this option only works for type-stable serializers, e.g. ParquetSerializer.
store(store: simplekv.KeyValueStore, key_prefix: str, df: pandas.core.frame.DataFrame) → str[source]¶
Persist a DataFrame to the specified store.
The used store format (e.g. Parquet) will be appended to the key.
- Parameters
store (simplekv.KeyValueStore) – store engine
key_prefix (str) – Key prefix that specifies a path where object should be stored on the store resource. The used file format will be appended to the key.
df (pandas.DataFrame or pyarrow.Table) – DataFrame that shall be persisted
- Returns
The actual key where the DataFrame is stored.
- Return type
str
type_stable = False¶
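To make the abstract interface concrete, here is a minimal sketch of a serializer following the store/restore_dataframe contract, backed by a plain dict as a stand-in for a simplekv store. The class name, pickle format, and dict-based store are assumptions for illustration; a real implementation would subclass kartothek.serialization.DataFrameSerializer and accept the full restore_dataframe keyword set:

```python
import pickle

class PickleSerializer:
    # Hypothetical serializer sketch; pickle is not type-stable in general.
    type_stable = False

    def store(self, store, key_prefix, df):
        # The format suffix is appended to the key prefix, as documented.
        key = key_prefix + ".pkl"
        store[key] = pickle.dumps(df)
        return key  # the actual key where the data was stored

    def restore_dataframe(self, store, key):
        return pickle.loads(store[key])

store = {}  # stand-in for a simplekv.KeyValueStore
ser = PickleSerializer()
key = ser.store(store, "prefix/data", {"x": [1, 2, 3]})
restored = ser.restore_dataframe(store, key)
```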
class
kartothek.serialization.
ParquetSerializer
(compression: str = 'SNAPPY', chunk_size: Optional[int] = None)[source]¶ Bases:
kartothek.serialization._generic.DataFrameSerializer
Serializer to store a pandas.DataFrame as Parquet.
On top of the plain serialization, this class handles forward and backward compatibility between pyarrow versions.
- Parameters
compression – The compression algorithm to be used for the Parquet file. For a comprehensive list of available compression algorithms, please see pyarrow.parquet.write_table(). The default is "SNAPPY", which usually offers a good balance between performance and compression rate. Depending on your data, a different algorithm may have vastly different characteristics, and we can only recommend testing on your own data. Depending on the reader's Parquet implementation, some compression algorithms may not be supported, and we recommend consulting the documentation of the reader libraries first.
chunk_size – The number of rows stored in a Parquet RowGroup. To leverage predicate pushdown, it is necessary to set this value. We do not apply any default value since a good choice is very sensitive to the kind of data and storage you are using. A typical range to try out would be somewhere between 50,000 and 200,000 rows. To fully leverage row group statistics, it is highly recommended to sort the file before serialization.
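What chunk_size controls can be sketched in pure Python (an illustration of RowGroup partitioning, not the actual pyarrow writer logic): the rows are cut into slices of chunk_size rows, and each slice becomes a RowGroup with its own min/max statistics, which is what makes predicate pushdown effective on sorted data.

```python
def row_group_slices(n_rows, chunk_size):
    # Sketch: split n_rows into RowGroup-sized (start, stop) slices,
    # the last one possibly shorter than chunk_size.
    return [(start, min(start + chunk_size, n_rows))
            for start in range(0, n_rows, chunk_size)]

slices = row_group_slices(250_000, 100_000)
# Three row groups: two full ones and a final partial one.
```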
Notes
Regarding type stability and supported types there are a few known limitations users should be aware of.
pandas.Categorical
Kartothek offers the keyword argument categories, which contains a list of field names that are supposed to be retrieved as a pandas.Categorical.
See also Dictionary Encoding
In [1]: ser = ParquetSerializer()

In [2]: df = pd.DataFrame({"cat_field": pd.Categorical(["A"])})

In [3]: df.dtypes
Out[3]:
cat_field    category
dtype: object

In [4]: ser.restore_dataframe(store, ser.store(store, "cat", df))
Out[4]:
  cat_field
0         A

In [5]: ser.restore_dataframe(store, ser.store(store, "cat", df), categories=["cat_field"])
Out[5]:
  cat_field
0         A
Timestamps with nanosecond resolution
Timestamps can only be stored with microsecond (us) accuracy. Attempting to store timestamps with higher resolution may raise an exception.
See also Timestamp
In [6]: import pyarrow as pa

In [7]: pa.__version__
Out[7]: '3.0.0'

In [8]: df = pd.DataFrame({"nanosecond": [pd.Timestamp("2021-01-01 00:00:00.0000001")]})  # nanosecond resolution

In [9]: ser.store(store, "key", df)
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
<ipython-input-9-58f958778ace> in <module>
----> 1 ser.store(store, "key", df)

.../kartothek/serialization/_parquet.py in store(self, store, key_prefix, df)
    334         buf = pa.BufferOutputStream()
    335
--> 336         pq.write_table(
    337             table,
    338             buf,

.../pyarrow/parquet.py in write_table(table, where, row_group_size, version, use_dictionary, compression, write_statistics, use_deprecated_int96_timestamps, coerce_timestamps, allow_truncated_timestamps, data_page_size, flavor, filesystem, compression_level, use_byte_stream_split, data_page_version, **kwargs)
   1796                 data_page_version=data_page_version,
   1797                 **kwargs) as writer:
-> 1798             writer.write_table(table, row_group_size=row_group_size)
   1799     except Exception:
   1800         if _is_path_like(where):

.../pyarrow/parquet.py in write_table(self, table, row_group_size)
    649             raise ValueError(msg)
    650
--> 651         self.writer.write_table(table, row_group_size=row_group_size)
    652
    653     def close(self):

.../pyarrow/_parquet.pyx in pyarrow._parquet.ParquetWriter.write_table()

.../pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowInvalid: Casting from timestamp[ns] to timestamp[us] would lose data: 1609459200000000100
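Why this cast fails can be sketched with plain integer arithmetic (an illustration, not pyarrow's actual check): casting from nanoseconds to microseconds divides the epoch value by 1000, and a lossless cast must refuse when that division would drop a remainder.

```python
def cast_ns_to_us(value_ns):
    # Sketch of a lossless ns -> us cast check: reject values that carry
    # sub-microsecond digits, mirroring the timestamp[ns] -> timestamp[us] error.
    if value_ns % 1000 != 0:
        raise ValueError(
            f"Casting from timestamp[ns] to timestamp[us] would lose data: {value_ns}"
        )
    return value_ns // 1000

# 2021-01-01 00:00:00.0000001 carries 100 ns below microsecond resolution.
value = 1_609_459_200_000_000_100
```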
classmethod restore_dataframe(store: simplekv.KeyValueStore, key: str, filter_query: Optional[str] = None, columns: Optional[Iterable[str]] = None, predicate_pushdown_to_io: bool = True, categories: Optional[Iterable[str]] = None, predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]] = None, date_as_object: bool = False) → pandas.core.frame.DataFrame[source]¶
Load a DataFrame from the specified store. The key is also used to detect the used format.
- Parameters
store – store engine
key – Key that specifies a path where object should be retrieved from the store resource.
filter_query – Optional query to filter the DataFrame. Must adhere to the specification of pandas.DataFrame.query.
columns – Only read in listed columns. When set to None, the full file will be read in.
predicate_pushdown_to_io – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the store layer that need to be addressed there.
categories – Columns that should be loaded as categoricals.
predicates –
Optional list of predicates, like [[('x', '>', 0), ...]], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.
Predicates are expressed in disjunctive normal form (DNF). This means that each innermost tuple describes a single column predicate. These inner predicates are combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This way, all kinds of predicates that are possible using boolean logic can be expressed.
date_as_object – Retrieve all date columns as an object column holding datetime.date objects instead of pd.Timestamp. Note that this option only works for type-stable serializers, e.g. ParquetSerializer.
store(store, key_prefix, df)[source]¶
Persist a DataFrame to the specified store.
The used store format (e.g. Parquet) will be appended to the key.
- Parameters
store (simplekv.KeyValueStore) – store engine
key_prefix (str) – Key prefix that specifies a path where object should be stored on the store resource. The used file format will be appended to the key.
df (pandas.DataFrame or pyarrow.Table) – DataFrame that shall be persisted
- Returns
The actual key where the DataFrame is stored.
- Return type
str
type_stable = True¶
kartothek.serialization.check_predicates(predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]]) → None[source]¶
Check if predicates are well-formed.
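What well-formedness means for the DNF structure can be sketched with a hypothetical validator (an illustration under assumed rules, not kartothek's actual implementation, which performs additional checks):

```python
def check_predicates_sketch(predicates):
    # Hypothetical validator: predicates must be None, or a non-empty list of
    # non-empty conjunctions made of (column, op, value) triples.
    if predicates is None:
        return
    if not predicates or not all(predicates):
        raise ValueError("Malformed predicates: empty outer or inner list")
    for conjunction in predicates:
        for literal in conjunction:
            if len(literal) != 3 or not isinstance(literal[0], str):
                raise ValueError(f"Malformed predicate literal: {literal!r}")
```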
kartothek.serialization.columns_in_predicates(predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]]) → Set[str][source]¶
Determine all columns which are mentioned in the list of predicates.
- Parameters
predicates – The predicates to be scanned.
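The behaviour can be sketched in pure Python (an illustration; the function name with the _sketch suffix is an assumption):

```python
def columns_in_predicates_sketch(predicates):
    # Collect every column name mentioned in any (column, op, value) literal.
    if predicates is None:
        return set()
    return {col for conjunction in predicates for col, _op, _value in conjunction}

cols = columns_in_predicates_sketch([[("A", "==", 1), ("B", "<", 5)], [("C", "==", 4)]])
# cols == {"A", "B", "C"}
```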
kartothek.serialization.filter_array_like(array_like, op: str, value, mask=None, out=None, strict_date_types: bool = False, column_name: Optional[str] = None)[source]¶
Filter an array-like object using operations defined in the predicates.
- Parameters
array_like –
The array-like object to be filtered.
See also pandas.api.types.is_array_like
op – The comparison operation to apply, as used in predicates (e.g. ==, <, >=).
value – The value to compare the elements of array_like against.
mask – A boolean array-like object which will be combined with the result of this evaluation using a logical AND. Passing an all-True array yields the same result as passing no mask.
out – An array into which the result is stored. If provided, it must have a shape that the inputs broadcast to. If not provided or None, a freshly-allocated array is returned.
strict_date_types – If False (default), cast all datelike values to datetime64 for comparison.
column_name – Name of the column that array_like originates from, used for nicer error messages.
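The op/value/mask interaction can be sketched on a plain list (an illustration; the real function operates on numpy/pandas arrays and additionally supports out= and date handling):

```python
import operator

def filter_array_like_sketch(array_like, op, value, mask=None):
    # Evaluate the predicate element-wise, then AND with the optional mask.
    ops = {"==": operator.eq, "!=": operator.ne, "<": operator.lt,
           "<=": operator.le, ">": operator.gt, ">=": operator.ge}
    result = [ops[op](item, value) for item in array_like]
    if mask is not None:
        # The mask is combined with the evaluation using a logical AND.
        result = [r and m for r, m in zip(result, mask)]
    return result

flags = filter_array_like_sketch([1, 5, 10], ">", 3, mask=[True, True, False])
# flags == [False, True, False]
```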
kartothek.serialization.filter_df(df, filter_query=None)[source]¶
General implementation of query filtering.
Serialization formats such as Parquet that support predicate push-down may pre-filter in their own implementations.
kartothek.serialization.filter_df_from_predicates(df: pandas.core.frame.DataFrame, predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]], strict_date_types: bool = False) → pandas.core.frame.DataFrame[source]¶
Filter a pandas.DataFrame based on predicates in disjunctive normal form.
- Parameters
df – The pandas DataFrame to be filtered
predicates – Predicates in disjunctive normal form (DNF). For thorough documentation, see DataFrameSerializer.restore_dataframe. If None, the df is returned unmodified.
strict_date_types – If False (default), cast all datelike values to datetime64 for comparison.
kartothek.serialization.filter_predicates_by_column(predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]], columns: List[str]) → Optional[List[List[Tuple[str, str, LiteralValue]]]][source]¶
Take a predicate list and remove all literals which do not reference one of the given columns.
In [1]: from kartothek.serialization import filter_predicates_by_column

In [2]: predicates = [[("A", "==", 1), ("B", "<", 5)], [("C", "==", 4)]]

In [3]: filter_predicates_by_column(predicates, ["A"])
Out[3]: [[('A', '==', 1)]]
- Parameters
predicates – A list of predicates to be filtered
columns – A list of all columns allowed in the output