src.fairreckitlib.experiment.experiment_run
This module contains functionality to run the experiment pipelines.
Classes:
ExperimentPipelineConfig: configuration class to run the experiment pipelines.
Functions:
run_experiment_pipelines: run the pipeline for one or more runs.
resolve_experiment_start_run: resolve the start run of an existing result directory.
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)
"""This module contains functionality to run the experiment pipelines.

Classes:

    ExperimentPipelineConfig: configuration class to run the experiment pipelines.

Functions:

    run_experiment_pipelines: run the pipeline one or more runs.
    resolve_experiment_start_run: resolve the start run of an existing result directory.

This program has been developed by students from the bachelor Computer Science at
Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

from dataclasses import dataclass
import os
from typing import Callable, Union

from ..core.config.config_factories import GroupFactory
from ..core.events.event_dispatcher import EventDispatcher
from ..core.io.io_create import create_dir, create_yml
from ..data.set.dataset_registry import DataRegistry
from .experiment_config import PredictorExperimentConfig, RecommenderExperimentConfig
from .experiment_pipeline import ExperimentPipeline


@dataclass
class ExperimentPipelineConfig:
    """Experiment Pipeline Configuration.

    output_dir: the directory to store the output.
    data_registry: the registry with available datasets.
    experiment_factory: the factory with data/model/evaluation pipeline factories.
    experiment_config: the experiment configuration to compute.
    start_run: the experiment run to start with.
    num_runs: the number of runs of the experiment.
    num_threads: the max number of threads the experiment can use.
    """

    output_dir: str
    data_registry: DataRegistry
    experiment_factory: GroupFactory
    experiment_config: Union[PredictorExperimentConfig, RecommenderExperimentConfig]
    start_run: int
    num_runs: int
    num_threads: int


def run_experiment_pipelines(
        pipeline_config: ExperimentPipelineConfig,
        event_dispatcher: EventDispatcher,
        is_running: Callable[[], bool]) -> bool:
    """Run the experiment pipeline several runs according to the specified pipeline configuration.

    Args:
        pipeline_config: the configuration on how to run the experiment pipelines.
        event_dispatcher: used to dispatch model/IO events when running the experiment pipelines.
        is_running: function that returns whether the pipelines
            are still running. Stops early when False is returned.

    Returns:
        whether running the experiment pipelines succeeded.
    """
    if not os.path.isdir(pipeline_config.output_dir):
        # create result output directory
        create_dir(pipeline_config.output_dir, event_dispatcher)

        # save the yml configuration file
        create_yml(
            os.path.join(pipeline_config.output_dir, 'config.yml'),
            pipeline_config.experiment_config.to_yml_format(),
            event_dispatcher
        )

    # prepare pipeline
    experiment_pipeline = ExperimentPipeline(
        pipeline_config.data_registry,
        pipeline_config.experiment_factory,
        event_dispatcher
    )

    start_run = pipeline_config.start_run
    end_run = start_run + pipeline_config.num_runs

    # run the pipeline
    for run in range(start_run, end_run):
        try:
            experiment_pipeline.run(
                os.path.join(pipeline_config.output_dir, 'run_' + str(run)),
                pipeline_config.experiment_config,
                pipeline_config.num_threads,
                is_running
            )
        except RuntimeError:
            return False

    return True


def resolve_experiment_start_run(result_dir: str) -> int:
    """Resolve which run will be next in the specified result directory.

    Args:
        result_dir: path to the result directory to look into.

    Raises:
        IOError: when the specified result directory does not exist.

    Returns:
        the next run index for this result directory.
    """
    if not os.path.isdir(result_dir):
        raise IOError('Unknown result directory')

    directories = []
    for dir_name in os.listdir(result_dir):
        if not os.path.isdir(os.path.join(result_dir, dir_name)):
            continue

        if not dir_name.startswith('run_'):
            continue

        run_split = dir_name.split('_')
        try:
            directories.append(int(run_split[1]))
        except ValueError:
            continue

    if len(directories) == 0:
        return 0

    return max(directories) + 1
@dataclass
class ExperimentPipelineConfig:
    """Experiment Pipeline Configuration.

    output_dir: the directory to store the output.
    data_registry: the registry with available datasets.
    experiment_factory: the factory with data/model/evaluation pipeline factories.
    experiment_config: the experiment configuration to compute.
    start_run: the experiment run to start with.
    num_runs: the number of runs of the experiment.
    num_threads: the max number of threads the experiment can use.
    """

    output_dir: str
    data_registry: DataRegistry
    experiment_factory: GroupFactory
    experiment_config: Union[PredictorExperimentConfig, RecommenderExperimentConfig]
    start_run: int
    num_runs: int
    num_threads: int
Experiment Pipeline Configuration.
output_dir: the directory to store the output.
data_registry: the registry with available datasets.
experiment_factory: the factory with data/model/evaluation pipeline factories.
experiment_config: the experiment configuration to compute.
start_run: the experiment run to start with.
num_runs: the number of runs of the experiment.
num_threads: the max number of threads the experiment can use.
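As a rough illustration of how start_run and num_runs relate, the sketch below uses a simplified stand-in dataclass rather than the library class. The names PipelineConfigSketch and planned_run_dirs are hypothetical, the registry, factory, and experiment configuration are replaced by None placeholders, and the 'run_' directory naming is taken from run_experiment_pipelines.

```python
from dataclasses import dataclass
import os
from typing import Any, List


@dataclass
class PipelineConfigSketch:
    """Simplified stand-in for ExperimentPipelineConfig (not the library class)."""

    output_dir: str
    data_registry: Any  # placeholder for a DataRegistry
    experiment_factory: Any  # placeholder for a GroupFactory
    experiment_config: Any  # placeholder for a predictor/recommender config
    start_run: int
    num_runs: int
    num_threads: int


def planned_run_dirs(config: PipelineConfigSketch) -> List[str]:
    """List the run directories that the configuration would cover."""
    end_run = config.start_run + config.num_runs
    return [
        os.path.join(config.output_dir, 'run_' + str(run))
        for run in range(config.start_run, end_run)
    ]


config = PipelineConfigSketch(
    output_dir='results',
    data_registry=None,
    experiment_factory=None,
    experiment_config=None,
    start_run=3,
    num_runs=2,
    num_threads=4,
)
run_dirs = planned_run_dirs(config)
run_names = [os.path.basename(path) for path in run_dirs]
```

With start_run=3 and num_runs=2 the covered runs are 3 and 4, so start_run is an offset and num_runs is a count, not an end index.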
def run_experiment_pipelines(
        pipeline_config: ExperimentPipelineConfig,
        event_dispatcher: EventDispatcher,
        is_running: Callable[[], bool]) -> bool:
    """Run the experiment pipeline several runs according to the specified pipeline configuration.

    Args:
        pipeline_config: the configuration on how to run the experiment pipelines.
        event_dispatcher: used to dispatch model/IO events when running the experiment pipelines.
        is_running: function that returns whether the pipelines
            are still running. Stops early when False is returned.

    Returns:
        whether running the experiment pipelines succeeded.
    """
    if not os.path.isdir(pipeline_config.output_dir):
        # create result output directory
        create_dir(pipeline_config.output_dir, event_dispatcher)

        # save the yml configuration file
        create_yml(
            os.path.join(pipeline_config.output_dir, 'config.yml'),
            pipeline_config.experiment_config.to_yml_format(),
            event_dispatcher
        )

    # prepare pipeline
    experiment_pipeline = ExperimentPipeline(
        pipeline_config.data_registry,
        pipeline_config.experiment_factory,
        event_dispatcher
    )

    start_run = pipeline_config.start_run
    end_run = start_run + pipeline_config.num_runs

    # run the pipeline
    for run in range(start_run, end_run):
        try:
            experiment_pipeline.run(
                os.path.join(pipeline_config.output_dir, 'run_' + str(run)),
                pipeline_config.experiment_config,
                pipeline_config.num_threads,
                is_running
            )
        except RuntimeError:
            return False

    return True
Run the experiment pipeline for several runs according to the specified pipeline configuration.
Args:
pipeline_config: the configuration on how to run the experiment pipelines.
event_dispatcher: used to dispatch model/IO events when running the experiment pipelines.
is_running: function that returns whether the pipelines are still running. Stops early when False is returned.
Returns: whether running the experiment pipelines succeeded.
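The is_running callback and the boolean result can be sketched without the library. The example below is an assumption-laden illustration: run_all mirrors only the loop and the RuntimeError handling of run_experiment_pipelines, while record_run and failing_run are hypothetical stand-ins for ExperimentPipeline.run. How the real pipeline reacts when is_running returns False is not shown in the source, so the stand-in simply skips its work.

```python
import os
import threading
from typing import Callable, List


def run_all(output_dir: str,
            start_run: int,
            num_runs: int,
            run_once: Callable[[str, Callable[[], bool]], None],
            is_running: Callable[[], bool]) -> bool:
    """Loop over the runs like run_experiment_pipelines does (sketch only)."""
    for run in range(start_run, start_run + num_runs):
        try:
            run_once(os.path.join(output_dir, 'run_' + str(run)), is_running)
        except RuntimeError:
            # any RuntimeError aborts the remaining runs
            return False
    return True


# the caller owns the flag; setting the event asks the pipelines to stop
stop_event = threading.Event()


def is_running() -> bool:
    """Report whether the caller still wants the pipelines to run."""
    return not stop_event.is_set()


visited: List[str] = []


def record_run(run_dir: str, still_running: Callable[[], bool]) -> None:
    """Hypothetical stand-in for ExperimentPipeline.run."""
    # assumption: a stopped pipeline simply skips its work
    if still_running():
        visited.append(os.path.basename(run_dir))


def failing_run(run_dir: str, still_running: Callable[[], bool]) -> None:
    """Hypothetical stand-in that simulates a failing pipeline."""
    raise RuntimeError('simulated failure in ' + run_dir)


succeeded = run_all('results', 2, 3, record_run, is_running)
failed = run_all('results', 0, 1, failing_run, is_running)
```

A threading.Event is one convenient way to back the callback, because another thread (for example a user interface) can call stop_event.set() while the runs are in progress.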
def resolve_experiment_start_run(result_dir: str) -> int:
    """Resolve which run will be next in the specified result directory.

    Args:
        result_dir: path to the result directory to look into.

    Raises:
        IOError: when the specified result directory does not exist.

    Returns:
        the next run index for this result directory.
    """
    if not os.path.isdir(result_dir):
        raise IOError('Unknown result directory')

    directories = []
    for dir_name in os.listdir(result_dir):
        if not os.path.isdir(os.path.join(result_dir, dir_name)):
            continue

        if not dir_name.startswith('run_'):
            continue

        run_split = dir_name.split('_')
        try:
            directories.append(int(run_split[1]))
        except ValueError:
            continue

    if len(directories) == 0:
        return 0

    return max(directories) + 1
Resolve which run will be next in the specified result directory.
Args: result_dir: path to the result directory to look into.
Raises: IOError: when the specified result directory does not exist.
Returns: the next run index for this result directory.
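To make the lookup concrete, the sketch below repeats the same logic in a stand-alone function and exercises it on a temporary directory. The name next_run_index and the directory names used in the demonstration are hypothetical; the function is a copy for illustration, not the library function itself.

```python
import os
import tempfile


def next_run_index(result_dir: str) -> int:
    """Stand-alone sketch of the lookup done by resolve_experiment_start_run."""
    if not os.path.isdir(result_dir):
        raise IOError('Unknown result directory')

    indices = []
    for name in os.listdir(result_dir):
        # only directories named 'run_<integer>' count as existing runs
        if not os.path.isdir(os.path.join(result_dir, name)):
            continue
        if not name.startswith('run_'):
            continue
        try:
            indices.append(int(name.split('_')[1]))
        except ValueError:
            continue

    return max(indices) + 1 if indices else 0


with tempfile.TemporaryDirectory() as result_dir:
    # an empty result directory starts at run 0
    empty_index = next_run_index(result_dir)

    # 'run_draft' has no integer suffix and 'logs' lacks the prefix
    for name in ('run_0', 'run_3', 'run_draft', 'logs'):
        os.mkdir(os.path.join(result_dir, name))

    # a regular file that looks like a run is ignored as well
    with open(os.path.join(result_dir, 'run_9'), 'w', encoding='utf-8') as handle:
        handle.write('not a directory')

    filled_index = next_run_index(result_dir)
```

Because the result is the highest existing index plus one, gaps in the numbering (here runs 1 and 2) are not filled. The returned value can serve as start_run when more runs are added to an existing result directory.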