kartothek.io.dask.compression module

kartothek.io.dask.compression.pack_payload(df: dask.dataframe.core.DataFrame, group_key: Union[List[str], str]) → dask.dataframe.core.DataFrame[source]

Pack all payload columns (everything except group_key) into a single column. Within each partition, this column holds one byte string per group, containing the serialized and compressed payload data. The payload data is just dead weight when reshuffling; compressing it once before the shuffle starts saves a lot of memory and network/disk IO.

Example:

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> from kartothek.io.dask.compression import pack_payload
>>>
>>> df = pd.DataFrame({"A": [1, 1] * 2 + [2, 2] * 2 + [3, 3] * 2, "B": range(12)})
>>> ddf = dd.from_pandas(df, npartitions=2)

>>> ddf.partitions[0].compute()

   A  B
0  1  0
1  1  1
2  1  2
3  1  3
4  2  4
5  2  5

>>> pack_payload(ddf, "A").partitions[0].compute()

   A                               __dask_payload_bytes
0  1  b')...
1  2  b')...

See also https://github.com/dask/dask/pull/6259
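
The packing step described above can be illustrated on a single pandas partition. The following is only a minimal sketch, not kartothek's implementation: the helper name `sketch_pack_partition` is hypothetical, and `pickle` plus `zlib` are stand-ins for whatever serialization and compression the library actually uses. Only the column name `__dask_payload_bytes` is taken from the example output.

```python
import pickle
import zlib

import pandas as pd

# Column name as it appears in the example output; everything else is assumed.
PAYLOAD_COL = "__dask_payload_bytes"


def sketch_pack_partition(partition: pd.DataFrame, group_key: list) -> pd.DataFrame:
    """Hypothetical helper: collapse each group's payload into one compressed blob."""
    rows = []
    for key, group in partition.groupby(group_key, sort=False):
        # Everything except the group key is payload.
        payload = group.drop(columns=group_key)
        # Assumption: pickle + zlib stand in for the real serializer/compressor.
        blob = zlib.compress(pickle.dumps(payload))
        # Depending on the pandas version, a one-element key is a scalar or a tuple.
        key_values = key if isinstance(key, tuple) else (key,)
        rows.append((*key_values, blob))
    return pd.DataFrame(rows, columns=[*group_key, PAYLOAD_COL])


df = pd.DataFrame({"A": [1, 1, 1, 1, 2, 2], "B": range(6)})
packed = sketch_pack_partition(df, ["A"])
```

After packing, the shuffle only has to move one row per group and partition, with the group key readable and everything else opaque bytes.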

kartothek.io.dask.compression.pack_payload_pandas(partition: pandas.core.frame.DataFrame, group_key: List[str]) → pandas.core.frame.DataFrame[source]
kartothek.io.dask.compression.unpack_payload(df: dask.dataframe.core.DataFrame, unpack_meta: pandas.core.frame.DataFrame) → dask.dataframe.core.DataFrame[source]

Revert the payload packing of pack_payload and restore the full dataframe.

kartothek.io.dask.compression.unpack_payload_pandas(partition: pandas.core.frame.DataFrame, unpack_meta: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame[source]

Revert pack_payload_pandas and restore the packed payload.

unpack_meta:
A dataframe indicating the schema of the unpacked data. It is returned if the input is empty.
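
To illustrate the role of unpack_meta, here is a minimal sketch of an unpacking step on one pandas partition. It is not kartothek's implementation: the helper name `sketch_unpack_partition` is hypothetical, and it assumes the payload was written with `pickle` and `zlib`, which may differ from the format the library actually uses. It shows the two paths described above: an empty partition yields the schema frame, and a non-empty one is expanded back to rows.

```python
import pickle
import zlib

import pandas as pd

# Column name as it appears in the pack_payload example; the rest is assumed.
PAYLOAD_COL = "__dask_payload_bytes"


def sketch_unpack_partition(partition: pd.DataFrame, unpack_meta: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: expand blobs back to rows, or return the schema if empty."""
    if partition.empty:
        # Nothing to decode, so the schema frame supplies columns and dtypes.
        return unpack_meta.iloc[:0]
    group_key = [c for c in partition.columns if c != PAYLOAD_COL]
    pieces = []
    for _, row in partition.iterrows():
        # Assumption: blobs were produced with zlib.compress(pickle.dumps(frame)).
        payload = pickle.loads(zlib.decompress(row[PAYLOAD_COL]))
        for col in group_key:
            payload[col] = row[col]  # re-attach the group key to every payload row
        pieces.append(payload)
    # Restore the column order given by the schema frame.
    return pd.concat(pieces)[list(unpack_meta.columns)]


meta = pd.DataFrame({"A": pd.Series(dtype="int64"), "B": pd.Series(dtype="int64")})
blob = zlib.compress(pickle.dumps(pd.DataFrame({"B": [0, 1]})))
packed = pd.DataFrame({"A": [1], PAYLOAD_COL: [blob]})

restored = sketch_unpack_partition(packed, meta)
empty = sketch_unpack_partition(packed.iloc[:0], meta)
```

Returning the schema frame for empty input keeps the column set consistent across partitions, even for partitions that received no rows.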