src.fairreckitlib.data.pipeline.data_run

This module contains functionality to run the data pipeline.

Classes:

DataPipelineConfig: configuration class to run the data pipelines.

Functions:

run_data_pipelines: run the pipeline using dataset configurations.

This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)

 1"""This module contains functionality to run the data pipeline.
 2
 3Classes:
 4
 5    DataPipelineConfig: configuration class to run the data pipelines.
 6
 7Functions:
 8
 9    run_data_pipelines: run the pipeline using dataset configurations.
10
11This program has been developed by students from the bachelor Computer Science at
12Utrecht University within the Software Project course.
13© Copyright Utrecht University (Department of Information and Computing Sciences)
14"""
15
16from dataclasses import dataclass
17from typing import Callable, List
18
19from ...core.config.config_factories import GroupFactory
20from ...core.events.event_dispatcher import EventDispatcher
21from ...core.events.event_error import ON_FAILURE_ERROR, ErrorEventArgs
22from ..set.dataset_registry import DataRegistry
23from .data_config import DataMatrixConfig
24from .data_pipeline import DataPipeline, DataTransition
25
26
@dataclass
class DataPipelineConfig:
    """Bundle of settings that run_data_pipelines needs to do its work.

    Attributes:
        output_dir: the directory to store the output.
        data_registry: the registry with available datasets.
        data_factory: the factory with available data modifier factories.
        data_config_list: the dataset matrix configurations to compute.
    """

    output_dir: str
    data_registry: DataRegistry
    data_factory: GroupFactory
    data_config_list: List[DataMatrixConfig]
41
42
def run_data_pipelines(
        pipeline_config: DataPipelineConfig,
        event_dispatcher: EventDispatcher,
        is_running: Callable[[], bool]) -> List[DataTransition]:
    """Run one data pipeline for every dataset matrix configuration.

    A configuration is skipped, after dispatching an ON_FAILURE_ERROR event,
    when the registry returns None for its dataset or when the dataset does
    not list the requested matrix. A configuration whose pipeline run raises
    FileNotFoundError or RuntimeError is skipped as well.

    Args:
        pipeline_config: the configuration on how to run the data pipelines.
        event_dispatcher: used to dispatch data/IO events when running the pipeline.
        is_running: function that returns whether the pipelines
            are still running. It is handed to every pipeline run and is
            also checked after each successful run; the remaining
            configurations are abandoned as soon as it returns False.

    Returns:
        the DataTransition of every pipeline run that completed,
        in the order of the configuration list.
    """
    transitions = []
    pipeline = DataPipeline(pipeline_config.data_factory, event_dispatcher)

    for matrix_config in pipeline_config.data_config_list:
        dataset = pipeline_config.data_registry.get_set(matrix_config.dataset)

        # Guard: the requested dataset is not known to the registry.
        if dataset is None:
            event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: to get dataset ' + matrix_config.dataset + ' from registry'
            ))
            continue

        # Guard: the dataset exists but does not provide the requested matrix.
        if matrix_config.matrix not in dataset.get_available_matrices():
            event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: to get matrix ' + matrix_config.matrix + ' from ' + matrix_config.dataset
            ))
            continue

        try:
            transition = pipeline.run(
                pipeline_config.output_dir,
                dataset,
                matrix_config,
                is_running
            )
        except (FileNotFoundError, RuntimeError):
            # NOTE(review): no event is dispatched here; presumably the
            # pipeline reports its own failure before raising - confirm.
            continue
        else:
            transitions.append(transition)

        # Stop early once the caller signals that running was cancelled.
        if not is_running():
            break

    return transitions
@dataclass
class DataPipelineConfig:
@dataclass
class DataPipelineConfig:
    """Bundle of settings that run_data_pipelines needs to do its work.

    Attributes:
        output_dir: the directory to store the output.
        data_registry: the registry with available datasets.
        data_factory: the factory with available data modifier factories.
        data_config_list: the dataset matrix configurations to compute.
    """

    output_dir: str
    data_registry: DataRegistry
    data_factory: GroupFactory
    data_config_list: List[DataMatrixConfig]

Data Pipeline Configuration.

- output_dir: the directory to store the output.
- data_registry: the registry with available datasets.
- data_factory: the factory with available data modifier factories.
- data_config_list: the dataset matrix configurations to compute.

DataPipelineConfig( output_dir: str, data_registry: src.fairreckitlib.data.set.dataset_registry.DataRegistry, data_factory: src.fairreckitlib.core.config.config_factories.GroupFactory, data_config_list: List[src.fairreckitlib.data.pipeline.data_config.DataMatrixConfig])
def run_data_pipelines( pipeline_config: src.fairreckitlib.data.pipeline.data_run.DataPipelineConfig, event_dispatcher: src.fairreckitlib.core.events.event_dispatcher.EventDispatcher, is_running: Callable[[], bool]) -> List[src.fairreckitlib.data.data_transition.DataTransition]:
def run_data_pipelines(
        pipeline_config: DataPipelineConfig,
        event_dispatcher: EventDispatcher,
        is_running: Callable[[], bool]) -> List[DataTransition]:
    """Run one data pipeline for every dataset matrix configuration.

    A configuration is skipped, after dispatching an ON_FAILURE_ERROR event,
    when the registry returns None for its dataset or when the dataset does
    not list the requested matrix. A configuration whose pipeline run raises
    FileNotFoundError or RuntimeError is skipped as well.

    Args:
        pipeline_config: the configuration on how to run the data pipelines.
        event_dispatcher: used to dispatch data/IO events when running the pipeline.
        is_running: function that returns whether the pipelines
            are still running. It is handed to every pipeline run and is
            also checked after each successful run; the remaining
            configurations are abandoned as soon as it returns False.

    Returns:
        the DataTransition of every pipeline run that completed,
        in the order of the configuration list.
    """
    transitions = []
    pipeline = DataPipeline(pipeline_config.data_factory, event_dispatcher)

    for matrix_config in pipeline_config.data_config_list:
        dataset = pipeline_config.data_registry.get_set(matrix_config.dataset)

        # Guard: the requested dataset is not known to the registry.
        if dataset is None:
            event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: to get dataset ' + matrix_config.dataset + ' from registry'
            ))
            continue

        # Guard: the dataset exists but does not provide the requested matrix.
        if matrix_config.matrix not in dataset.get_available_matrices():
            event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: to get matrix ' + matrix_config.matrix + ' from ' + matrix_config.dataset
            ))
            continue

        try:
            transition = pipeline.run(
                pipeline_config.output_dir,
                dataset,
                matrix_config,
                is_running
            )
        except (FileNotFoundError, RuntimeError):
            # NOTE(review): no event is dispatched here; presumably the
            # pipeline reports its own failure before raising - confirm.
            continue
        else:
            transitions.append(transition)

        # Stop early once the caller signals that running was cancelled.
        if not is_running():
            break

    return transitions

Run a Data Pipeline several times according to the specified data pipeline configuration.

Args:
- pipeline_config: the configuration on how to run the data pipelines.
- event_dispatcher: used to dispatch data/IO events when running the pipeline.
- is_running: function that returns whether the pipelines are still running. Stops early when False is returned.

Returns: a list of DataTransition objects, one for each data pipeline run that completed successfully.