Examples¶
Set up a store
In [1]: from tempfile import TemporaryDirectory
# You can, of course, also directly use S3, ABS or anything else
# supported by :mod:`storefact`
In [2]: dataset_dir = TemporaryDirectory()
In [3]: store_url = f"hfs://{dataset_dir.name}"
In [4]: import pandas as pd
In [5]: from kartothek.io.eager import store_dataframes_as_dataset
In [6]: from kartothek.io.eager import read_table
In [7]: df = pd.DataFrame({"Name": ["Paul", "Lisa"], "Age": [32, 29]})
In [8]: dataset_uuid = "my_list_of_friends"
In [9]: metadata = {
...: "Name": "My list of friends",
...: "Columns": {
...: "Name": "First name of my friend",
...: "Age": "honest age of my friend in years",
...: },
...: }
...:
In [10]: store_dataframes_as_dataset(
....: store=store_url, dataset_uuid=dataset_uuid, dfs=[df], metadata=metadata
....: )
....:
Out[10]: DatasetMetadata(uuid=my_list_of_friends, tables=['table'], partition_keys=[], metadata_version=4, indices=[], explicit_partitions=True)
# Load your data
# By default, the single dataframe is stored in the 'table' table
In [11]: df_from_store = read_table(store=store_url, dataset_uuid=dataset_uuid, table="table")
In [12]: df_from_store
Out[12]:
Age Name
0 32 Paul
1 29 Lisa
Eager¶
Write¶
In [13]: import pandas as pd
In [14]: from kartothek.io.eager import store_dataframes_as_dataset
# Now, define the actual partitions. This list will, most of the time,
# be the intermediate result of a previously executed pipeline which, e.g., pulls
# data from an external data source.
# In our particular case, we'll use manual input and define our partitions explicitly.
# We'll define two partitions which both have two tables.
In [15]: input_list_of_partitions = [
....: {
....: "label": "FirstPartition",
....: "data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
....: },
....: {
....: "label": "SecondPartition",
....: "data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
....: },
....: ]
....:
# The pipeline will return a :class:`~kartothek.core.dataset.DatasetMetadata` object
# which refers to the created dataset
In [16]: dataset = store_dataframes_as_dataset(
....: dfs=input_list_of_partitions,
....: store=store_url,
....: dataset_uuid="MyFirstDataset",
....: metadata={"dataset": "metadata"}, # This is optional dataset metadata
....: metadata_version=4,
....: )
....:
In [17]: dataset
Out[17]: DatasetMetadata(uuid=MyFirstDataset, tables=['FirstCategory', 'SecondCategory'], partition_keys=[], metadata_version=4, indices=[], explicit_partitions=True)
Read¶
In [18]: import pandas as pd
In [19]: from kartothek.io.eager import read_dataset_as_dataframes
# Create the pipeline with a minimal set of configs
In [20]: list_of_partitions = read_dataset_as_dataframes(
....: dataset_uuid="MyFirstDataset", store=store_url
....: )
....:
# In case you were using the dataset created in the Write example
In [21]: for d1, d2 in zip(
....: list_of_partitions,
....: [
....: {"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
....: {"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
....: ],
....: ):
....: for kv1, kv2 in zip(d1.items(), d2.items()):
....: k1, v1 = kv1
....: k2, v2 = kv2
....: assert k1 == k2 and all(v1 == v2)
....:
Iter¶
Write¶
In [22]: import pandas as pd
In [23]: from kartothek.io.iter import store_dataframes_as_dataset__iter
In [24]: input_list_of_partitions = [
....: {
....: "label": "FirstPartition",
....: "data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
....: },
....: {
....: "label": "SecondPartition",
....: "data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
....: },
....: ]
....:
# The pipeline will return a :class:`~kartothek.core.dataset.DatasetMetadata` object
# which refers to the created dataset
In [25]: dataset = store_dataframes_as_dataset__iter(
....: input_list_of_partitions,
....: store=store_url,
....: dataset_uuid="MyFirstDatasetIter",
....: metadata={"dataset": "metadata"}, # This is optional dataset metadata
....: metadata_version=4,
....: )
....:
In [26]: dataset
Out[26]: DatasetMetadata(uuid=MyFirstDatasetIter, tables=['FirstCategory', 'SecondCategory'], partition_keys=[], metadata_version=4, indices=[], explicit_partitions=True)
Read¶
In [27]: import pandas as pd
In [28]: from kartothek.io.iter import read_dataset_as_dataframes__iterator
# Create the pipeline with a minimal set of configs
In [29]: list_of_partitions = read_dataset_as_dataframes__iterator(
....: dataset_uuid="MyFirstDatasetIter", store=store_url
....: )
....:
# The iter backend returns a generator object. In our case we want to look at
# all partitions at once.
In [30]: list_of_partitions = list(list_of_partitions)
# In case you were using the dataset created in the Write example
In [31]: for d1, d2 in zip(
....: list_of_partitions,
....: [
....: {"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
....: {"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
....: ],
....: ):
....: for kv1, kv2 in zip(d1.items(), d2.items()):
....: k1, v1 = kv1
....: k2, v2 = kv2
....: assert k1 == k2 and all(v1 == v2)
....:
Dask¶
Write¶
In [32]: import pandas as pd
In [33]: from kartothek.io.dask.delayed import store_delayed_as_dataset
In [34]: input_list_of_partitions = [
....: {
....: "label": "FirstPartition",
....: "data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
....: },
....: {
....: "label": "SecondPartition",
....: "data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
....: },
....: ]
....:
# This will return a :class:`~dask.delayed.Delayed` object. The figure below
# shows the generated task graph.
In [35]: task = store_delayed_as_dataset(
....: input_list_of_partitions,
....: store=store_url,
....: dataset_uuid="MyFirstDatasetDask",
....: metadata={"dataset": "metadata"}, # This is optional dataset metadata
....: metadata_version=4,
....: )
....:
In [36]: task.compute()
Out[36]: DatasetMetadata(uuid=MyFirstDatasetDask, tables=['FirstCategory', 'SecondCategory'], partition_keys=[], metadata_version=4, indices=[], explicit_partitions=True)

Task graph for the above dataset store pipeline.¶
Read¶
In [37]: import dask
In [38]: import pandas as pd
In [39]: from kartothek.io.dask.delayed import read_dataset_as_delayed
In [40]: tasks = read_dataset_as_delayed(dataset_uuid="MyFirstDatasetDask", store=store_url)
In [41]: tasks
Out[41]:
[Delayed('_get_data-531699b9-4d3b-4800-844e-c02a3997706c'),
Delayed('_get_data-21da6929-64fb-4dc1-8e1c-d8df5830a28b')]
In [42]: dask.compute(tasks)
Out[42]:
([{'FirstCategory': Empty DataFrame
Columns: []
Index: [],
'SecondCategory': Empty DataFrame
Columns: []
Index: []},
{'FirstCategory': Empty DataFrame
Columns: []
Index: [],
'SecondCategory': Empty DataFrame
Columns: []
Index: []}],)