Source code for kartothek.io_components.gc

# -*- coding: utf-8 -*-


from kartothek.core.factory import _ensure_factory
from kartothek.core.naming import TABLE_METADATA_FILE


[docs]def dispatch_files_to_gc(dataset_uuid, store_factory, chunk_size, factory): ds_factory = _ensure_factory( dataset_uuid=dataset_uuid, store=store_factory, factory=factory, load_dataset_metadata=False, ) dataset_uuid = dataset_uuid or ds_factory.uuid index_path = "{dataset_uuid}/indices/".format(dataset_uuid=dataset_uuid) remove_index_files = set(ds_factory.store.iter_keys(prefix=index_path)) for index in ds_factory.indices.values(): index_keys = set() # We only add the indices that are saved as explicit indices if index.index_storage_key: index_keys.add(index.index_storage_key) remove_index_files -= index_keys remove_table_files = set() if ds_factory.explicit_partitions: table_files = set() for partition in ds_factory.partitions.values(): for name in partition.files.values(): table_files.add(name) for table in ds_factory.tables: table_path = "{dataset_uuid}/{table}/".format( dataset_uuid=dataset_uuid, table=table ) table_files.add(table_path + TABLE_METADATA_FILE) for key in ds_factory.store.iter_keys(prefix=table_path): remove_table_files.add(key) remove_table_files -= table_files files_to_remove = list(remove_index_files | remove_table_files) if chunk_size is None: yield files_to_remove else: for i in range(0, len(files_to_remove), chunk_size): yield files_to_remove[i : i + chunk_size]
[docs]def delete_files(files, store_factory): store = store_factory() for f in files: store.delete(f)