src.fairreckitlib.data.filter.filter_passes
This module contains a function that performs filtering based on filter passes.
Functions: filter_from_filter_passes: Apply the filters of each filter pass.
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)
"""This module contains a function that performs filtering based on filter passes.

Functions:
    filter_from_filter_passes: Apply the filters of each filter pass.

This program has been developed by students from the bachelor Computer Science at
Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

import os
import random
import pandas as pd

from ...core.io.io_create import create_dir
from ...core.io.io_delete import delete_dir

from ...core.config.config_factories import GroupFactory
from ...core.pipeline.core_pipeline import CorePipeline

from .filter_config import DataSubsetConfig


def filter_from_filter_passes(core_pipeline: CorePipeline,
                              output_dir: str,
                              dataframe: pd.DataFrame,
                              subset: DataSubsetConfig,
                              filter_factory: GroupFactory) -> pd.DataFrame:
    """Apply the filters of each filter pass inside the DataSubsetConfig.

    Each filter pass produces a filtered dataframe; the resulting dataframes
    are then concatenated, deduplicated and returned.

    Args:
        core_pipeline: Pipeline in which this function is used. Required for IO actions.
        output_dir: Directory to write temporary dataframes to.
        dataframe: Dataframe to be filtered.
        subset: Configuration containing the filter passes.
        filter_factory: Factory containing the filters.

    Returns:
        An aggregation of the filtered dataframes.

    Raises:
        RuntimeError: When a filter pass, or the aggregated result, is empty.
    """
    # Create a temporary directory and store the base dataframe there.
    random_num_str = str(random.randint(0, 100000))  # To prevent concurrency issues.
    dir_path = create_dir(os.path.join(output_dir, 'filter_passes_temp'),
                          core_pipeline.event_dispatcher)
    og_df_path = os.path.join(dir_path, 'og_df' + random_num_str + '.tsv')
    core_pipeline.write_dataframe(og_df_path, dataframe, True)

    # Apply the filter passes.
    final_df = None
    filter_dataset_factory = filter_factory.get_factory(subset.dataset).get_factory(subset.matrix)
    for filter_pass_config in subset.filter_passes:
        # Reload the original dataframe so every pass starts from the same base data.
        dataframe = core_pipeline.read_dataframe(
            og_df_path,
            'original_dataframe',
            'filter_passes.on_begin_load_original_dataframe',
            'filter_passes.on_end_load_original_dataframe')
        for _filter in filter_pass_config.filters:
            filter_obj = filter_dataset_factory.create(_filter.name, _filter.params)
            dataframe = filter_obj.run(dataframe)
        if len(dataframe) == 0:
            raise RuntimeError(
                'Filter pass generated an empty dataset; '
                'perhaps the chosen filters are too strict.')
        # Append to the final dataframe, removing duplicate rows.
        final_df = pd.concat(
            [final_df, dataframe], copy=False).drop_duplicates().reset_index(drop=True)

    delete_dir(dir_path, core_pipeline.event_dispatcher)
    if final_df is None or len(final_df) == 0:
        raise RuntimeError(
            'The fully filtered dataframe is empty: all filter passes are too strict '
            'or the initial dataframe is missing.')
    return final_df
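The aggregation step relies on two pandas behaviours worth noting: `pd.concat` silently drops a `None` entry, so the `final_df = None` initialisation needs no special case on the first pass, and `drop_duplicates` turns the repeated appends into a set union of rows. A standalone sketch with invented toy data:

```python
import pandas as pd

# Toy base dataframe (invented for illustration).
base = pd.DataFrame({'user': [1, 2, 3, 4], 'rating': [5, 3, 4, 2]})

# Each "pass" chains its filters (AND); here each pass is a plain boolean mask.
pass_results = [
    base[(base['rating'] >= 4) & (base['user'] <= 3)],  # pass 1 -> users 1, 3
    base[base['user'] == 2],                            # pass 2 -> user 2
]

# Aggregate exactly as in the function above: concat, dedupe, reindex.
final_df = None
for result in pass_results:
    final_df = pd.concat(
        [final_df, result], copy=False).drop_duplicates().reset_index(drop=True)

print(sorted(final_df['user'].tolist()))  # union of the passes: [1, 2, 3]
```

Within a pass the filters narrow the dataframe (logical AND), while across passes the results are unioned (logical OR), which is why the duplicate rows produced by overlapping passes must be dropped.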
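The temp-file round trip that the function performs through `CorePipeline.write_dataframe`/`read_dataframe` (library wrappers whose exact behaviour is assumed here) can be sketched with plain pandas: the base dataframe is written once as a TSV, and a fresh copy is reloaded at the start of every pass so no pass sees another pass's changes. The file name and columns below are invented:

```python
import os
import tempfile

import pandas as pd

base = pd.DataFrame({'user': [1, 2], 'item': [10, 20], 'rating': [4.0, 5.0]})

with tempfile.TemporaryDirectory() as dir_path:
    # Write the base dataframe once, tab-separated, like the 'og_df<n>.tsv' above.
    og_df_path = os.path.join(dir_path, 'og_df.tsv')
    base.to_csv(og_df_path, sep='\t', index=False)

    # Reload a fresh copy per pass; filtering `dataframe` never touches the file.
    for _ in range(2):
        dataframe = pd.read_csv(og_df_path, sep='\t')
        dataframe = dataframe[dataframe['rating'] > 4.0]  # one pass's filtering

    assert pd.read_csv(og_df_path, sep='\t').equals(base)

# The directory (the counterpart of the function's delete_dir call) is gone afterwards.
print(os.path.exists(dir_path))  # False
```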