# Source code for kartothek.io.testing.merge

from collections import OrderedDict
from datetime import date

import pandas as pd
import pandas.testing as pdt

from kartothek.io_components.metapartition import SINGLE_TABLE

# Two chained merge tasks handed to the merge function under test. Each task
# names a left and a right input, the keyword arguments for the merge, and the
# label under which the result is stored.
# NOTE(review): `merge_kwargs` look like pandas merge keywords -- confirm
# against the consumer of these tasks.
MERGE_TASKS = [
    # Task 1: SINGLE_TABLE joined with "helper", stored as "first_output".
    dict(
        left=SINGLE_TABLE,
        right="helper",
        merge_kwargs=dict(how="left", sort=False, copy=False),
        output_label="first_output",
    ),
    # Task 2: the output of task 1 joined with "PRED", stored as "final".
    dict(
        left="first_output",
        right="PRED",
        merge_kwargs=dict(how="left", sort=False, copy=False),
        output_label="final",
    ),
]

# Expected merge result compared against the first partition in
# `test_merge_datasets`: two rows that differ only in HORIZON and PRED.
MERGE_EXP_CL1 = pd.DataFrame(
    OrderedDict(
        P=[1, 1],
        L=[1, 1],
        TARGET=[1, 1],
        HORIZON=[1, 2],
        info=["a", "a"],
        PRED=[10, 20],
        # Both rows carry the same day, converted to pandas datetimes.
        DATE=pd.to_datetime([date(2010, 1, 1)] * 2),
    )
)

# Expected merge result compared against the second partition in
# `test_merge_datasets`: two rows that differ only in HORIZON and PRED.
MERGE_EXP_CL2 = pd.DataFrame(
    OrderedDict(
        P=[2, 2],
        L=[2, 2],
        TARGET=[2, 2],
        HORIZON=[1, 2],
        info=["b", "b"],
        PRED=[10, 20],
        # Both rows carry the same day, converted to pandas datetimes.
        DATE=pd.to_datetime([date(2009, 12, 31)] * 2),
    )
)


def test_merge_datasets(
    dataset,
    evaluation_dataset,
    store_factory,
    store_session_factory,
    frozen_time,
    bound_merge_datasets,
):
    """Merge two datasets via ``MERGE_TASKS`` and check the resulting frames.

    Calls ``bound_merge_datasets`` with the UUIDs of ``dataset`` (left) and
    ``evaluation_dataset`` (right), ``MERGE_TASKS`` and ``match_how="prefix"``,
    then asserts that exactly two partitions come back, each holding exactly
    one table, equal to ``MERGE_EXP_CL1`` and ``MERGE_EXP_CL2`` respectively.

    Parameters
    ----------
    dataset
        Left input; only its ``uuid`` attribute is read.
    evaluation_dataset
        Right input; only its ``uuid`` attribute is read.
    store_factory
        Not referenced in the body.
        NOTE(review): presumably a pytest fixture requested for its setup
        side effects -- confirm before removing.
    store_session_factory
        Passed through as the ``store`` argument of the merge call.
    frozen_time
        Not referenced in the body.
        NOTE(review): presumably a fixture that pins the clock -- confirm.
    bound_merge_datasets
        Callable under test. Its result is iterated and each item's ``data``
        attribute is used as a mapping of tables.

    Raises
    ------
    AssertionError
        If the partition count, the per-partition table count or the frame
        contents differ from the expectation.
    """
    # In the __pipeline case, we also need to check that the write path is
    # correct, the tests for it are much larger.
    merged_partitions = bound_merge_datasets(
        left_dataset_uuid=dataset.uuid,
        right_dataset_uuid=evaluation_dataset.uuid,
        store=store_session_factory,
        merge_tasks=MERGE_TASKS,
        match_how="prefix",
    )
    df_list = [mp.data for mp in merged_partitions]

    # Two partitions
    assert len(df_list) == 2

    expected_frames = (MERGE_EXP_CL1, MERGE_EXP_CL2)
    for tables, expected in zip(df_list, expected_frames):
        # Each partition must carry exactly one table.
        assert len(tables) == 1

        # By using values() this test is agnostic to the used key, which is
        # currently not of any importance
        pdt.assert_frame_equal(
            next(iter(tables.values())),
            expected,
            check_like=True,
            check_dtype=False,
            check_categorical=False,
        )