src.fairreckitlib.experiment.experiment_run

This module contains functionality to run the experiment pipelines.

Classes:

ExperimentPipelineConfig: configuration class to run the experiment pipelines.

Functions:

run_experiment_pipelines: run the pipeline one or more runs.
resolve_experiment_start_run: resolve the start run of an existing result directory.

This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)

  1"""This module contains functionality to run the experiment pipelines.
  2
  3Classes:
  4
  5    ExperimentPipelineConfig: configuration class to run the experiment pipelines.
  6
  7Functions:
  8
  9    run_experiment_pipelines: run the pipeline one or more runs.
 10    resolve_experiment_start_run: resolve the start run of an existing result directory.
 11
 12This program has been developed by students from the bachelor Computer Science at
 13Utrecht University within the Software Project course.
 14© Copyright Utrecht University (Department of Information and Computing Sciences)
 15"""
 16
 17from dataclasses import dataclass
 18import os
 19from typing import Callable, Union
 20
 21from ..core.config.config_factories import GroupFactory
 22from ..core.events.event_dispatcher import EventDispatcher
 23from ..core.io.io_create import create_dir, create_yml
 24from ..data.set.dataset_registry import DataRegistry
 25from .experiment_config import PredictorExperimentConfig, RecommenderExperimentConfig
 26from .experiment_pipeline import ExperimentPipeline
 27
 28
 29@dataclass
 30class ExperimentPipelineConfig:
 31    """Experiment Pipeline Configuration.
 32
 33    output_dir: the directory to store the output.
 34    data_registry: the registry with available datasets.
 35    experiment_factory: the factory with data/model/evaluation pipeline factories.
 36    experiment_config: the experiment configuration to compute.
 37    start_run: the experiment run to start with.
 38    num_runs: the number of runs of the experiment.
 39    num_threads: the max number of threads the experiment can use.
 40    """
 41
 42    output_dir: str
 43    data_registry: DataRegistry
 44    experiment_factory: GroupFactory
 45    experiment_config: Union[PredictorExperimentConfig, RecommenderExperimentConfig]
 46    start_run: int
 47    num_runs: int
 48    num_threads: int
 49
 50
 51def run_experiment_pipelines(
 52        pipeline_config: ExperimentPipelineConfig,
 53        event_dispatcher: EventDispatcher,
 54        is_running: Callable[[], bool]) -> bool:
 55    """Run the experiment pipeline several runs according to the specified pipeline configuration.
 56
 57    Args:
 58        pipeline_config: the configuration on how to run the experiment pipelines.
 59        event_dispatcher: used to dispatch model/IO events when running the experiment pipelines.
 60        is_running: function that returns whether the pipelines
 61            are still running. Stops early when False is returned.
 62
 63    Returns:
 64        whether running the experiment pipelines succeeded.
 65    """
 66    if not os.path.isdir(pipeline_config.output_dir):
 67        # create result output directory
 68        create_dir(pipeline_config.output_dir, event_dispatcher)
 69
 70        # save the yml configuration file
 71        create_yml(
 72            os.path.join(pipeline_config.output_dir, 'config.yml'),
 73            pipeline_config.experiment_config.to_yml_format(),
 74            event_dispatcher
 75        )
 76
 77    # prepare pipeline
 78    experiment_pipeline = ExperimentPipeline(
 79        pipeline_config.data_registry,
 80        pipeline_config.experiment_factory,
 81        event_dispatcher
 82    )
 83
 84    start_run = pipeline_config.start_run
 85    end_run = start_run + pipeline_config.num_runs
 86
 87    # run the pipeline
 88    for run in range(start_run, end_run):
 89        try:
 90            experiment_pipeline.run(
 91                os.path.join(pipeline_config.output_dir, 'run_' + str(run)),
 92                pipeline_config.experiment_config,
 93                pipeline_config.num_threads,
 94                is_running
 95            )
 96        except RuntimeError:
 97            return False
 98
 99    return True
100
101
102def resolve_experiment_start_run(result_dir: str) -> int:
103    """Resolve which run will be next in the specified result directory.
104
105    Args:
106        result_dir: path to the result directory to look into.
107
108    Raises:
109        IOError: when the specified result directory does not exist.
110
111    Returns:
112        the next run index for this result directory.
113    """
114    if not os.path.isdir(result_dir):
115        raise IOError('Unknown result directory')
116
117    directories = []
118    for dir_name in os.listdir(result_dir):
119        if not os.path.isdir(os.path.join(result_dir, dir_name)):
120            continue
121
122        if not dir_name.startswith('run_'):
123            continue
124
125        run_split = dir_name.split('_')
126        try:
127            directories.append(int(run_split[1]))
128        except ValueError:
129            continue
130
131    if len(directories) == 0:
132        return 0
133
134    return max(directories) + 1
@dataclass
class ExperimentPipelineConfig:
30@dataclass
31class ExperimentPipelineConfig:
32    """Experiment Pipeline Configuration.
33
34    output_dir: the directory to store the output.
35    data_registry: the registry with available datasets.
36    experiment_factory: the factory with data/model/evaluation pipeline factories.
37    experiment_config: the experiment configuration to compute.
38    start_run: the experiment run to start with.
39    num_runs: the number of runs of the experiment.
40    num_threads: the max number of threads the experiment can use.
41    """
42
43    output_dir: str
44    data_registry: DataRegistry
45    experiment_factory: GroupFactory
46    experiment_config: Union[PredictorExperimentConfig, RecommenderExperimentConfig]
47    start_run: int
48    num_runs: int
49    num_threads: int

