kartothek.serialization package

Module contents

class kartothek.serialization.CsvSerializer(compress=True)[source]

Bases: kartothek.serialization._generic.DataFrameSerializer

static restore_dataframe(store: simplekv.KeyValueStore, key: str, filter_query: Optional[str] = None, columns: Optional[Iterable[str]] = None, predicate_pushdown_to_io: Optional[Any] = None, categories: Optional[Iterable[str]] = None, predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]] = None, date_as_object: Optional[Any] = None, **kwargs)[source]

Load a DataFrame from the specified store. The key is also used to detect the used format.

Parameters
  • store – store engine

  • key – Key that specifies a path where object should be retrieved from the store resource.

  • filter_query – Optional query to filter the DataFrame. Must adhere to the specification of pandas.DataFrame.query.

  • columns – Only read in listed columns. When set to None, the full file will be read in.

  • predicate_pushdown_to_io – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the store layer that need to be addressed there.

  • categories – Columns that should be loaded as categoricals.

  • predicates

    Optional list of predicates, like [[(‘x’, ‘>’, 0), …], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describe a single column predicate. These inner predicate make are all combined with a conjunction (AND) into a larger predicate. The most outer list then combines all predicates with a disjunction (OR). By this, we should be able to express all kinds of predicates that are possible using boolean logic.

  • date_as_object – Retrieve all date columns as an object column holding datetime.date objects instead of pd.Timestamp. Note that this option only works for type-stable serializers, e.g. ParquetSerializer.

store(store, key_prefix, df)[source]

Persist a DataFrame to the specified store.

The used store format (e.g. Parquet) will be appended to the key.

Parameters
  • store (simplekv.KeyValueStore) – store engine

  • key_prefix (str) – Key prefix that specifies a path where object should be stored on the store resource. The used file format will be appended to the key.

  • df (pandas.DataFrame or pyarrow.Table) – DataFrame that shall be persisted

Returns

The actual key where the DataFrame is stored.

Return type

str

class kartothek.serialization.DataFrameSerializer[source]

Bases: object

Abstract class that supports serializing DataFrames to/from simplekv stores.

classmethod register_serializer(suffix, serializer)[source]
classmethod restore_dataframe(store: simplekv.KeyValueStore, key: str, filter_query: Optional[str] = None, columns: Optional[Iterable[str]] = None, predicate_pushdown_to_io: bool = True, categories: Optional[Iterable[str]] = None, predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]] = None, date_as_object: bool = False)pandas.core.frame.DataFrame[source]

Load a DataFrame from the specified store. The key is also used to detect the used format.

