Examples

Set up a store

In [1]: from storefact import get_store_from_url

In [2]: from functools import partial

In [3]: from tempfile import TemporaryDirectory

# You can, of course, also directly use S3, Azure Blob Storage (ABS), or anything else
# supported by :mod:`storefact`
In [4]: dataset_dir = TemporaryDirectory()

In [5]: store_factory = partial(get_store_from_url, f"hfs://{dataset_dir.name}")
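
The URL scheme selects the backing store. For quick local experiments an in-memory store can be convenient; the exact set of supported schemes depends on your installed storefact version, so treat the alternative below as an illustrative sketch rather than a guaranteed API.

# Sketch: an assumed in-memory store URL -- consult the storefact
# documentation for the schemes your version actually supports
in_memory_factory = partial(get_store_from_url, "hmemory://")
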
In [6]: import pandas as pd

In [7]: from kartothek.io.eager import store_dataframes_as_dataset

In [8]: from kartothek.io.eager import read_table

In [9]: df = pd.DataFrame({"Name": ["Paul", "Lisa"], "Age": [32, 29]})

In [10]: dataset_uuid = "my_list_of_friends"

In [11]: metadata = {
   ....:     "Name": "My list of friends",
   ....:     "Columns": {
   ....:         "Name": "First name of my friend",
   ....:         "Age": "honest age of my friend in years",
   ....:     },
   ....: }
   ....: 

In [12]: store_dataframes_as_dataset(
   ....:     store=store_factory, dataset_uuid=dataset_uuid, dfs=[df], metadata=metadata
   ....: )
   ....: 
Out[12]: DatasetMetadata(uuid=my_list_of_friends, tables=['table'], partition_keys=[], metadata_version=4, indices=[], explicit_partitions=True)

# Load your data
# By default the single dataframe is stored in the 'table' table
In [13]: df_from_store = read_table(
   ....:     store=store_factory, dataset_uuid=dataset_uuid, table="table"
   ....: )
   ....: 

In [14]: df_from_store
Out[14]: 
   Age  Name
0   32  Paul
1   29  Lisa
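
``read_table`` also supports predicate pushdown, so only matching rows are loaded from storage. The sketch below assumes a kartothek version that accepts the ``predicates`` keyword; predicates are given as a list of AND-connected lists of ``(column, operator, value)`` tuples.

# Sketch: load only the friends older than 30 (assumes the ``predicates``
# keyword is supported by your kartothek version)
df_adults = read_table(
    store=store_factory,
    dataset_uuid=dataset_uuid,
    table="table",
    predicates=[[("Age", ">", 30)]],
)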

Eager

Write

In [15]: import pandas as pd

In [16]: from kartothek.io.eager import store_dataframes_as_dataset

# Now define the actual partitions. In most cases this list will be the
# intermediate result of a previously executed pipeline, e.g. one that pulls
# data from an external data source.
# In this example we provide manual input and define the partitions explicitly:
# two partitions, each containing two tables.
In [17]: input_list_of_partitions = [
   ....:     {
   ....:         "label": "FirstPartition",
   ....:         "data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
   ....:     },
   ....:     {
   ....:         "label": "SecondPartition",
   ....:         "data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
   ....:     },
   ....: ]
   ....: 

# The pipeline will return a :class:`~kartothek.core.dataset.DatasetMetadata` object
# which refers to the created dataset
In [18]: dataset = store_dataframes_as_dataset(
   ....:     dfs=input_list_of_partitions,
   ....:     store=store_factory,
   ....:     dataset_uuid="MyFirstDataset",
   ....:     metadata={"dataset": "metadata"},  #  This is optional dataset metadata
   ....:     metadata_version=4,
   ....: )
   ....: 

In [19]: dataset
Out[19]: DatasetMetadata(uuid=MyFirstDataset, tables=['FirstCategory', 'SecondCategory'], partition_keys=[], metadata_version=4, indices=[], explicit_partitions=True)
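
The returned ``DatasetMetadata`` object can be inspected directly. The attributes used below (``tables``, ``partitions``) match the representation shown above, but treat this as a sketch and check the API reference of your kartothek version.

# Sketch: inspect the freshly written dataset
dataset.uuid                # 'MyFirstDataset'
dataset.tables              # ['FirstCategory', 'SecondCategory']
sorted(dataset.partitions)  # the partition labels defined above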

Read

In [20]: import pandas as pd

In [21]: from kartothek.io.eager import read_dataset_as_dataframes

# Create the pipeline with a minimal set of configuration options
In [22]: list_of_partitions = read_dataset_as_dataframes(
   ....:     dataset_uuid="MyFirstDataset", store=store_factory
   ....: )
   ....: 

# Assuming you are reading the dataset created in the Write example above
In [23]: for d1, d2 in zip(
   ....:     list_of_partitions,
   ....:     [
   ....:         {"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
   ....:         {"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
   ....:     ],
   ....: ):
   ....:     for kv1, kv2 in zip(d1.items(), d2.items()):
   ....:         k1, v1 = kv1
   ....:         k2, v2 = kv2
   ....:         assert k1 == k2 and all(v1 == v2)
   ....: 
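
If you only need a single table, ``read_table`` (imported in the setup above) concatenates that table across all partitions into one DataFrame. A minimal sketch, assuming the dataset written in the Write example:

# Sketch: read one table of "MyFirstDataset"; the result is empty because
# the example partitions contain no rows
df_first = read_table(
    store=store_factory, dataset_uuid="MyFirstDataset", table="FirstCategory"
)
assert df_first.empty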

Iter

Write

In [24]: import pandas as pd

In [25]: from kartothek.io.iter import store_dataframes_as_dataset__iter

In [26]: input_list_of_partitions = [
   ....:     {
   ....:         "label": "FirstPartition",
   ....:         "data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
   ....:     },
   ....:     {
   ....:         "label": "SecondPartition",
   ....:         "data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
   ....:     },
   ....: ]
   ....: 

# The pipeline will return a :class:`~kartothek.core.dataset.DatasetMetadata` object
# which refers to the created dataset
In [27]: dataset = store_dataframes_as_dataset__iter(
   ....:     input_list_of_partitions,
   ....:     store=store_factory,
   ....:     dataset_uuid="MyFirstDatasetIter",
   ....:     metadata={"dataset": "metadata"},  #  This is optional dataset metadata
   ....:     metadata_version=4,
   ....: )
   ....: 

In [28]: dataset
Out[28]: DatasetMetadata(uuid=MyFirstDatasetIter, tables=['FirstCategory', 'SecondCategory'], partition_keys=[], metadata_version=4, indices=[], explicit_partitions=True)

Read

In [29]: import pandas as pd

In [30]: from kartothek.io.iter import read_dataset_as_dataframes__iterator

# Create the pipeline with a minimal set of configuration options
In [31]: list_of_partitions = read_dataset_as_dataframes__iterator(
   ....:     dataset_uuid="MyFirstDatasetIter", store=store_factory
   ....: )
   ....: 

# The iter backend returns a generator object. In this case we want to look at
# all partitions at once
In [32]: list_of_partitions = list(list_of_partitions)

# Assuming you are reading the dataset created in the Write example above
In [33]: for d1, d2 in zip(
   ....:     list_of_partitions,
   ....:     [
   ....:         {"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
   ....:         {"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
   ....:     ],
   ....: ):
   ....:     for kv1, kv2 in zip(d1.items(), d2.items()):
   ....:         k1, v1 = kv1
   ....:         k2, v2 = kv2
   ....:         assert k1 == k2 and all(v1 == v2)
   ....: 
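
Because the iterator yields one partition at a time, you can also process partitions lazily without materializing the whole dataset, which keeps memory usage bounded. A minimal sketch using only the function imported above:

# Sketch: stream over partitions instead of collecting them into a list
for partition in read_dataset_as_dataframes__iterator(
    dataset_uuid="MyFirstDatasetIter", store=store_factory
):
    # ``partition`` maps table name to a pandas DataFrame
    for table_name, table_df in partition.items():
        pass  # replace with your per-partition processing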

Dask

Write

In [34]: import pandas as pd

In [35]: from kartothek.io.dask.delayed import store_delayed_as_dataset

In [36]: input_list_of_partitions = [
   ....:     {
   ....:         "label": "FirstPartition",
   ....:         "data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
   ....:     },
   ....:     {
   ....:         "label": "SecondPartition",
   ....:         "data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
   ....:     },
   ....: ]
   ....: 

# This will return a :class:`~dask.delayed` object. The figure below
# shows the generated task graph.
In [37]: task = store_delayed_as_dataset(
   ....:     input_list_of_partitions,
   ....:     store=store_factory,
   ....:     dataset_uuid="MyFirstDatasetDask",
   ....:     metadata={"dataset": "metadata"},  #  This is optional dataset metadata
   ....:     metadata_version=4,
   ....: )
   ....: 

In [38]: task.compute()
Out[38]: DatasetMetadata(uuid=MyFirstDatasetDask, tables=['FirstCategory', 'SecondCategory'], partition_keys=[], metadata_version=4, indices=[], explicit_partitions=True)
[Figure: Task graph for the above dataset store pipeline.]
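
You can render such a graph yourself from the delayed object. This sketch assumes that graphviz and the python-graphviz bindings are installed, which ``visualize`` relies on.

# Sketch: write the task graph of the store pipeline to a file
task.visualize(filename="taskgraph.png")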

Read

In [39]: import dask

In [40]: import pandas as pd

In [41]: from kartothek.io.dask.delayed import read_dataset_as_delayed

In [42]: tasks = read_dataset_as_delayed(dataset_uuid="MyFirstDatasetDask", store=store_factory)

In [43]: tasks
Out[43]: 
[Delayed('_get_data-a6494c3e-ec13-4eb4-8888-483281327d09'),
 Delayed('_get_data-eff36b58-c5dd-42a0-a0a5-c5110408c2b5')]

In [44]: dask.compute(tasks)
Out[44]: 
([{'FirstCategory': Empty DataFrame
   Columns: []
   Index: [],
   'SecondCategory': Empty DataFrame
   Columns: []
   Index: []},
  {'FirstCategory': Empty DataFrame
   Columns: []
   Index: [],
   'SecondCategory': Empty DataFrame
   Columns: []
   Index: []}],)
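
Besides the delayed interface, kartothek also offers a ``dask.dataframe``-based reader. The sketch below assumes ``kartothek.io.dask.dataframe.read_dataset_as_ddf`` with a ``table`` argument, as in kartothek 3.x; check the API reference of your version.

# Sketch: load one table of the dataset as a dask DataFrame
from kartothek.io.dask.dataframe import read_dataset_as_ddf

ddf = read_dataset_as_ddf(
    dataset_uuid="MyFirstDatasetDask", store=store_factory, table="FirstCategory"
)
ddf.compute()  # materializes a pandas DataFrame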