Experiment Pipeline Configuration.

output_dir: the directory to store the output. data_registry: the registry with available datasets. experiment_factory: the factory with data/model/evaluation pipeline factories. experiment_config: the experiment configuration to compute. start_run: the experiment run to start with. num_runs: the number of runs of the experiment. num_threads: the max number of threads the experiment can use.

ExperimentPipelineConfig( output_dir: str, data_registry: src.fairreckitlib.data.set.dataset_registry.DataRegistry, experiment_factory: src.fairreckitlib.core.config.config_factories.GroupFactory, experiment_config: Union[src.fairreckitlib.experiment.experiment_config.PredictorExperimentConfig, src.fairreckitlib.experiment.experiment_config.RecommenderExperimentConfig], start_run: int, num_runs: int, num_threads: int)
def run_experiment_pipelines( pipeline_config: src.fairreckitlib.experiment.experiment_run.ExperimentPipelineConfig, event_dispatcher: src.fairreckitlib.core.events.event_dispatcher.EventDispatcher, is_running: Callable[[], bool]) -> bool:
 52def run_experiment_pipelines(
 53        pipeline_config: ExperimentPipelineConfig,
 54        event_dispatcher: EventDispatcher,
 55        is_running: Callable[[], bool]) -> bool:
 56    """Run the experiment pipeline several runs according to the specified pipeline configuration.
 57
 58    Args:
 59        pipeline_config: the configuration on how to run the experiment pipelines.
 60        event_dispatcher: used to dispatch model/IO events when running the experiment pipelines.
 61        is_running: function that returns whether the pipelines
 62            are still running. Stops early when False is returned.
 63
 64    Returns:
 65        whether running the experiment pipelines succeeded.
 66    """
 67    if not os.path.isdir(pipeline_config.output_dir):
 68        # create result output directory
 69        create_dir(pipeline_config.output_dir, event_dispatcher)
 70
 71        # save the yml configuration file
 72        create_yml(
 73            os.path.join(pipeline_config.output_dir, 'config.yml'),
 74            pipeline_config.experiment_config.to_yml_format(),
 75            event_dispatcher
 76        )
 77
 78    # prepare pipeline
 79    experiment_pipeline = ExperimentPipeline(
 80        pipeline_config.data_registry,
 81        pipeline_config.experiment_factory,
 82        event_dispatcher
 83    )
 84
 85    start_run = pipeline_config.start_run
 86    end_run = start_run + pipeline_config.num_runs
 87
 88    # run the pipeline
 89    for run in range(start_run, end_run):
 90        try:
 91            experiment_pipeline.run(
 92                os.path.join(pipeline_config.output_dir, 'run_' + str(run)),
 93                pipeline_config.experiment_config,
 94                pipeline_config.num_threads,
 95                is_running
 96            )
 97        except RuntimeError:
 98            return False
 99
100    return True

Run the experiment pipeline several runs according to the specified pipeline configuration.

Args: pipeline_config: the configuration on how to run the experiment pipelines. event_dispatcher: used to dispatch model/IO events when running the experiment pipelines. is_running: function that returns whether the pipelines are still running. Stops early when False is returned.

Returns: whether running the experiment pipelines succeeded.

def resolve_experiment_start_run(result_dir: str) -> int:
103def resolve_experiment_start_run(result_dir: str) -> int:
104    """Resolve which run will be next in the specified result directory.
105
106    Args:
107        result_dir: path to the result directory to look into.
108
109    Raises:
110        IOError: when the specified result directory does not exist.
111
112    Returns:
113        the next run index for this result directory.
114    """
115    if not os.path.isdir(result_dir):
116        raise IOError('Unknown result directory')
117
118    directories = []
119    for dir_name in os.listdir(result_dir):
120        if not os.path.isdir(os.path.join(result_dir, dir_name)):
121            continue
122
123        if not dir_name.startswith('run_'):
124            continue
125
126        run_split = dir_name.split('_')
127        try:
128            directories.append(int(run_split[1]))
129        except ValueError:
130            continue
131
132    if len(directories) == 0:
133        return 0
134
135    return max(directories) + 1

Resolve which run will be next in the specified result directory.

Args: result_dir: path to the result directory to look into.

Raises: IOError: when the specified result directory does not exist.

Returns: the next run index for this result directory.