Parameters
  • store – store engine

  • key – Key that specifies a path where object should be retrieved from the store resource.

  • filter_query – Optional query to filter the DataFrame. Must adhere to the specification of pandas.DataFrame.query.

  • columns – Only read in listed columns. When set to None, the full file will be read in.

  • predicate_pushdown_to_io – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the store layer that need to be addressed there.

  • categories – Columns that should be loaded as categoricals.

  • predicates

    Optional list of predicates, like [[(‘x’, ‘>’, 0), …], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describe a single column predicate. These inner predicate make are all combined with a conjunction (AND) into a larger predicate. The most outer list then combines all predicates with a disjunction (OR). By this, we should be able to express all kinds of predicates that are possible using boolean logic.

  • date_as_object – Retrieve all date columns as an object column holding datetime.date objects instead of pd.Timestamp. Note that this option only works for type-stable serializers, e.g. ParquetSerializer.

store(store: simplekv.KeyValueStore, key_prefix: str, df: pandas.core.frame.DataFrame)str[source]

Persist a DataFrame to the specified store.

The used store format (e.g. Parquet) will be appended to the key.

Parameters
  • store (simplekv.KeyValueStore) – store engine

  • key_prefix (str) – Key prefix that specifies a path where object should be stored on the store resource. The used file format will be appended to the key.

  • df (pandas.DataFrame or pyarrow.Table) – DataFrame that shall be persisted

Returns

The actual key where the DataFrame is stored.

Return type

str

type_stable = False
class kartothek.serialization.ParquetSerializer(compression: str = 'SNAPPY', chunk_size: Optional[int] = None)[source]

Bases: kartothek.serialization._generic.DataFrameSerializer

Serializer to store a pandas.DataFrame as parquet

On top of the plain serialization, this class handles forward and backwards compatibility between pyarrow versions.

Parameters
  • compression – The compression algorithm to be used for the parquet file. For a comprehensive list of available compression algorithms, please see pyarrow.parquet.write_table(). The default is set to “SNAPPY” which usually offers a good balance between performance and compression rate. Depending on your data, picking a different algorithm may have vastly different characteristics and we can only recommend to test this on your own data. Depending on the reader parquet implementation, some compression algorithms may not be supported and we recommend to consult the documentation of the reader libraries first.

  • chunk_size – The number of rows stored in a Parquet RowGroup. To leverage predicate pushdown, it is necessary to set this value. We do not apply any default value since a good choice is very sensitive to the kind of data you are using and what kind of storage. A typical range to try out would be somewhere between 50k-200k. To fully leverage row group statistics, it is highly recommended to sort the file before serialization.

Notes

Regarding type stability and supported types there are a few known limitations users should be aware of.

  • pandas.Categorical

    Kartothek offers the keyword argument categories which contains a list of field names which are supposed to retrieved as a pandas.Categorical.

    See also Dictionary Encoding

    In [1]: ser = ParquetSerializer()
    
    In [2]: df = pd.DataFrame({"cat_field": pd.Categorical(["A"])})
    
    In [3]: df.dtypes
    Out[3]: 
    cat_field    category
    dtype: object
    
    In [4]: ser.restore_dataframe(store, ser.store(store, "cat", df))
    Out[4]: 
      cat_field
    0         A
    
    In [5]: ser.restore_dataframe(store, ser.store(store, "cat", df), categories=["cat_field"])
    Out[5]: 
      cat_field
    0         A
    
  • Timestamps with nanosecond resolution

    Timestamps can only be stored in micro second (us) accuracy. Trying to do differently may raise an exception.

    See also Timestamp

    In [6]: import pyarrow as pa
    
    In [7]: pa.__version__
    Out[7]: '3.0.0'
    
    In [8]: df = pd.DataFrame({"nanosecond": [pd.Timestamp("2021-01-01 00:00:00.0000001")]})
    
    # nanosecond resolution
    In [9]: ser.store(store, "key", df)
    ---------------------------------------------------------------------------
    ArrowInvalid                              Traceback (most recent call last)
    <ipython-input-9-58f958778ace> in <module>
    ----> 1 ser.store(store, "key", df)
    
    ~/checkouts/readthedocs.org/user_builds/kartothek/conda/latest/lib/python3.9/site-packages/kartothek-5.2.1.dev5+g1821ea5.d20211210-py3.9.egg/kartothek/serialization/_parquet.py in store(self, store, key_prefix, df)
        334         buf = pa.BufferOutputStream()
        335 
    --> 336         pq.write_table(
        337             table,
        338             buf,
    
    ~/checkouts/readthedocs.org/user_builds/kartothek/conda/latest/lib/python3.9/site-packages/pyarrow/parquet.py in write_table(table, where, row_group_size, version, use_dictionary, compression, write_statistics, use_deprecated_int96_timestamps, coerce_timestamps, allow_truncated_timestamps, data_page_size, flavor, filesystem, compression_level, use_byte_stream_split, data_page_version, **kwargs)
       1796                 data_page_version=data_page_version,
       1797                 **kwargs) as writer:
    -> 1798             writer.write_table(table, row_group_size=row_group_size)
       1799     except Exception:
       1800         if _is_path_like(where):
    
    ~/checkouts/readthedocs.org/user_builds/kartothek/conda/latest/lib/python3.9/site-packages/pyarrow/parquet.py in write_table(self, table, row_group_size)
        649             raise ValueError(msg)
        650 
    --> 651         self.writer.write_table(table, row_group_size=row_group_size)
        652 
        653     def close(self):
    
    ~/checkouts/readthedocs.org/user_builds/kartothek/conda/latest/lib/python3.9/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.ParquetWriter.write_table()
    
    ~/checkouts/readthedocs.org/user_builds/kartothek/conda/latest/lib/python3.9/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()
    
    ArrowInvalid: Casting from timestamp[ns] to timestamp[us] would lose data: 1609459200000000100
    
classmethod restore_dataframe(store: simplekv.KeyValueStore, key: str, filter_query: Optional[str] = None, columns: Optional[Iterable[str]] = None, predicate_pushdown_to_io: bool = True, categories: Optional[Iterable[str]] = None, predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]] = None, date_as_object: bool = False)pandas.core.frame.DataFrame[source]

