Query System

Kartothek views the whole cube as a large, virtual DataFrame. The seed dataset provides the ground truth regarding which rows exist; all other datasets are attached via a left join. The user should not notice that the data is partitioned via Partition Columns or split across datasets.

Important

It is a common misconception that Kartothek is able to join arbitrary datasets or implements a complete join system like SQL. This is NOT the case!

This section explains some technical details of this mechanism.

Per-Dataset Partitions

First of all, the partition files for all datasets are gathered. Every partition file is represented by a unique label. For every dataset, index data for the Primary Indices (i.e. the partition columns) is loaded and joined with the labels:

 P | Q | __ktk_cube_labels_seed
===+===+=========================
 0 | 0 | P=0/Q=0/<uuid1>.parquet
 0 | 1 | P=0/Q=1/<uuid2>.parquet
 0 | 1 | P=0/Q=1/<uuid3>.parquet
 1 | 0 | P=1/Q=0/<uuid4>.parquet
 1 | 0 | P=1/Q=0/<uuid5>.parquet
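A minimal pandas sketch of this step (not Kartothek's actual implementation; index contents and label names are made up): each primary index maps index values to partition labels, and joining the per-column indices on the label column yields the table above.

```python
import pandas as pd

# Hypothetical per-column index data: value -> partition label.
idx_p = pd.DataFrame({
    "P": [0, 0, 0, 1, 1],
    "__ktk_cube_labels_seed": ["l1", "l2", "l3", "l4", "l5"],
})
idx_q = pd.DataFrame({
    "Q": [0, 1, 1, 0, 0],
    "__ktk_cube_labels_seed": ["l1", "l2", "l3", "l4", "l5"],
})

# Join the indices on the label column to get one row per partition file.
labels = idx_p.merge(idx_q, on="__ktk_cube_labels_seed")
labels = labels[["P", "Q", "__ktk_cube_labels_seed"]]
```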

Also, pre-conditions are applied during this step. These are conditions that can be evaluated based on index data alone (Partition Indices, as well as Explicit Secondary Indices for dimension columns and index columns):

condition = (I1 > 1)

join index information:

 P | Q | I1 | __ktk_cube_labels_seed
===+===+====+=========================
 0 | 0 |  1 | P=0/Q=0/<uuid1>.parquet
 0 | 1 |  2 | P=0/Q=1/<uuid2>.parquet
 0 | 1 |  3 | P=0/Q=1/<uuid3>.parquet
 1 | 0 |  4 | P=1/Q=0/<uuid4>.parquet
 1 | 0 |  5 | P=1/Q=0/<uuid5>.parquet

filter:

 P | Q | I1 | __ktk_cube_labels_seed
===+===+====+=========================
 0 | 1 |  2 | P=0/Q=1/<uuid2>.parquet
 0 | 1 |  3 | P=0/Q=1/<uuid3>.parquet
 1 | 0 |  4 | P=1/Q=0/<uuid4>.parquet
 1 | 0 |  5 | P=1/Q=0/<uuid5>.parquet

remove index information:

 P | Q | __ktk_cube_labels_seed
===+===+=========================
 0 | 1 | P=0/Q=1/<uuid2>.parquet
 0 | 1 | P=0/Q=1/<uuid3>.parquet
 1 | 0 | P=1/Q=0/<uuid4>.parquet
 1 | 0 | P=1/Q=0/<uuid5>.parquet
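The three steps above can be sketched with pandas (illustrative only; short labels stand in for the full partition file names): join in the index column, evaluate the pre-condition, then drop the index column again.

```python
import pandas as pd

# Label table with the secondary-index column I1 already joined in.
labels = pd.DataFrame({
    "P":  [0, 0, 0, 1, 1],
    "Q":  [0, 1, 1, 0, 0],
    "I1": [1, 2, 3, 4, 5],
    "__ktk_cube_labels_seed": ["l1", "l2", "l3", "l4", "l5"],
})

# Apply the pre-condition (I1 > 1), then remove the index information.
filtered = labels[labels["I1"] > 1].drop(columns=["I1"])
```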

Now, partition-by data is added (if not already present):

partition-by = I2

 P | Q | I2 | __ktk_cube_labels_seed
===+===+====+=========================
 0 | 1 |  1 | P=0/Q=1/<uuid2>.parquet
 0 | 1 |  1 | P=0/Q=1/<uuid3>.parquet
 1 | 0 |  1 | P=1/Q=0/<uuid4>.parquet
 1 | 0 |  2 | P=1/Q=0/<uuid5>.parquet

Finally, rows with identical partition information (both physical and partition-by) are compacted into a single row:

 P | Q | I2 | __ktk_cube_labels_seed
===+===+====+==================================================
 0 | 1 |  1 | P=0/Q=1/<uuid2>.parquet, P=0/Q=1/<uuid3>.parquet
 1 | 0 |  1 | P=1/Q=0/<uuid4>.parquet
 1 | 0 |  2 | P=1/Q=0/<uuid5>.parquet
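The compaction step amounts to a group-by over the physical partition columns plus the partition-by column, collecting all labels of each group. A sketch (labels shortened for readability):

```python
import pandas as pd

df = pd.DataFrame({
    "P":  [0, 0, 1, 1],
    "Q":  [1, 1, 0, 0],
    "I2": [1, 1, 1, 2],
    "__ktk_cube_labels_seed": ["l2", "l3", "l4", "l5"],
})

# Rows sharing the same (P, Q, I2) are merged; their labels become a list.
compact = (
    df.groupby(["P", "Q", "I2"], as_index=False)["__ktk_cube_labels_seed"]
      .agg(list)
)
```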

Alignment

After the data is prepared for every dataset, the datasets are aligned using their physical partitions. Partitions that are present in non-seed datasets but missing from the seed dataset are dropped:

inputs:

 P | Q | I2 | __ktk_cube_labels_seed
===+===+====+==================================================
 0 | 1 |  1 | P=0/Q=1/<uuid2>.parquet, P=0/Q=1/<uuid3>.parquet
 1 | 0 |  1 | P=1/Q=0/<uuid4>.parquet
 1 | 0 |  2 | P=1/Q=0/<uuid5>.parquet

 P | Q | __ktk_cube_labels_enrich
===+===+==================================================
 0 | 0 | P=0/Q=0/<uuid6>.parquet
 0 | 1 | P=0/Q=1/<uuid7>.parquet
 1 | 0 | P=1/Q=0/<uuid8>.parquet, P=1/Q=0/<uuid9>.parquet
 9 | 0 | P=9/Q=0/<uuid0>.parquet


 output:

 P | Q | I2 | __ktk_cube_labels_seed                           | __ktk_cube_labels_enrich
===+===+====+==================================================+==================================================
 0 | 1 |  1 | P=0/Q=1/<uuid2>.parquet, P=0/Q=1/<uuid3>.parquet | P=0/Q=1/<uuid7>.parquet
 1 | 0 |  1 | P=1/Q=0/<uuid4>.parquet                          | P=1/Q=0/<uuid8>.parquet, P=1/Q=0/<uuid9>.parquet
 1 | 0 |  2 | P=1/Q=0/<uuid5>.parquet                          | P=1/Q=0/<uuid8>.parquet, P=1/Q=0/<uuid9>.parquet
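The alignment can be sketched as a left join on the physical partition columns, with the seed on the left (labels shortened; this is an illustration, not Kartothek's code):

```python
import pandas as pd

seed = pd.DataFrame({
    "P": [0, 1, 1], "Q": [1, 0, 0], "I2": [1, 1, 2],
    "__ktk_cube_labels_seed": [["l2", "l3"], ["l4"], ["l5"]],
})
enrich = pd.DataFrame({
    "P": [0, 0, 1, 9], "Q": [0, 1, 0, 0],
    "__ktk_cube_labels_enrich": [["l6"], ["l7"], ["l8", "l9"], ["l0"]],
})

# Seed defines which partitions survive; enrich-only partitions are dropped.
aligned = seed.merge(enrich, on=["P", "Q"], how="left")
```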

If pre-conditions were applied to any non-seed dataset, or if a partition-by column is neither a Partition Column nor a Dimension Column, the resulting join is an inner join instead. This may prune partitions early.

Re-grouping

Now, the DataFrame is grouped by partition-by:

partition-by: I2

group 1:

 P | Q | I2 | __ktk_cube_labels_seed                           | __ktk_cube_labels_enrich
===+===+====+==================================================+==================================================
 0 | 1 |  1 | P=0/Q=1/<uuid2>.parquet, P=0/Q=1/<uuid3>.parquet | P=0/Q=1/<uuid7>.parquet
 1 | 0 |  1 | P=1/Q=0/<uuid4>.parquet                          | P=1/Q=0/<uuid8>.parquet, P=1/Q=0/<uuid9>.parquet

group 2:

 P | Q | I2 | __ktk_cube_labels_seed                           | __ktk_cube_labels_enrich
===+===+====+==================================================+==================================================
 1 | 0 |  2 | P=1/Q=0/<uuid5>.parquet                          | P=1/Q=0/<uuid8>.parquet, P=1/Q=0/<uuid9>.parquet
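The re-grouping itself is a plain group-by on the partition-by column. A sketch:

```python
import pandas as pd

aligned = pd.DataFrame({
    "P": [0, 1, 1], "Q": [1, 0, 0], "I2": [1, 1, 2],
})

# One group (sub-DataFrame) per partition-by value.
groups = {key: df for key, df in aligned.groupby("I2")}
```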

Intra-Partition Joins

This section explains how the DataFrames within a single partition of a group are joined.

A simple explanation of the join logic would be: “The coordinates (cube cells) are taken from the seed dataset; all other information is added via a left join.”

Because the user is able to add conditions to the query and because we want to utilize predicate pushdown efficiently, we define another term: restricted dataset. These are datasets that contain columns which are neither Dimension Columns nor Partition Columns and to which the user wishes to apply restrictions (via conditions or via partition-by). Because these restrictions must always hold, we can evaluate them before the join and then execute an inner join with the seed dataset.
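A minimal pandas sketch of this two-phase join order (all names and values are illustrative, not Kartothek's API): restricted datasets are filtered first and inner-joined with the seed, then the remaining datasets are attached via a left join.

```python
import pandas as pd

seed       = pd.DataFrame({"P": [1, 2, 3], "X": [10, 20, 30]})
restricted = pd.DataFrame({"P": [1, 2, 3], "OK": [True, False, True]})
enrich     = pd.DataFrame({"P": [1, 3], "PRED": [0.23, 0.13]})

# Phase 1: evaluate the restriction pre-join, then inner join with the seed.
tmp = seed.merge(restricted[restricted["OK"]], on="P", how="inner")
# Phase 2: attach unrestricted datasets via a left join.
out = tmp.merge(enrich, on="P", how="left")
```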

Examples

The following sub-sections illustrate this system in multiple steps.

Example 1 (Join Semantics)

Here, a rather standard example is shown, with explanations of why data is kept or dropped:

columns   = [P, PRED]
condition = (OK == true) & (SCHED == true)

 Seed    | Conditions                | Enrichments
 db_data | data_checks | schedule    | predictions
=========+=============+=============+=============
 P=1     | P=1         | P=1         | P=1            <-- included, trivial case
         | OK=true     | SCHED=true  | PRED=0.23
---------+-------------+-------------+-------------
 P=2     | P=2         | P=2         | P=2            <-- excluded, because OK=false
         | OK=false    | SCHED=true  | PRED=0.12
---------+-------------+-------------+-------------
 P=3     | P=3         | P=3         | P=3            <-- excluded, because SCHED=false
         | OK=true     | SCHED=false | PRED=0.13
---------+-------------+-------------+-------------
         | P=4         | P=4         | P=4            <-- excluded, seed is missing
         | OK=true     | SCHED=true  | PRED=0.03          where does this data even come from?!
---------+-------------+-------------+-------------
 P=5     | P=5         | P=5         |                <-- included, even though PRED is missing
         | OK=true     | SCHED=true  |
---------+-------------+-------------+-------------
 P=6     | P=6         |             | P=6            <-- excluded, SCHED is missing
         | OK=true     |             | PRED=0.01

 ^         ^             ^             ^
 |         |             |             |
 +---------+-------------+             |
           |                           |
       inner join                      |
 tmp1 = db_data <-> data_checks on P   |
 out  = tmp1    <-> schedule    on P   |
 (but order actually doesn't matter)   |
           ^                           |
           |                           |
           +-----------------+---------+
                             |
                         left join
                             |
                             v

                           P | PRED
                          ===+======
                           1 | 0.23
                           5 | NaN
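The join tree above can be replayed with pandas (a sketch with hand-written toy data matching the picture, not Kartothek's implementation):

```python
import pandas as pd

db_data     = pd.DataFrame({"P": [1, 2, 3, 5, 6]})
data_checks = pd.DataFrame({"P": [1, 2, 3, 4, 5, 6],
                            "OK": [True, False, True, True, True, True]})
schedule    = pd.DataFrame({"P": [1, 2, 3, 4, 5],
                            "SCHED": [True, True, False, True, True]})
predictions = pd.DataFrame({"P": [1, 2, 3, 4, 6],
                            "PRED": [0.23, 0.12, 0.13, 0.03, 0.01]})

# Restricted datasets: filter pre-join, then inner join with the seed.
tmp1 = db_data.merge(data_checks[data_checks["OK"]], on="P")
out  = tmp1.merge(schedule[schedule["SCHED"]], on="P")
# Enrichment: left join, so P=5 survives with PRED=NaN.
out  = out.merge(predictions, on="P", how="left")[["P", "PRED"]]
```

Note that P=4 never enters the result: the seed defines the cube cells, so rows missing from the seed cannot appear.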

Example 2 (Broadcast)

Now, we have a P-L cube in which all datasets except schedule have P-L dimensionality (schedule only has P):

columns   = [P, L, PRED]
condition = (OK == true) & (SCHED == true)

 Seed    | Conditions                | Enrichments
 db_data | data_checks | schedule    | predictions
=========+=============+=============+=============
 P=1     | P=1         | P=1         | P=1            <-- included, trivial case
 L=1     | L=1         |             | L=1
         | OK=true     | SCHED=true  | PRED=0.23
---------+-------------+             +-------------
 P=1     | P=1         |             | P=1            <-- excluded, because OK=false
 L=2     | L=2         |             | L=2
         | OK=false    |             | PRED=0.12
---------+-------------+-------------+-------------
 P=2     | P=2         | P=2         | P=2            <-+ excluded, because SCHED=false
 L=1     | L=1         |             | L=1              |
         | OK=true     | SCHED=false | PRED=0.13        |
---------+-------------+             +-------------     |
 P=2     | P=2         |             | P=2            <-+
 L=2     | L=2         |             | L=2
         | OK=true     |             | PRED=0.13

 ^         ^             ^             ^
 |         |             |             |
 +---------+-------------+             |
           |                           |
       inner join                      |
 tmp1 = db_data <-> data_checks on P,L |
 out  = tmp1    <-> schedule    on P   |
 (but order actually doesn't matter)   |
           ^                           |
           |                           |
           +-----------------+---------+
                             |
                         left join
                             |
                             v

                         P | L | PRED
                        ===+===+======
                         1 | 1 | 0.23

Example 3 (Projection)

This shows how the seed dataset can be used to also produce sub-dimensional / projected results:

columns   = [P, AVG]
condition = (SCHED == true)

 Seed    | Conditions  | Enrichments
 db_data | schedule    | agg
=========+=============+=============
 P=1     | P=1         | P=1            <-- included, trivial case
 L=?     |             |
         | SCHED=true  | AVG=10.2
---------+-------------+-------------
 P=2     | P=2         | P=2            <-- excluded, because SCHED=false
 L=?     |             |
         | SCHED=false | AVG=1.34

 ^         ^             ^
 |         |             |
 |         +---+         |
 |             |         |
 project to P  |         |
 |             |         |
 +---------+---+         |
           |             +---------+
       inner join                  |
 out = db_data <-> schedule on P   |
           ^                       |
           |                       |
           +-----------------+-----+
                             |
                         left join
                             |
                             v

                         P |  AVG
                        ===+=======
                         1 |  10.2
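The projection can be sketched as dropping the unused dimension column from the seed (deduplicating the remaining coordinates) before joining; the toy data below mirrors the picture and is purely illustrative:

```python
import pandas as pd

db_data  = pd.DataFrame({"P": [1, 1, 2], "L": [1, 2, 1]})
schedule = pd.DataFrame({"P": [1, 2], "SCHED": [True, False]})
agg      = pd.DataFrame({"P": [1, 2], "AVG": [10.2, 1.34]})

# Project the seed to P so the result has P granularity.
seed_p = db_data[["P"]].drop_duplicates()
out = (seed_p.merge(schedule[schedule["SCHED"]], on="P")
             .merge(agg, on="P", how="left")[["P", "AVG"]])
```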

Final Concat

After the DataFrames for all partitions in a group are joined, they are concatenated in the order of the Partition Columns.
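A sketch of this final step with pandas (illustrative column names):

```python
import pandas as pd

# Per-partition join results, possibly out of order.
part_a = pd.DataFrame({"P": [1, 1], "Q": [0, 1], "V": [1.0, 2.0]})
part_b = pd.DataFrame({"P": [0], "Q": [1], "V": [3.0]})

# Concatenate and order by the partition columns.
result = (pd.concat([part_a, part_b], ignore_index=True)
            .sort_values(["P", "Q"])
            .reset_index(drop=True))
```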