Mutating Datasets

It’s possible to update an existing dataset by adding new physical partitions to it and by deleting or replacing existing partitions. Kartothek provides update functions, whose names generally carry the prefix update_dataset. For example, update_dataset_from_dataframes() is the update function for the eager backend.

To see updating in action, let’s first set up a storage location and store some data there with Kartothek.
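
The variables store_url and df used throughout this section come from the earlier setup, which is not repeated here. As a minimal sketch, that setup could look roughly like the following (the temporary hfs:// store is an assumption; the dataframe contents are reconstructed from the outputs shown later in this section):

import numpy as np
import pandas as pd
from tempfile import mkdtemp

from kartothek.api.dataset import store_dataframes_as_dataset

# A simple local filesystem store addressed via a storefact URL (assumed setup).
dataset_dir = mkdtemp()
store_url = f"hfs://{dataset_dir}"

# The example dataframe: two rows each for '2013-01-02' and '2013-01-03' in column B.
df = pd.DataFrame(
    {
        "A": 1.0,
        "B": [
            pd.Timestamp("20130102"),
            pd.Timestamp("20130102"),
            pd.Timestamp("20130103"),
            pd.Timestamp("20130103"),
        ],
        "C": pd.Series(1, index=list(range(4)), dtype="float32"),
        "D": np.array([3] * 4, dtype="int32"),
        "E": pd.Categorical(["test", "train", "test", "train"]),
        "F": "foo",
    }
)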

In [1]: dm = store_dataframes_as_dataset(
   ...:     store=store_url, dataset_uuid="partitioned_dataset", dfs=[df], partition_on="B"
   ...: )
   ...: 

In [2]: sorted(dm.partitions.keys())
Out[2]: 
['B=2013-01-02%2000%3A00%3A00/f51dbac93e52407ba1af37e60e1c2f0a',
 'B=2013-01-03%2000%3A00%3A00/f51dbac93e52407ba1af37e60e1c2f0a']

Appending Data

Now, we create another_df with the same schema as our initial dataframe df and use it to update the dataset with the eager backend by calling update_dataset_from_dataframes():

In [3]: from kartothek.api.dataset import update_dataset_from_dataframes

In [4]: another_df = pd.DataFrame(
   ...:     {
   ...:         "A": 5.0,
   ...:         "B": [
   ...:             pd.Timestamp("20110103"),
   ...:             pd.Timestamp("20110103"),
   ...:             pd.Timestamp("20110104"),
   ...:             pd.Timestamp("20110104"),
   ...:         ],
   ...:         "C": pd.Series(2, index=list(range(4)), dtype="float32"),
   ...:         "D": np.array([6] * 4, dtype="int32"),
   ...:         "E": pd.Categorical(["prod", "dev", "prod", "dev"]),
   ...:         "F": "bar",
   ...:     }
   ...: )
   ...: 

In [5]: dm = update_dataset_from_dataframes([another_df], store=store_url, dataset_uuid=dm.uuid)

In [6]: sorted(dm.partitions.keys())
Out[6]: 
['B=2011-01-03%2000%3A00%3A00/1cdc01bc63b54faf8284709574c001a7',
 'B=2011-01-04%2000%3A00%3A00/1cdc01bc63b54faf8284709574c001a7',
 'B=2013-01-02%2000%3A00%3A00/f51dbac93e52407ba1af37e60e1c2f0a',
 'B=2013-01-03%2000%3A00%3A00/f51dbac93e52407ba1af37e60e1c2f0a']

Looking at dm.partitions, we can see that two new partitions have been added, one for each new value of B.

If we read the data again, we can see that another_df has been appended to the previous contents.

In [7]: from kartothek.api.dataset import read_table

In [8]: updated_df = read_table(dataset_uuid=dm.uuid, store=store_url, table="table")

In [9]: updated_df
Out[9]: 
           B    A    C  D      E    F
0 2013-01-02  1.0  1.0  3   test  foo
1 2013-01-02  1.0  1.0  3  train  foo
2 2013-01-03  1.0  1.0  3   test  foo
3 2013-01-03  1.0  1.0  3  train  foo
4 2011-01-03  5.0  2.0  6   prod  bar
5 2011-01-03  5.0  2.0  6    dev  bar
6 2011-01-04  5.0  2.0  6   prod  bar
7 2011-01-04  5.0  2.0  6    dev  bar

Dataset updates work by adding new partitions to the dataset, as long as those partitions contain the same tables as the existing partitions. An update cannot introduce a different table into an existing dataset.

To illustrate this point better, let’s first create a dataset with two tables:

In [10]: df2 = pd.DataFrame(
   ....:     {
   ....:         "G": "foo",
   ....:         "H": pd.Categorical(["test", "train", "test", "train"]),
   ....:         "I": np.array([9] * 4, dtype="int32"),
   ....:         "J": pd.Series(3, index=list(range(4)), dtype="float32"),
   ....:         "K": pd.Timestamp("20190604"),
   ....:         "L": 2.0,
   ....:     }
   ....: )
   ....: 

In [11]: df2
Out[11]: 
     G      H  I    J          K    L
0  foo   test  9  3.0 2019-06-04  2.0
1  foo  train  9  3.0 2019-06-04  2.0
2  foo   test  9  3.0 2019-06-04  2.0
3  foo  train  9  3.0 2019-06-04  2.0

In [12]: dm_two_tables = store_dataframes_as_dataset(
   ....:     store_url, "two_tables", dfs=[{"data": {"table1": df, "table2": df2}}]
   ....: )
   ....: 

In [13]: dm_two_tables.tables
Out[13]: ['table1', 'table2']

In [14]: sorted(dm_two_tables.partitions.keys())
Out[14]: ['af69739f7af042f9956b968dddfa3c53']

Partition identifiers

In the previous example, a dictionary was used to pass the desired data to the store function. To label each partition, Kartothek by default generates a UUID so that every partition is named uniquely. This is necessary for updates to work properly, since Kartothek follows copy-on-write principles.
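
As a small sketch of what these identifiers look like in practice, a partition key is just the generated label, optionally prefixed with the partition-column values when the dataset is partitioned:

# The unpartitioned dataset above is keyed by the generated label alone:
list(dm_two_tables.partitions)  # e.g. ['af69739f7af042f9956b968dddfa3c53']

# For the partitioned dataset, the label is prefixed with the partition values:
sorted(dm.partitions)[0]  # e.g. 'B=2011-01-03%2000%3A00%3A00/1cdc01bc63b54faf8284709574c001a7'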

Below is an example where we update the existing dataset two_tables with new data for table1 and table2:

In [15]: another_df2 = pd.DataFrame(
   ....:     {
   ....:         "G": "bar",
   ....:         "H": pd.Categorical(["prod", "dev", "prod", "dev"]),
   ....:         "I": np.array([12] * 4, dtype="int32"),
   ....:         "J": pd.Series(4, index=list(range(4)), dtype="float32"),
   ....:         "K": pd.Timestamp("20190614"),
   ....:         "L": 10.0,
   ....:     }
   ....: )
   ....: 

In [16]: another_df2
Out[16]: 
     G     H   I    J          K     L
0  bar  prod  12  4.0 2019-06-14  10.0
1  bar   dev  12  4.0 2019-06-14  10.0
2  bar  prod  12  4.0 2019-06-14  10.0
3  bar   dev  12  4.0 2019-06-14  10.0

In [17]: dm_two_tables = update_dataset_from_dataframes(
   ....:     {"data": {"table1": another_df, "table2": another_df2}},
   ....:     store=store_url,
   ....:     dataset_uuid=dm_two_tables.uuid,
   ....: )
   ....: 

In [18]: dm_two_tables.tables
Out[18]: ['table1', 'table2']

In [19]: sorted(dm_two_tables.partitions.keys())
Out[19]: ['2154ca4c61684914849d53d954624ac4', 'af69739f7af042f9956b968dddfa3c53']

Trying to update only a subset of tables throws a ValueError:

In [20]: update_dataset_from_dataframes(
   ....:        {
   ....:           "data":
   ....:           {
   ....:              "table2": another_df2
   ....:           }
   ....:        },
   ....:        store=store_url,
   ....:        dataset_uuid=dm_two_tables.uuid
   ....:        )
   ....: 
---------------------------------------------------------------------------
ValueError: Input partitions for update have different tables than dataset:
Input partition tables: {'table2'}
Tables of existing dataset: ['table1', 'table2']

Deleting Data

Adding data to an existing dataset is not the only thing an update operation can do; it can also be used to remove data. To do so, we use the delete_scope keyword argument, as shown in the example below:

In [21]: dm = update_dataset_from_dataframes(
   ....:     None,
   ....:     store=store_url,
   ....:     dataset_uuid=dm.uuid,
   ....:     partition_on="B",
   ....:     delete_scope=[{"B": pd.Timestamp("20130102")}],
   ....: )
   ....: 

In [22]: sorted(dm.partitions.keys())
Out[22]: 
['B=2011-01-03%2000%3A00%3A00/1cdc01bc63b54faf8284709574c001a7',
 'B=2011-01-04%2000%3A00%3A00/1cdc01bc63b54faf8284709574c001a7',
 'B=2013-01-03%2000%3A00%3A00/f51dbac93e52407ba1af37e60e1c2f0a']

Using a dictionary, we specified that data where column B has the value pd.Timestamp("20130102") should be removed. Looking at the partitions after the update, we see that the partition B=2013-01-02[...] has indeed been removed.

Warning

We defined delete_scope over a value of B, which is the column that we partitioned on: delete_scope only works on partitioned columns.

Thus, delete_scope should only be used on partitioned columns because of their one-to-one mapping to the underlying files; without that guarantee, using delete_scope could have unwanted effects, such as accidentally removing data with different values.

Using delete_scope also works on datasets that were not partitioned on any column(s); however, this is not advised, since the effect is simply to remove all previous partitions and replace them with the ones in the update.

If the intention is to delete the entire dataset, kartothek.io.eager.delete_dataset() is a much cleaner and safer way to do so (see the sketch below).

When using delete_scope, multiple values for the same column cannot be passed as a list; they have to be specified as individual dictionaries instead, i.e. [{"E": ["test", "train"]}] will not work, but [{"E": "test"}, {"E": "train"}] will.
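
A short sketch of these last two points (the dataset name obsolete_dataset is only a placeholder):

from kartothek.io.eager import delete_dataset

# Deleting a whole dataset: cleaner and safer than an empty update with delete_scope.
delete_dataset(dataset_uuid="obsolete_dataset", store=store_url)

# Multiple values for the same partition column go into separate dictionaries:
delete_scope = [{"E": "test"}, {"E": "train"}]  # works
# delete_scope = [{"E": ["test", "train"]}]     # will NOT work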

In [23]: duplicate_df = df.copy()

In [24]: duplicate_df.F = "bar"

In [25]: dm = store_dataframes_as_dataset(
   ....:     store_url,
   ....:     "another_partitioned_dataset",
   ....:     [df, duplicate_df],
   ....:     partition_on=["E", "F"],
   ....: )
   ....: 

In [26]: sorted(dm.partitions.keys())
Out[26]: 
['E=test/F=bar/f78f46346b71429c8990f2da7f569e2f',
 'E=test/F=foo/389e32471ca7494ebe0fe5e2584d9b35',
 'E=train/F=bar/f78f46346b71429c8990f2da7f569e2f',
 'E=train/F=foo/389e32471ca7494ebe0fe5e2584d9b35']

In [27]: dm = update_dataset_from_dataframes(
   ....:     None,
   ....:     store=store_url,
   ....:     dataset_uuid=dm.uuid,
   ....:     partition_on=["E", "F"],
   ....:     delete_scope=[{"E": "train", "F": "foo"}, {"E": "test", "F": "bar"}],
   ....: )
   ....: 

In [28]: sorted(dm.partitions.keys())  # `E=train/F=foo` and `E=test/F=bar` are deleted
Out[28]: 
['E=test/F=foo/389e32471ca7494ebe0fe5e2584d9b35',
 'E=train/F=bar/f78f46346b71429c8990f2da7f569e2f']

Replacing Data

Finally, an update step can perform the two operations above, deleting and appending, together in one atomic operation. This is done by specifying dataframes to be appended while also defining a delete_scope over the partitions to be removed. The following example illustrates how both can be performed with a single update:

In [29]: df  # Column B includes 2 values for '2013-01-02' and another 2 for '2013-01-03'
Out[29]: 
     A          B    C  D      E    F
0  1.0 2013-01-02  1.0  3   test  foo
1  1.0 2013-01-02  1.0  3  train  foo
2  1.0 2013-01-03  1.0  3   test  foo
3  1.0 2013-01-03  1.0  3  train  foo

In [30]: dm = store_dataframes_as_dataset(store_url, "replace_partition", [df], partition_on="B")

In [31]: sorted(dm.partitions.keys())  # two partitions, one for each value of `B`
Out[31]: 
['B=2013-01-02%2000%3A00%3A00/95601d75085748e784b68d6a25733f14',
 'B=2013-01-03%2000%3A00%3A00/95601d75085748e784b68d6a25733f14']

In [32]: modified_df = another_df.copy()

# set column B to the value '2013-01-03' for all rows in this dataframe
In [33]: modified_df.B = pd.Timestamp("20130103")

In [34]: dm = update_dataset_from_dataframes(
   ....:     [
   ....:         modified_df
   ....:     ],  # specify dataframe which has 'new' data for partition to be replaced
   ....:     store=store_url,
   ....:     dataset_uuid=dm.uuid,
   ....:     partition_on="B",  # don't forget to specify the partitioning column
   ....:     delete_scope=[
   ....:         {"B": pd.Timestamp("2013-01-03")}
   ....:     ],  # specify the partition to be deleted
   ....: )
   ....: 

In [35]: sorted(dm.partitions.keys())
Out[35]: 
['B=2013-01-02%2000%3A00%3A00/95601d75085748e784b68d6a25733f14',
 'B=2013-01-03%2000%3A00%3A00/ecfe6be2f03149c38ca6ba7f85db748e']

In [36]: read_table(dm.uuid, store_url, table="table")
Out[36]: 
           B    A    C  D      E    F
0 2013-01-02  1.0  1.0  3   test  foo
1 2013-01-02  1.0  1.0  3  train  foo
2 2013-01-03  5.0  2.0  6   prod  bar
3 2013-01-03  5.0  2.0  6    dev  bar
4 2013-01-03  5.0  2.0  6   prod  bar
5 2013-01-03  5.0  2.0  6    dev  bar

As can be seen above, the resulting dataframe from read_table() consists of two rows corresponding to B=2013-01-02 (from df) and four rows corresponding to B=2013-01-03 (from modified_df). Thus, the original partition with the two B=2013-01-03 rows from df has been completely replaced.

Garbage collection

When Kartothek executes an operation, it does not commit any changes to the dataset until the operation has completed successfully. If a write operation fails for any reason, new files may have been written to storage, but they will not be used by the dataset because they are never referenced in the Kartothek metadata. Thus, when the user reads the dataset, no new data will appear in the output.

Similarly, when deleting a partition, Kartothek only removes the reference to the underlying file from the metadata; the file itself is not deleted.

These untracked files remain in storage until a Kartothek garbage collection function is called on the dataset. If a dataset is updated regularly, it may be useful to run garbage collection periodically to reduce unnecessary storage use.

An example of garbage collection is shown below. A little above, in the replace example, we replaced the B=2013-01-03 partition of the dataset with uuid replace_partition. The file backing the replaced partition remains in storage but is no longer tracked by Kartothek; when garbage collection is called, it is removed.

In [37]: from kartothek.api.dataset import garbage_collect_dataset

In [38]: from storefact import get_store_from_url

In [39]: store = get_store_from_url(store_url)

In [40]: files_before = set(store.keys())

In [41]: garbage_collect_dataset(store=store, dataset_uuid=dm.uuid)

In [42]: files_before.difference(store.keys())  # Show files removed
Out[42]: {'replace_partition/table/B=2013-01-03%2000%3A00%3A00/95601d75085748e784b68d6a25733f14.parquet'}

Mutating indexed datasets

A mutating operation will update all indices that currently exist for the dataset. This holds true even if the update function specifies no indices at all, or only a subset of them. Consider the following example:

In [43]: df = pd.DataFrame({"payload": range(10), "i1": 0, "i2": ["a"] * 5 + ["b"] * 5})

In [44]: dm = store_dataframes_as_dataset(
   ....:     store_url, "indexed_dataset", [df], secondary_indices=["i1", "i2"]
   ....: )
   ....: 

In [45]: dm = dm.load_all_indices(store_url)

In [46]: dm.indices["i1"].observed_values()
Out[46]: array([0])

In [47]: dm.indices["i2"].observed_values()
Out[47]: array(['b', 'a'], dtype=object)

In [48]: new_df = pd.DataFrame({"payload": range(10), "i1": 1, "i2": "c"})

If we do not specify anything, Kartothek will infer the existing indices and update them correctly:

In [49]: dm = update_dataset_from_dataframes([new_df], store=store_url, dataset_uuid=dm.uuid)

In [50]: dm = dm.load_all_indices(store_url)

In [51]: dm.indices["i1"].observed_values()
Out[51]: array([0, 1])

In [52]: dm.indices["i2"].observed_values()
Out[52]: array(['b', 'a', 'c'], dtype=object)

This is true even if only a subset of the indices is given:

In [53]: new_df = pd.DataFrame({"payload": range(10), "i1": 2, "i2": "d"})

In [54]: dm = update_dataset_from_dataframes(
   ....:     [new_df], store=store_url, dataset_uuid=dm.uuid, secondary_indices="i1"
   ....: )
   ....: 

In [55]: dm = dm.load_all_indices(store_url)

In [56]: dm.indices["i1"].observed_values()
Out[56]: array([0, 1, 2])

In [57]: dm.indices["i2"].observed_values()
Out[57]: array(['b', 'a', 'c', 'd'], dtype=object)
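
As a brief follow-up sketch (not part of the original example), the refreshed secondary indices can then be used for predicate pushdown when reading the data back; the predicates argument of read_table with its list-of-conjunctions format is assumed here:

from kartothek.api.dataset import read_table

# Only partitions whose "i1" index contains the value 2 are read;
# this finds the rows written in the last update.
subset = read_table(
    dataset_uuid=dm.uuid,
    store=store_url,
    predicates=[[("i1", "==", 2)]],
)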