Load a DataFrame from the specified store. The key is also used to detect the used format.

Parameters
  • store – store engine

  • key – Key that specifies a path where object should be retrieved from the store resource.

  • filter_query – Optional query to filter the DataFrame. Must adhere to the specification of pandas.DataFrame.query.

  • columns – Only read in listed columns. When set to None, the full file will be read in.

  • predicate_pushdown_to_io – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the store layer that need to be addressed there.

  • categories – Columns that should be loaded as categoricals.

  • predicates

    Optional list of predicates, like [[(‘x’, ‘>’, 0), …], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describe a single column predicate. These inner predicate make are all combined with a conjunction (AND) into a larger predicate. The most outer list then combines all predicates with a disjunction (OR). By this, we should be able to express all kinds of predicates that are possible using boolean logic.

  • date_as_object – Retrieve all date columns as an object column holding datetime.date objects instead of pd.Timestamp. Note that this option only works for type-stable serializers, e.g. ParquetSerializer.

store(store, key_prefix, df)[source]

Persist a DataFrame to the specified store.

The used store format (e.g. Parquet) will be appended to the key.

Parameters
  • store (simplekv.KeyValueStore) – store engine

  • key_prefix (str) – Key prefix that specifies a path where object should be stored on the store resource. The used file format will be appended to the key.

  • df (pandas.DataFrame or pyarrow.Table) – DataFrame that shall be persisted

Returns

The actual key where the DataFrame is stored.

Return type

str

type_stable = True
kartothek.serialization.check_predicates(predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]])None[source]

Check if predicates are well-formed.

kartothek.serialization.columns_in_predicates(predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]])Set[str][source]

Determine all columns which are mentioned in the list of predicates.

Parameters

predicates – The predicates to be scaned.

kartothek.serialization.default_serializer()[source]
kartothek.serialization.filter_array_like(array_like, op: str, value, mask=None, out=None, strict_date_types: bool = False, column_name: Optional[str] = None)[source]

Filter an array-like object using operations defined in the predicates

Parameters
  • array_like

    The array like object to be filtered

    See also pandas.api.types.is_array_like

  • op

  • value

  • mask – A boolean array like object which will be combined with the result of this evaluation using a logical AND. If an array with all True is given, it will be the same result as if left empty

  • out – An array into which the result is stored. If provided, it must have a shape that the inputs broadcast to. If not provided or None, a freshly-allocated array is returned.

  • strict_date_types – If False (default), cast all datelike values to datetime64 for comparison.

  • column_name – Name of the column where array_like originates from, used for nicer error messages.

kartothek.serialization.filter_df(df, filter_query=None)[source]

General implementation of query filtering.

Serialisation formats such as Parquet that support predicate push-down may pre-filter in their own implementations.

kartothek.serialization.filter_df_from_predicates(df: pandas.core.frame.DataFrame, predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]], strict_date_types: bool = False)pandas.core.frame.DataFrame[source]

Filter a pandas.DataFrame based on predicates in disjunctive normal form.

Parameters
  • df – The pandas DataFrame to be filtered

  • predicates – Predicates in disjunctive normal form (DNF). For a thorough documentation, see DataFrameSerializer.restore_dataframe If None, the df is returned unmodified

  • strict_date_types – If False (default), cast all datelike values to datetime64 for comparison.

kartothek.serialization.filter_predicates_by_column(predicates: Optional[List[List[Tuple[str, str, LiteralValue]]]], columns: List[str])Optional[List[List[Tuple[str, str, LiteralValue]]]][source]

Takes a predicate list and removes all literals which are not referencing one of the given column

In [1]: from kartothek.serialization import filter_predicates_by_column

In [2]: predicates = [[("A", "==", 1), ("B", "<", 5)], [("C", "==", 4)]]

In [3]: filter_predicates_by_column(predicates, ["A"])
Out[3]: [[('A', '==', 1)]]
Parameters
  • predicates – A list of predicates to be filtered

  • columns – A list of all columns allowed in the output