Source code for kartothek.io.testing.delete_cube

# -*- coding: utf-8 -*-
import pandas as pd
import pytest

from kartothek.api.discover import discover_datasets_unchecked
from kartothek.core.cube.cube import Cube
from kartothek.io.eager_cube import build_cube
from kartothek.utils.ktk_adapters import get_dataset_keys

__all__ = (
    "test_delete_twice",
    "test_fail_blocksize_negative",
    "test_fail_blocksize_wrong_type",
    "test_fail_blocksize_zero",
    "test_fail_no_store_factory",
    "test_keep_garbage_due_to_no_listing",
    "test_keep_other",
    "test_partial_delete",
    "test_simple",
)
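
# The tests below are backend-agnostic: a consuming test module supplies the
# ``driver`` fixture (the delete implementation under test) and the
# ``function_store`` fixture (a store *factory*, i.e. a callable returning a
# store). Tests that take ``skip_eager`` are meant to be skipped for the eager
# backend. A minimal conftest sketch, assuming the eager backend and a local
# filesystem store -- the fixture bodies here are illustrative, not part of
# this module:
#
#     import pytest
#     import storefact
#
#     from kartothek.io.eager_cube import delete_cube
#
#     @pytest.fixture
#     def function_store(tmpdir):
#         def factory():
#             return storefact.get_store_from_url("hfs://{}".format(tmpdir))
#         return factory
#
#     @pytest.fixture
#     def driver():
#         return delete_cube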


def test_simple(driver, function_store):
    """Deleting a cube removes all of its keys from the store."""
    df_seed = pd.DataFrame(
        {"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v1": [10, 11, 12, 13]}
    )
    df_enrich = pd.DataFrame(
        {"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v2": [10, 11, 12, 13]}
    )
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="cube")
    build_cube(
        data={cube.seed_dataset: df_seed, "enrich": df_enrich},
        cube=cube,
        store=function_store,
    )

    driver(cube=cube, store=function_store)

    assert set(function_store().keys()) == set()


def test_keep_other(driver, function_store):
    """Deleting one cube must not touch the keys of another cube."""
    df = pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v": [10, 11, 12, 13]})
    cube1 = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="cube1")
    cube2 = cube1.copy(uuid_prefix="cube2")

    build_cube(data=df, cube=cube1, store=function_store)
    keys = set(function_store().keys())

    build_cube(data=df, cube=cube2, store=function_store)

    driver(cube=cube2, store=function_store)

    assert set(function_store().keys()) == keys


def test_keep_garbage_due_to_no_listing(driver, function_store):
    """Leftover keys from an overwrite survive deletion, since the delete
    operation does not list the entire store."""
    df1 = pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v": [10, 11, 12, 13]})
    df2 = pd.DataFrame({"x": [0, 1, 2, 3], "p": [2, 2, 3, 3], "v": [10, 11, 12, 13]})
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="cube")

    # build DF1 to see which keys are created
    build_cube(data=df1, cube=cube, store=function_store)
    keys1 = set(function_store().keys())

    # wipe
    for k in list(function_store().keys()):
        function_store().delete(k)

    # build DF2 to see which keys are created
    build_cube(data=df2, cube=cube, store=function_store)
    keys2 = set(function_store().keys())

    # wipe again
    for k in list(function_store().keys()):
        function_store().delete(k)

    # some keys are obviously present every time (like central metadata and
    # common metadata)
    keys_common = keys1 & keys2

    # build DF1 and overwrite with DF2
    build_cube(data=df1, cube=cube, store=function_store)
    keys3 = set(function_store().keys())
    build_cube(data=df2, cube=cube, store=function_store, overwrite=True)

    # now some keys of DF1 must be leftovers/garbage that cannot be deleted
    # without listing the entire store (which would be too expensive)
    garbage = keys3 - keys_common
    assert len(garbage) > 0

    driver(cube=cube, store=function_store)

    assert set(function_store().keys()) == garbage


def test_delete_twice(driver, function_store):
    df = pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v": [10, 11, 12, 13]})
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="cube")
    build_cube(data=df, cube=cube, store=function_store)

    driver(cube=cube, store=function_store)
    driver(cube=cube, store=function_store)

    assert set(function_store().keys()) == set()


def test_partial_delete(driver, function_store):
    """Only the specified datasets are deleted; the rest of the cube is kept."""
    df_seed = pd.DataFrame(
        {"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v": [10, 11, 12, 13]}
    )
    df_1 = pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "a": [20, 21, 22, 23]})
    df_2 = pd.DataFrame({"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "b": [20, 21, 22, 23]})
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="cube")
    datasets = build_cube(
        data={cube.seed_dataset: df_seed, "enrich-1": df_1, "enrich-2": df_2},
        cube=cube,
        store=function_store,
    )
    enrich_1_keys = get_dataset_keys(
        discover_datasets_unchecked(
            uuid_prefix=cube.uuid_prefix,
            store=function_store,
            filter_ktk_cube_dataset_ids=["enrich-1"],
        )["enrich-1"]
    )
    enrich_2_keys = get_dataset_keys(
        discover_datasets_unchecked(
            uuid_prefix=cube.uuid_prefix,
            store=function_store,
            filter_ktk_cube_dataset_ids=["enrich-2"],
        )["enrich-2"]
    )
    all_keys = set(function_store().keys())

    driver(cube=cube, store=function_store, datasets=["enrich-1"])
    assert set(function_store().keys()) == all_keys - enrich_1_keys

    driver(cube=cube, store=function_store, datasets={"enrich-2": datasets["enrich-2"]})
    assert set(function_store().keys()) == all_keys - enrich_1_keys - enrich_2_keys

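
# As exercised above, ``datasets`` may be given either as a list of ktk_cube
# dataset IDs or as a dict mapping dataset IDs to their metadata objects.
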

def test_fail_no_store_factory(driver, function_store, skip_eager):
    """Passing a plain store instance instead of a store factory fails."""
    df_seed = pd.DataFrame(
        {"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v1": [10, 11, 12, 13]}
    )
    df_enrich = pd.DataFrame(
        {"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v2": [10, 11, 12, 13]}
    )
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="cube")
    build_cube(
        data={cube.seed_dataset: df_seed, "enrich": df_enrich},
        cube=cube,
        store=function_store,
    )
    store = function_store()

    with pytest.raises(TypeError) as exc:
        driver(cube=cube, store=store, no_run=True)

    assert str(exc.value) == "store must be a factory but is HFilesystemStore"

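
# Store factories are required (rather than plain store instances) so that
# distributed backends can re-create the store on each worker; hence the
# TypeError for the plain HFilesystemStore above.
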

def test_fail_blocksize_wrong_type(driver, function_store, skip_eager):
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="cube")

    with pytest.raises(TypeError, match="blocksize must be an integer but is str"):
        driver(cube=cube, store=function_store, blocksize="foo")


def test_fail_blocksize_negative(driver, function_store, skip_eager):
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="cube")

    with pytest.raises(ValueError, match="blocksize must be > 0 but is -1"):
        driver(cube=cube, store=function_store, blocksize=-1)


def test_fail_blocksize_zero(driver, function_store, skip_eager):
    cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="cube")

    with pytest.raises(ValueError, match="blocksize must be > 0 but is 0"):
        driver(cube=cube, store=function_store, blocksize=0)