src.fairreckitlib.model.pipeline.model_run

This module contains functionality that wraps running the model pipeline multiple times.

Classes:

ModelPipelineConfig: configuration class to run the model pipelines.

Functions:

run_model_pipelines: run (multiple) pipelines for specified model configurations.

This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)

"""This module contains functionality that wraps running the model pipeline multiple times.

Classes:

    ModelPipelineConfig: configuration class to run the model pipelines.

Functions:

    run_model_pipelines: run (multiple) pipelines for specified model configurations.

This program has been developed by students from the bachelor Computer Science at
Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

from dataclasses import dataclass
from typing import Callable, Dict, List

from ...core.config.config_factories import GroupFactory
from ...core.events.event_dispatcher import EventDispatcher
from ...core.events.event_error import ON_FAILURE_ERROR, ErrorEventArgs
from ...data.data_transition import DataTransition
from .model_config import ModelConfig
25
@dataclass
class ModelPipelineConfig:
    """Model Pipeline Configuration.

    output_dir: the directory to store the output.
    data_transition: data input.
    model_factory: the factory with available algorithm factories.
    models: dictionary with api model configurations to compute.
    """

    # Directory where each model pipeline writes its computed ratings.
    output_dir: str
    # Dataset split/transition that serves as input to every pipeline.
    data_transition: DataTransition
    # Group factory resolving an API name to its algorithm factory.
    model_factory: GroupFactory
    # Maps an API name to the list of model configurations to compute for it.
    models: Dict[str, List[ModelConfig]]
41
def run_model_pipelines(
        pipeline_config: ModelPipelineConfig,
        event_dispatcher: EventDispatcher,
        is_running: Callable[[], bool],
        **kwargs) -> List[str]:
    """Run several model pipelines according to the specified model pipeline configuration.

    Args:
        pipeline_config: the configuration on how to run the model pipelines.
        event_dispatcher: used to dispatch model/IO events when running the model pipelines.
        is_running: function that returns whether the pipelines
            are still running. Stops early when False is returned.

    Keyword Args:
        num_threads(int): the max number of threads a model can use.
        num_items(int): the number of item recommendations to produce, only
            needed when running recommender pipelines.
        rated_items_filter(bool): whether to filter already rated items when
            producing item recommendations.

    Returns:
        list of directories where the computed model ratings are stored.
    """
    model_dirs = []

    for api_name, models in pipeline_config.models.items():
        api_factory = pipeline_config.model_factory.get_factory(api_name)
        if api_factory is None:
            # Unknown API name: report the failure and skip its models
            # instead of aborting the remaining pipelines.
            event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: to get algorithm API factory: ' + api_name
            ))
            continue

        model_pipeline = api_factory.create_pipeline(
            api_factory,
            pipeline_config.data_transition,
            event_dispatcher
        )

        try:
            dirs = model_pipeline.run(
                pipeline_config.output_dir,
                models,
                is_running,
                **kwargs
            )
        except FileNotFoundError:
            # Required input/output files were missing for this API's
            # pipeline; skip it and continue with the next API.
            continue

        model_dirs += dirs

        # Stop early when requested, returning what was computed so far.
        if not is_running():
            return model_dirs

    return model_dirs
@dataclass
class ModelPipelineConfig:
    """Model Pipeline Configuration.

    output_dir: the directory to store the output.
    data_transition: data input.
    model_factory: the factory with available algorithm factories.
    models: dictionary with api model configurations to compute.
    """

    # Directory where each model pipeline writes its computed ratings.
    output_dir: str
    # Dataset split/transition that serves as input to every pipeline.
    data_transition: DataTransition
    # Group factory resolving an API name to its algorithm factory.
    model_factory: GroupFactory
    # Maps an API name to the list of model configurations to compute for it.
    models: Dict[str, List[ModelConfig]]

Model Pipeline Configuration.

output_dir: the directory to store the output. data_transition: data input. model_factory: the factory with available algorithm factories. models: dictionary with api model configurations to compute.

ModelPipelineConfig( output_dir: str, data_transition: src.fairreckitlib.data.data_transition.DataTransition, model_factory: src.fairreckitlib.core.config.config_factories.GroupFactory, models: Dict[str, List[src.fairreckitlib.model.pipeline.model_config.ModelConfig]])
def run_model_pipelines( pipeline_config: src.fairreckitlib.model.pipeline.model_run.ModelPipelineConfig, event_dispatcher: src.fairreckitlib.core.events.event_dispatcher.EventDispatcher, is_running: Callable[[], bool], **kwargs) -> List[str]:
def run_model_pipelines(
        pipeline_config: ModelPipelineConfig,
        event_dispatcher: EventDispatcher,
        is_running: Callable[[], bool],
        **kwargs) -> List[str]:
    """Run several model pipelines according to the specified model pipeline configuration.

    Args:
        pipeline_config: the configuration on how to run the model pipelines.
        event_dispatcher: used to dispatch model/IO events when running the model pipelines.
        is_running: function that returns whether the pipelines
            are still running. Stops early when False is returned.

    Keyword Args:
        num_threads(int): the max number of threads a model can use.
        num_items(int): the number of item recommendations to produce, only
            needed when running recommender pipelines.
        rated_items_filter(bool): whether to filter already rated items when
            producing item recommendations.

    Returns:
        list of directories where the computed model ratings are stored.
    """
    model_dirs = []

    for api_name, models in pipeline_config.models.items():
        api_factory = pipeline_config.model_factory.get_factory(api_name)
        if api_factory is None:
            # Unknown API name: report the failure and skip its models
            # instead of aborting the remaining pipelines.
            event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: to get algorithm API factory: ' + api_name
            ))
            continue

        model_pipeline = api_factory.create_pipeline(
            api_factory,
            pipeline_config.data_transition,
            event_dispatcher
        )

        try:
            dirs = model_pipeline.run(
                pipeline_config.output_dir,
                models,
                is_running,
                **kwargs
            )
        except FileNotFoundError:
            # Required input/output files were missing for this API's
            # pipeline; skip it and continue with the next API.
            continue

        model_dirs += dirs

        # Stop early when requested, returning what was computed so far.
        if not is_running():
            return model_dirs

    return model_dirs

Run several model pipelines according to the specified model pipeline configuration.

Args: pipeline_config: the configuration on how to run the model pipelines. event_dispatcher: used to dispatch model/IO events when running the model pipelines. is_running: function that returns whether the pipelines are still running. Stops early when False is returned.

Keyword Args: num_threads(int): the max number of threads a model can use. num_items(int): the number of item recommendations to produce, only needed when running recommender pipelines. rated_items_filter(bool): whether to filter already rated items when producing item recommendations.

Returns: list of directories where the computed model ratings are stored.