src.fairreckitlib.model.pipeline.model_run
This module contains functionality that wraps running the model pipeline multiple times.
Classes:
ModelPipelineConfig: configuration class to run the model pipelines.
Functions:
run_model_pipelines: run (multiple) pipelines for specified model configurations.
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)
```python
"""This module contains functionality that wraps running the model pipeline multiple times.

Classes:

    ModelPipelineConfig: configuration class to run the model pipelines.

Functions:

    run_model_pipelines: run (multiple) pipelines for specified model configurations.

This program has been developed by students from the bachelor Computer Science at
Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

from dataclasses import dataclass
from typing import Callable, Dict, List

from ...core.config.config_factories import GroupFactory
from ...core.events.event_dispatcher import EventDispatcher
from ...core.events.event_error import ON_FAILURE_ERROR, ErrorEventArgs
from ...data.data_transition import DataTransition
from .model_config import ModelConfig


@dataclass
class ModelPipelineConfig:
    """Model Pipeline Configuration.

    output_dir: the directory to store the output.
    data_transition: data input.
    model_factory: the factory with available algorithm factories.
    models: dictionary with api model configurations to compute.
    """

    output_dir: str
    data_transition: DataTransition
    model_factory: GroupFactory
    models: Dict[str, List[ModelConfig]]


def run_model_pipelines(
        pipeline_config: ModelPipelineConfig,
        event_dispatcher: EventDispatcher,
        is_running: Callable[[], bool],
        **kwargs) -> List[str]:
    """Run several model pipelines according to the specified model pipeline configuration.

    Args:
        pipeline_config: the configuration on how to run the model pipelines.
        event_dispatcher: used to dispatch model/IO events when running the model pipelines.
        is_running: function that returns whether the pipelines
            are still running. Stops early when False is returned.

    Keyword Args:
        num_threads(int): the max number of threads a model can use.
        num_items(int): the number of item recommendations to produce, only
            needed when running recommender pipelines.
        rated_items_filter(bool): whether to filter already rated items when
            producing item recommendations.

    Returns:
        list of directories where the computed model ratings are stored.
    """
    model_dirs = []

    for api_name, models in pipeline_config.models.items():
        api_factory = pipeline_config.model_factory.get_factory(api_name)
        if api_factory is None:
            event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: to get algorithm API factory: ' + api_name
            ))
            continue

        model_pipeline = api_factory.create_pipeline(
            api_factory,
            pipeline_config.data_transition,
            event_dispatcher
        )

        try:
            dirs = model_pipeline.run(
                pipeline_config.output_dir,
                models,
                is_running,
                **kwargs
            )
        except FileNotFoundError:
            continue

        model_dirs += dirs

        if not is_running():
            return model_dirs

    return model_dirs
```
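The control flow above groups model configurations per API name, looks up the matching factory, skips (and reports) APIs that are unavailable, and aggregates the output directories of every pipeline that ran. That structure can be sketched with stand-in types; the `StubPipeline` class and the API/model names below are illustrative stubs, not part of fairreckitlib:

```python
from typing import Callable, Dict, List

class StubPipeline:
    """Stand-in for a created model pipeline (illustrative only)."""

    def __init__(self, api_name: str) -> None:
        self.api_name = api_name

    def run(self, output_dir: str, models: List[str],
            is_running: Callable[[], bool]) -> List[str]:
        # One output directory per model configuration.
        return [f'{output_dir}/{self.api_name}_{model}' for model in models]

def run_stub_pipelines(output_dir: str,
                       factories: Dict[str, StubPipeline],
                       models: Dict[str, List[str]],
                       is_running: Callable[[], bool]) -> List[str]:
    """Mimic run_model_pipelines: skip unknown APIs, aggregate result dirs."""
    model_dirs: List[str] = []
    for api_name, api_models in models.items():
        pipeline = factories.get(api_name)
        if pipeline is None:
            # In the real function this dispatches an ON_FAILURE_ERROR event.
            continue
        model_dirs += pipeline.run(output_dir, api_models, is_running)
        if not is_running():
            return model_dirs  # stop early, keep what was computed so far
    return model_dirs

dirs = run_stub_pipelines(
    'out',
    {'api_a': StubPipeline('api_a')},
    {'api_a': ['ALS'], 'missing_api': ['X']},
    lambda: True,
)
```

Here `'missing_api'` has no registered factory, so its models are skipped rather than aborting the whole run, which matches the `continue` branches in the source.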
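The `is_running` parameter enables cooperative cancellation: the loop checks the callable between pipeline runs and returns the directories computed so far once it reports `False`. A minimal sketch of such a callable, assuming a thread-safe flag object (the `RunFlag` class is a hypothetical helper for illustration, not part of the library):

```python
import threading

class RunFlag:
    """Thread-safe stop flag usable as an is_running callable."""

    def __init__(self) -> None:
        self._event = threading.Event()
        self._event.set()  # start in the 'running' state

    def stop(self) -> None:
        """Request cancellation, e.g. from a UI thread."""
        self._event.clear()

    def __call__(self) -> bool:
        return self._event.is_set()

# Simulate a runner that checks the flag between steps.
flag = RunFlag()
results = []
for step in range(5):
    if not flag():       # checked between pipeline runs
        break
    results.append(step)
    if step == 1:
        flag.stop()      # cancellation requested mid-run
```

Passing `flag` as the `is_running` argument would let a caller stop `run_model_pipelines` between pipelines without losing the directories already computed.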