src.fairreckitlib.data.pipeline.data_run
This module contains functionality to run the data pipeline.
Classes:
DataPipelineConfig: configuration class to run the data pipelines.
Functions:
run_data_pipelines: run the pipeline using dataset configurations.
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)
1"""This module contains functionality to run the data pipeline. 2 3Classes: 4 5 DataPipelineConfig: configuration class to run the data pipelines. 6 7Functions: 8 9 run_data_pipelines: run the pipeline using dataset configurations. 10 11This program has been developed by students from the bachelor Computer Science at 12Utrecht University within the Software Project course. 13© Copyright Utrecht University (Department of Information and Computing Sciences) 14""" 15 16from dataclasses import dataclass 17from typing import Callable, List 18 19from ...core.config.config_factories import GroupFactory 20from ...core.events.event_dispatcher import EventDispatcher 21from ...core.events.event_error import ON_FAILURE_ERROR, ErrorEventArgs 22from ..set.dataset_registry import DataRegistry 23from .data_config import DataMatrixConfig 24from .data_pipeline import DataPipeline, DataTransition 25 26 27@dataclass 28class DataPipelineConfig: 29 """Data Pipeline Configuration. 30 31 output_dir: the directory to store the output. 32 data_registry: the registry with available datasets. 33 data_factory: the factory with available data modifier factories. 34 data_config_list: the dataset matrix configurations to compute. 35 """ 36 37 output_dir: str 38 data_registry: DataRegistry 39 data_factory: GroupFactory 40 data_config_list: List[DataMatrixConfig] 41 42 43def run_data_pipelines( 44 pipeline_config: DataPipelineConfig, 45 event_dispatcher: EventDispatcher, 46 is_running: Callable[[], bool]) -> List[DataTransition]: 47 """Run a Data Pipeline several times according to the specified data pipeline configuration. 48 49 Args: 50 pipeline_config: the configuration on how to run the data pipelines. 51 event_dispatcher: used to dispatch data/IO events when running the pipeline. 52 is_running: function that returns whether the pipelines 53 are still running. Stops early when False is returned. 54 55 Returns: 56 a list of DataTransition's. 
57 """ 58 data_result = [] 59 60 data_pipeline = DataPipeline(pipeline_config.data_factory, event_dispatcher) 61 for data_config in pipeline_config.data_config_list: 62 dataset = pipeline_config.data_registry.get_set(data_config.dataset) 63 if dataset is None: 64 event_dispatcher.dispatch(ErrorEventArgs( 65 ON_FAILURE_ERROR, 66 'Failure: to get dataset ' + data_config.dataset + ' from registry' 67 )) 68 continue 69 70 if data_config.matrix not in dataset.get_available_matrices(): 71 event_dispatcher.dispatch(ErrorEventArgs( 72 ON_FAILURE_ERROR, 73 'Failure: to get matrix ' + data_config.matrix + ' from ' + data_config.dataset 74 )) 75 continue 76 77 try: 78 data_transition = data_pipeline.run( 79 pipeline_config.output_dir, 80 dataset, 81 data_config, 82 is_running 83 ) 84 except (FileNotFoundError, RuntimeError): 85 continue 86 87 data_result.append(data_transition) 88 if not is_running(): 89 return data_result 90 91 return data_result
```python
@dataclass
class DataPipelineConfig:
    """Data Pipeline Configuration.

    output_dir: the directory to store the output.
    data_registry: the registry with available datasets.
    data_factory: the factory with available data modifier factories.
    data_config_list: the dataset matrix configurations to compute.
    """

    output_dir: str
    data_registry: DataRegistry
    data_factory: GroupFactory
    data_config_list: List[DataMatrixConfig]
```
Data Pipeline Configuration.

    output_dir: the directory to store the output.
    data_registry: the registry with available datasets.
    data_factory: the factory with available data modifier factories.
    data_config_list: the dataset matrix configurations to compute.
```python
def run_data_pipelines(
        pipeline_config: DataPipelineConfig,
        event_dispatcher: EventDispatcher,
        is_running: Callable[[], bool]) -> List[DataTransition]:
    """Run a Data Pipeline several times according to the specified data pipeline configuration.

    Args:
        pipeline_config: the configuration on how to run the data pipelines.
        event_dispatcher: used to dispatch data/IO events when running the pipeline.
        is_running: function that returns whether the pipelines
            are still running. Stops early when False is returned.

    Returns:
        a list of DataTransitions.
    """
    data_result = []

    data_pipeline = DataPipeline(pipeline_config.data_factory, event_dispatcher)
    for data_config in pipeline_config.data_config_list:
        dataset = pipeline_config.data_registry.get_set(data_config.dataset)
        if dataset is None:
            event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: to get dataset ' + data_config.dataset + ' from registry'
            ))
            continue

        if data_config.matrix not in dataset.get_available_matrices():
            event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: to get matrix ' + data_config.matrix + ' from ' + data_config.dataset
            ))
            continue

        try:
            data_transition = data_pipeline.run(
                pipeline_config.output_dir,
                dataset,
                data_config,
                is_running
            )
        except (FileNotFoundError, RuntimeError):
            continue

        data_result.append(data_transition)
        if not is_running():
            return data_result

    return data_result
```
Run a Data Pipeline several times according to the specified data pipeline configuration.

Args:
    pipeline_config: the configuration on how to run the data pipelines.
    event_dispatcher: used to dispatch data/IO events when running the pipeline.
    is_running: function that returns whether the pipelines
        are still running. Stops early when False is returned.

Returns:
    a list of DataTransitions.
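The `is_running` parameter accepts any zero-argument callable returning a bool. One possible way to drive it (a sketch, assuming a `threading.Event` as the cancellation flag; this pattern is not prescribed by the module) is:

```python
import threading

# Cancellation flag shared with whatever thread supervises the run.
stop_event = threading.Event()


def is_running() -> bool:
    """Return True while no cancellation has been requested."""
    return not stop_event.is_set()


print(is_running())  # True: the pipelines would keep running
stop_event.set()     # another thread requests cancellation
print(is_running())  # False: run_data_pipelines returns early at its next check
```

Because the function also passes `is_running` down into `DataPipeline.run`, a cancellation request can take effect both between and within individual pipeline runs.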