Examples¶
This is a quick walk through the basic functionality of Kartothek Cubes.
First, we want to create a cube for geodata:
>>> from kartothek.api.cube import Cube
>>> cube = Cube(
... uuid_prefix="geodata",
... dimension_columns=["city", "day"],
... partition_columns=["country"],
... )
Apart from an abstract cube definition, we need a simplekv-based storage backend:
>>> from functools import partial
>>> import tempfile
>>> import storefact
>>> store_location = tempfile.mkdtemp()
>>> store_factory = partial(
... storefact.get_store_from_url,
... "hfs://" + store_location,
... )
>>> store = store_factory()
Some generic library setup:
>>> import pandas as pd
>>> pd.set_option("display.max_rows", 40)
>>> pd.set_option("display.width", None)
>>> pd.set_option('display.max_columns', None)
>>> pd.set_option('display.expand_frame_repr', False)
Build¶
The cube should initially be filled with the following data:
>>> from io import StringIO
>>> import pandas as pd
>>> df_weather = pd.read_csv(
... filepath_or_buffer=StringIO("""
... avg_temp city country day
... 6 Hamburg DE 2018-01-01
... 5 Hamburg DE 2018-01-02
... 8 Dresden DE 2018-01-01
... 4 Dresden DE 2018-01-02
... 6 London UK 2018-01-01
... 8 London UK 2018-01-02
... """.strip()),
... delim_whitespace=True,
... parse_dates=["day"],
... )
We use the simple kartothek.io.eager_cube
backend to store the data:
>>> from kartothek.api.cube import build_cube
>>> datasets_build = build_cube(
... data=df_weather,
... store=store,
... cube=cube,
... )
We have just persisted a single Kartothek dataset:
>>> print(", ".join(sorted(datasets_build.keys())))
seed
>>> ds_seed = datasets_build["seed"].load_all_indices(store)
>>> print(ds_seed.uuid)
geodata++seed
>>> print(", ".join(sorted(ds_seed.indices)))
city, country, day
Finally, let’s have a quick look at the store content. Note that we cut out UUIDs and timestamps here for brevity.
>>> import re
>>> def print_filetree(s, prefix=""):
... entries = []
... for k in sorted(s.keys(prefix)):
... k = re.sub("[a-z0-9]{32}", "<uuid>", k)
... k = re.sub("[0-9]{4}-[0-9]{2}-[0-9]{2}((%20)|(T))[0-9]{2}%3A[0-9]{2}%3A[0-9]+.[0-9]{6}", "<ts>", k)
... entries.append(k)
... print("\n".join(sorted(entries)))
>>> print_filetree(store)
geodata++seed.by-dataset-metadata.json
geodata++seed/indices/city/<ts>.by-dataset-index.parquet
geodata++seed/indices/day/<ts>.by-dataset-index.parquet
geodata++seed/table/_common_metadata
geodata++seed/table/country=DE/<uuid>.parquet
geodata++seed/table/country=UK/<uuid>.parquet
Extend¶
Now let’s say we would also like to have longitude and latitude data in our cube.
>>> from kartothek.api.cube import extend_cube
>>> df_location = pd.read_csv(
... filepath_or_buffer=StringIO("""
... city country latitude longitude
... Hamburg DE 53.551086 9.993682
... Dresden DE 51.050407 13.737262
... London UK 51.509865 -0.118092
... Tokyo JP 35.652832 139.839478
... """.strip()),
... delim_whitespace=True,
... )
Hint
Obviously, this data does not change over time. As long as the data spans at least a single dimension and describes all partition columns, you are free to use projected data for non-seed datasets.
>>> datasets_extend = extend_cube(
... data={"latlong": df_location},
... store=store,
... cube=cube,
... )
This results in an extra dataset:
>>> print(", ".join(sorted(datasets_extend.keys())))
latlong
>>> ds_latlong = datasets_extend["latlong"].load_all_indices(store)
>>> print(ds_latlong.uuid)
geodata++latlong
>>> print(", ".join(sorted(ds_latlong.indices)))
country
Note that for the second dataset, no indices for 'city' and 'day' exist. These are only created for the seed dataset, since that dataset forms the ground truth about which city-day entries are part of the cube.
Hint
Since the seed dataset forms the ground truth regarding cells in the cube, additional data in other datasets is ignored. So in this case, 'Tokyo' will be stored in the cube but cut out during queries.
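The effect of these seed semantics can be sketched with plain pandas (a hypothetical miniature for illustration, not the cube's actual internals):

```python
import pandas as pd

# Miniature illustration of the seed semantics: the seed dataset defines
# which cells exist, so rows in other datasets without a matching seed
# cell -- here Tokyo -- never appear in a query result.
seed = pd.DataFrame({
    "city": ["Hamburg", "London"],
    "country": ["DE", "UK"],
})
latlong = pd.DataFrame({
    "city": ["Hamburg", "London", "Tokyo"],
    "country": ["DE", "UK", "JP"],
    "latitude": [53.551086, 51.509865, 35.652832],
})
# Join driven by the seed keys: Tokyo is stored in latlong but cut out here.
joined = seed.merge(latlong, on=["city", "country"], how="left")
print(joined["city"].tolist())  # ['Hamburg', 'London']
```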
If you look at the file tree, you can see that the second dataset is completely separated. This makes it easy to copy or back up parts of the cube:
>>> print_filetree(store)
geodata++latlong.by-dataset-metadata.json
geodata++latlong/table/_common_metadata
geodata++latlong/table/country=DE/<uuid>.parquet
geodata++latlong/table/country=JP/<uuid>.parquet
geodata++latlong/table/country=UK/<uuid>.parquet
geodata++seed.by-dataset-metadata.json
geodata++seed/indices/city/<ts>.by-dataset-index.parquet
geodata++seed/indices/day/<ts>.by-dataset-index.parquet
geodata++seed/table/_common_metadata
geodata++seed/table/country=DE/<uuid>.parquet
geodata++seed/table/country=UK/<uuid>.parquet
Query¶
The real strength of Kartothek Cube lies not in storing multiple datasets, but in retrieving the data in a very convenient way. It is possible to treat the entire cube as a single, large DataFrame:
>>> from kartothek.api.cube import query_cube
>>> query_cube(
... cube=cube,
... store=store,
... )[0]
avg_temp city country day latitude longitude
0 8 Dresden DE 2018-01-01 51.050407 13.737262
1 4 Dresden DE 2018-01-02 51.050407 13.737262
2 6 Hamburg DE 2018-01-01 53.551086 9.993682
3 5 Hamburg DE 2018-01-02 53.551086 9.993682
4 6 London UK 2018-01-01 51.509865 -0.118092
5 8 London UK 2018-01-02 51.509865 -0.118092
As you can see, we get a list of results back. This is because Kartothek Cube naturally supports partition-by semantics, which is especially helpful for distributed backends like Dask Distributed:
>>> dfs = query_cube(
... cube=cube,
... store=store,
... partition_by="country",
... )
>>> dfs[0]
avg_temp city country day latitude longitude
0 8 Dresden DE 2018-01-01 51.050407 13.737262
1 4 Dresden DE 2018-01-02 51.050407 13.737262
2 6 Hamburg DE 2018-01-01 53.551086 9.993682
3 5 Hamburg DE 2018-01-02 53.551086 9.993682
>>> dfs[1]
avg_temp city country day latitude longitude
0 6 London UK 2018-01-01 51.509865 -0.118092
1 8 London UK 2018-01-02 51.509865 -0.118092
The query system also supports selection and projection:
>>> from kartothek.api.cube import C
>>> query_cube(
... cube=cube,
... store=store,
... payload_columns=["avg_temp"],
... conditions=(
... (C("country") == "DE") &
... C("latitude").in_interval(50., 52.) &
... C("longitude").in_interval(13., 14.)
... ),
... )[0]
avg_temp city country day
0 8 Dresden DE 2018-01-01
1 4 Dresden DE 2018-01-02
Transform¶
Query and Extend can be combined to build powerful transformation pipelines. To better illustrate this, we will use Dask.Bag in this example.
Important
Since Dask operations can also be executed in subprocesses, multiple threads, or even on other machines using Distributed, Kartothek Cube requires the user to pass a Store Factory instead of a store. This ensures that no file handles, TCP connections, or other non-transportable objects are shared.
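Why factories transport well can be sketched with the standard library alone (a hypothetical stand-in; the real factory wraps storefact.get_store_from_url as constructed at the top of this page):

```python
import pickle
from functools import partial

# Hypothetical stand-in for a store factory: a partial over an importable
# callable (plain `dict` here) pickles cleanly, so other processes or
# machines can re-create the store themselves instead of receiving live
# file handles or TCP connections.
store_factory = partial(dict, url="hfs:///tmp/geodata")
restored = pickle.loads(pickle.dumps(store_factory))
print(restored())  # {'url': 'hfs:///tmp/geodata'}
```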
>>> from kartothek.api.cube import (
... extend_cube_from_bag,
... query_cube_bag,
... )
>>> def transform(df):
... df["avg_temp_country_min"] = df["avg_temp"].min()
... return {
... "transformed": df.loc[
... :,
... [
... "avg_temp_country_min",
... "city",
... "country",
... "day",
... ]
... ],
... }
>>> transformed = query_cube_bag(
... cube=cube,
... store=store_factory,
... partition_by="day",
... ).map(transform)
>>> datasets_transformed = extend_cube_from_bag(
... data=transformed,
... store=store_factory,
... cube=cube,
... ktk_cube_dataset_ids=["transformed"],
... ).compute()
>>> query_cube(
... cube=cube,
... store=store,
... payload_columns=[
... "avg_temp",
... "avg_temp_country_min",
... ],
... )[0]
avg_temp avg_temp_country_min city country day
0 8 6 Dresden DE 2018-01-01
1 4 4 Dresden DE 2018-01-02
2 6 6 Hamburg DE 2018-01-01
3 5 4 Hamburg DE 2018-01-02
4 6 6 London UK 2018-01-01
5 8 4 London UK 2018-01-02
Notice that the partition_by argument does not have to match the cube Partition Columns to work; you may use any indexed column. Keep in mind, though, that fine-grained partitioning has drawbacks, namely large scheduling overhead and many blob files, which can make reading the data inefficient:
>>> print_filetree(store, "geodata++transformed")
geodata++transformed.by-dataset-metadata.json
geodata++transformed/table/_common_metadata
geodata++transformed/table/country=DE/<uuid>.parquet
geodata++transformed/table/country=DE/<uuid>.parquet
geodata++transformed/table/country=UK/<uuid>.parquet
geodata++transformed/table/country=UK/<uuid>.parquet
Append¶
New rows can be added to the cube using an append operation:
>>> from kartothek.api.cube import append_to_cube
>>> df_weather2 = pd.read_csv(
... filepath_or_buffer=StringIO("""
... avg_temp city country day
... 20 Santiago CL 2018-01-01
... 22 Santiago CL 2018-01-02
... """.strip()),
... delim_whitespace=True,
... parse_dates=["day"],
... )
>>> datasets_appended = append_to_cube(
... data=df_weather2,
... store=store,
... cube=cube,
... )
>>> print_filetree(store, "geodata++seed")
geodata++seed.by-dataset-metadata.json
geodata++seed/indices/city/<ts>.by-dataset-index.parquet
geodata++seed/indices/city/<ts>.by-dataset-index.parquet
geodata++seed/indices/day/<ts>.by-dataset-index.parquet
geodata++seed/indices/day/<ts>.by-dataset-index.parquet
geodata++seed/table/_common_metadata
geodata++seed/table/country=CL/<uuid>.parquet
geodata++seed/table/country=DE/<uuid>.parquet
geodata++seed/table/country=UK/<uuid>.parquet
Notice that the indices were updated automatically.
>>> query_cube(
... cube=cube,
... store=store,
... )[0]
avg_temp avg_temp_country_min city country day latitude longitude
0 8 6.0 Dresden DE 2018-01-01 51.050407 13.737262
1 4 4.0 Dresden DE 2018-01-02 51.050407 13.737262
2 6 6.0 Hamburg DE 2018-01-01 53.551086 9.993682
3 5 4.0 Hamburg DE 2018-01-02 53.551086 9.993682
4 6 6.0 London UK 2018-01-01 51.509865 -0.118092
5 8 4.0 London UK 2018-01-02 51.509865 -0.118092
6 20 NaN Santiago CL 2018-01-01 NaN NaN
7 22 NaN Santiago CL 2018-01-02 NaN NaN
Remove¶
You can remove entire partitions from the cube using the remove operation:
>>> from kartothek.api.cube import remove_partitions
>>> datasets_after_removal = remove_partitions(
... cube=cube,
... store=store,
... ktk_cube_dataset_ids=["latlong"],
... conditions=(C("country") == "UK"),
... )
>>> query_cube(
... cube=cube,
... store=store,
... )[0]
avg_temp avg_temp_country_min city country day latitude longitude
0 8 6.0 Dresden DE 2018-01-01 51.050407 13.737262
1 4 4.0 Dresden DE 2018-01-02 51.050407 13.737262
2 6 6.0 Hamburg DE 2018-01-01 53.551086 9.993682
3 5 4.0 Hamburg DE 2018-01-02 53.551086 9.993682
4 6 6.0 London UK 2018-01-01 NaN NaN
5 8 4.0 London UK 2018-01-02 NaN NaN
6 20 NaN Santiago CL 2018-01-01 NaN NaN
7 22 NaN Santiago CL 2018-01-02 NaN NaN
Delete¶
You can also delete entire datasets (or the entire cube):
>>> from kartothek.api.cube import delete_cube
>>> datasets_still_in_cube = delete_cube(
... cube=cube,
... store=store,
... datasets=["transformed"],
... )
>>> query_cube(
... cube=cube,
... store=store,
... )[0]
avg_temp city country day latitude longitude
0 8 Dresden DE 2018-01-01 51.050407 13.737262
1 4 Dresden DE 2018-01-02 51.050407 13.737262
2 6 Hamburg DE 2018-01-01 53.551086 9.993682
3 5 Hamburg DE 2018-01-02 53.551086 9.993682
4 6 London UK 2018-01-01 NaN NaN
5 8 London UK 2018-01-02 NaN NaN
6 20 Santiago CL 2018-01-01 NaN NaN
7 22 Santiago CL 2018-01-02 NaN NaN
Dimensionality and Partitioning¶
Sometimes, you have data that only exists in a projection of the cube, like the latlong
data from the Extend
section. For non-seed datasets, you can just leave out Dimension Columns, as long as at least a single
Dimension Column remains.
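What happens to projected data during a query can again be sketched with plain pandas (an illustrative stand-in, not the cube's actual join code): values from a dataset that lacks a dimension column are broadcast along the missing dimension.

```python
import pandas as pd

# The seed spans the (city, day) dimensions...
seed = pd.DataFrame({
    "city": ["Hamburg", "Hamburg"],
    "country": ["DE", "DE"],
    "day": ["2018-01-01", "2018-01-02"],
})
# ...while this projected dataset has no "day" column at all.
latlong = pd.DataFrame({
    "city": ["Hamburg"],
    "country": ["DE"],
    "latitude": [53.551086],
})
# Joining on the remaining columns broadcasts the latitude to every day.
result = seed.merge(latlong, on=["city", "country"], how="left")
print(result["latitude"].tolist())  # [53.551086, 53.551086]
```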
Sometimes you may find that the standard partitioning does not match the data very well, so for non-seed datasets you can change the partitioning:

- leave out partition columns: especially helpful when the dataset is really small, or when data only exists on a specific projection that does not include the partition columns (e.g. the day dimension from the example cube)
- additional partition columns: helpful when the dataset has many and/or very memory-intensive columns
Important
Although partitionings other than the cube Partition Columns can be specified, it is strongly advised not to diverge too much from them, for performance reasons.
>>> df_time = pd.DataFrame({
... "day": pd.date_range(
... start="2018-01-01",
... end="2019-01-01",
... freq="D",
... ),
... })
>>> df_time["weekday"] = df_time.day.dt.weekday
>>> df_time["month"] = df_time.day.dt.month
>>> df_time["year"] = df_time.day.dt.year
>>> datasets_time = extend_cube(
... data={"time": df_time},
... store=store,
... cube=cube,
... partition_on={"time": []},
... )
>>> print_filetree(store, "geodata++time")
geodata++time.by-dataset-metadata.json
geodata++time/table/<uuid>.parquet
geodata++time/table/_common_metadata
>>> query_cube(
... cube=cube,
... store=store,
... )[0]
avg_temp city country day latitude longitude month weekday year
0 8 Dresden DE 2018-01-01 51.050407 13.737262 1 0 2018
1 4 Dresden DE 2018-01-02 51.050407 13.737262 1 1 2018
2 6 Hamburg DE 2018-01-01 53.551086 9.993682 1 0 2018
3 5 Hamburg DE 2018-01-02 53.551086 9.993682 1 1 2018
4 6 London UK 2018-01-01 NaN NaN 1 0 2018
5 8 London UK 2018-01-02 NaN NaN 1 1 2018
6 20 Santiago CL 2018-01-01 NaN NaN 1 0 2018
7 22 Santiago CL 2018-01-02 NaN NaN 1 1 2018