src.fairreckitlib.data.filter.filter_passes

This module contains a function that performs filtering from filter passes.

Functions: filter_from_filter_passes: Apply filter to filter passes.

This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)

 1"""This module contains a function that performs filtering from filter passes.
 2
 3Functions:
 4    filter_from_filter_passes: Apply filter to filter passes.
 5
 6This program has been developed by students from the bachelor Computer Science at
 7Utrecht University within the Software Project course.
 8© Copyright Utrecht University (Department of Information and Computing Sciences)
 9"""
10
11import os
12import random
13import pandas as pd
14
15from ...core.io.io_create import create_dir
16from ...core.io.io_delete import delete_dir
17
18from ...core.config.config_factories import GroupFactory
19from ...core.pipeline.core_pipeline import CorePipeline
20
21from .filter_config import DataSubsetConfig
22
23
def filter_from_filter_passes(core_pipeline: CorePipeline,
                              output_dir: str,
                              dataframe: pd.DataFrame,
                              subset: DataSubsetConfig,
                              filter_factory: GroupFactory) -> pd.DataFrame:
    """Apply filter to filter passes inside DataSubsetConfig.

    For each filter pass the original dataframe is reloaded from a temp file and
    run through the pass's chain of filters. The filtered dataframes of all
    passes are concatenated, de-duplicated and returned.

    Args:
        core_pipeline: Pipeline where this function is used. Required for IO actions.
        output_dir: Directory to write temp dataframes to.
        dataframe: Dataframe to be filtered.
        subset: Configuration file containing filter passes.
        filter_factory: Factory containing filters.

    Returns:
        An aggregation of filtered dataframes.

    Raises:
        RuntimeError: When any filter pass, or the aggregated result, is empty.
    """
    # Create temp files and store base dataframe.
    random_num_str = str(random.randint(0, 100000))  # To prevent concurrency issues.
    dir_path = create_dir(os.path.join(output_dir, 'filter_passes_temp'),
        core_pipeline.event_dispatcher)
    og_df_path = os.path.join(dir_path, 'og_df' + random_num_str + '.tsv')
    core_pipeline.write_dataframe(og_df_path, dataframe, True)

    # Apply filter passes.
    final_df = None
    filter_dataset_factory = filter_factory.get_factory(subset.dataset).get_factory(subset.matrix)
    try:
        for filter_pass_config in subset.filter_passes:
            # Each pass starts from the unfiltered dataframe stored on disk.
            dataframe = core_pipeline.read_dataframe(
                og_df_path,
                'original_dataframe',
                'filter_passes.on_begin_load_original_dataframe',
                'filter_passes.on_end_load_original_dataframe')
            for _filter in filter_pass_config.filters:
                filterobj = filter_dataset_factory.create(_filter.name, _filter.params)
                dataframe = filterobj.run(dataframe)
            if len(dataframe) == 0:
                raise RuntimeError(
                    'Filter pass generating empty dataset. Perhaps filters chosen too strictly.'
                )
            # Add to final dataframe and remove duplicates as well.
            # Note: pd.concat silently drops the initial None entry.
            final_df = pd.concat(
                [final_df, dataframe], copy=False).drop_duplicates().reset_index(drop=True)
    finally:
        # Always clean up the temp directory, even when a filter pass raises.
        delete_dir(dir_path, core_pipeline.event_dispatcher)

    # final_df is None when subset contains no filter passes at all.
    if final_df is None or len(final_df) == 0:
        raise RuntimeError(
            'Wholly filtered dataframe is empty. All filter passes too strict or initial '
            'dataframe missing.'
        )
    return final_df
def filter_from_filter_passes(core_pipeline: CorePipeline,
                              output_dir: str,
                              dataframe: pd.DataFrame,
                              subset: DataSubsetConfig,
                              filter_factory: GroupFactory) -> pd.DataFrame:
    """Apply filter to filter passes inside DataSubsetConfig.

    Every filter pass filters its own copy of the original dataframe; the
    per-pass results are then concatenated, de-duplicated and returned.

    Args:
        core_pipeline: Pipeline where this function is used. Required for IO actions.
        output_dir: Directory to write temp dataframes to.
        dataframe: Dataframe to be filtered.
        subset: Configuration file containing filter passes.
        filter_factory: Factory containing filters.

    Returns:
        An aggregation of filtered dataframes.
    """
    # Persist the base dataframe so each pass can reload it from disk.
    rand_suffix = str(random.randint(0, 100000))  # To prevent concurrency issues.
    temp_dir = create_dir(os.path.join(output_dir, 'filter_passes_temp'),
        core_pipeline.event_dispatcher)
    base_df_path = os.path.join(temp_dir, f'og_df{rand_suffix}.tsv')
    core_pipeline.write_dataframe(base_df_path, dataframe, True)

    matrix_filter_factory = filter_factory.get_factory(subset.dataset).get_factory(subset.matrix)

    # Run every pass and accumulate the filtered results.
    result = None
    for pass_config in subset.filter_passes:
        current = core_pipeline.read_dataframe(
            base_df_path,
            'original_dataframe',
            'filter_passes.on_begin_load_original_dataframe',
            'filter_passes.on_end_load_original_dataframe')
        for filter_config in pass_config.filters:
            data_filter = matrix_filter_factory.create(filter_config.name, filter_config.params)
            current = data_filter.run(current)
        if not len(current):
            raise RuntimeError(
                'Filter pass generating empty dataset. Perhaps filters chosen too strictly.'
            )
        # Merge into the accumulated result, dropping duplicate rows.
        combined = pd.concat([result, current], copy=False)
        result = combined.drop_duplicates().reset_index(drop=True)

    delete_dir(temp_dir, core_pipeline.event_dispatcher)
    if not len(result):
        raise RuntimeError(
            'Wholly filtered dataframe is empty. All filter passes too strict or initial \
            dataframe missing.'
        )
    return result

Apply filter to filter passes inside DataSubsetConfig.

For each filter pass, a filtered dataframe is produced; the resulting dataframes are then concatenated and returned as one dataframe.

Args:
    core_pipeline: Pipeline where this function is used. Required for IO actions.
    output_dir: Directory to write temp dataframes to.
    dataframe: Dataframe to be filtered.
    subset: Configuration file containing filter passes.
    filter_factory: Factory containing filters.

Returns: An aggregation of filtered dataframes.