Source code for kartothek.io.testing.utils

import math
import string

import numpy as np
import pandas as pd
from pyarrow.parquet import ParquetFile

from kartothek.io.eager import store_dataframes_as_dataset
from kartothek.io_components.metapartition import SINGLE_TABLE


def create_dataset(dataset_uuid, store_factory, metadata_version):
    """Write a small two-partition fixture dataset used by the I/O tests."""
    df = pd.DataFrame(
        {"P": np.arange(0, 10), "L": np.arange(0, 10), "TARGET": np.arange(10, 20)}
    )

    df_helper = pd.DataFrame(
        {"P": np.arange(0, 10), "info": string.ascii_lowercase[:10]}
    )

    df_list = [
        {
            "label": "cluster_1",
            "data": [(SINGLE_TABLE, df.copy(deep=True)), ("helper", df_helper)],
            "indices": {"P": {val: ["cluster_2"] for val in df.TARGET.unique()}},
        },
        {
            "label": "cluster_2",
            "data": [(SINGLE_TABLE, df.copy(deep=True)), ("helper", df_helper)],
            "indices": {"P": {val: ["cluster_2"] for val in df.TARGET.unique()}},
        },
    ]
    return store_dataframes_as_dataset(
        dfs=df_list,
        store=store_factory,
        dataset_uuid=dataset_uuid,
        metadata_version=metadata_version,
    )
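
A minimal usage sketch: in a test, create_dataset would typically be driven with a store factory that returns a shared simplekv store. The in-memory DictStore, the dataset UUID, and the metadata_version=4 argument below are assumptions chosen for illustration, not part of this module.

# Hypothetical test setup, assuming an in-memory simplekv store suffices.
from simplekv.memory import DictStore

from kartothek.io.testing.utils import create_dataset

store = DictStore()  # one shared in-memory store instance
store_factory = lambda: store  # kartothek expects a callable returning the store

dataset = create_dataset(
    dataset_uuid="fixture_dataset",  # made-up UUID for the example
    store_factory=store_factory,
    metadata_version=4,
)
assert sorted(dataset.partitions) == ["cluster_1", "cluster_2"]
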
def assert_num_row_groups(store, dataset, part_num_rows, part_chunk_size):
    """
    Assert that the row groups of each partition match the expectation based on
    the number of rows and the chunk size.
    """
    # Iterate over the partitions of each index value
    for index, partitions in dataset.indices["p"].index_dct.items():
        for part_key in partitions:
            key = dataset.partitions[part_key].files["table"]
            parquet_file = ParquetFile(store.open(key))
            if part_chunk_size[index] is None:
                # No chunk size configured: the whole partition is one row group.
                assert parquet_file.num_row_groups == 1
            else:
                # Chunked write: expect ceil(num_rows / chunk_size) row groups.
                assert parquet_file.num_row_groups == math.ceil(
                    part_num_rows[index] / part_chunk_size[index]
                )
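
As a self-contained illustration of the invariant this helper checks, written against plain pyarrow rather than kartothek's serializer (the buffer, column name, and numbers are made up for the example): writing N rows with a row-group size of C yields ceil(N / C) row groups.

import io
import math

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

num_rows, chunk_size = 25, 10
table = pa.Table.from_pandas(pd.DataFrame({"p": range(num_rows)}))

buf = io.BytesIO()
pq.write_table(table, buf, row_group_size=chunk_size)
buf.seek(0)

# 25 rows split into chunks of 10 -> ceil(25 / 10) == 3 row groups
assert pq.ParquetFile(buf).num_row_groups == math.ceil(num_rows / chunk_size)
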