src.fairreckitlib.model.pipeline.model_pipeline

This module contains base functionality of the complete model pipeline.

Classes:

ModelPipeline: class that batches multiple model computations for a specific API.

This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)
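The final step of the pipeline, merging the original train and test ratings back onto each computed rating file, can be sketched in isolation with plain pandas. The column names (`user`, `item`, `rating`) follow the pipeline's dataframes; the sample data and the `score` column are purely illustrative:

```python
import pandas as pd

# Hypothetical train/test splits with the pipeline's column layout.
train = pd.DataFrame({'user': [1, 1], 'item': [10, 11], 'rating': [4.0, 3.0]})
test = pd.DataFrame({'user': [2], 'item': [10], 'rating': [5.0]})

# Hypothetical computed model output: user/item pairs with a predicted score.
result = pd.DataFrame({'user': [2, 2], 'item': [10, 12], 'score': [4.5, 2.0]})

# Mirror ModelPipeline.reconstruct_ratings: concatenate the original sets
# and left-merge them onto the computed ratings on (user, item).
ratings = pd.concat([train, test])
result = pd.merge(result, ratings, how='left', on=['user', 'item'])
```

Pairs that the model produced but that never occurred in the original data keep a missing `rating` after the left merge.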

"""This module contains base functionality of the complete model pipeline.

Classes:

    ModelPipeline: class that batches multiple model computations for a specific API.

This program has been developed by students from the bachelor Computer Science at
Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

from abc import ABCMeta, abstractmethod
import os
import time
from typing import Any, Callable, Dict, List, Tuple

import pandas as pd

from ...core.config.config_factories import Factory
from ...core.core_constants import MODEL_RATINGS_FILE, MODEL_USER_BATCH_SIZE
from ...core.events.event_dispatcher import EventDispatcher
from ...core.events.event_error import ON_FAILURE_ERROR, ON_RAISE_ERROR, ErrorEventArgs
from ...core.io.event_io import DataframeEventArgs, FileEventArgs
from ...core.io.io_create import create_dir, create_json
from ...core.io.io_delete import delete_dir
from ...core.pipeline.core_pipeline import CorePipeline
from ...data.data_transition import DataTransition
from ..algorithms.base_algorithm import BaseAlgorithm
from ..algorithms.matrix import Matrix
from .model_config import ModelConfig
from .model_event import ON_BEGIN_LOAD_TEST_SET, ON_END_LOAD_TEST_SET
from .model_event import ON_BEGIN_LOAD_TRAIN_SET, ON_END_LOAD_TRAIN_SET
from .model_event import ON_BEGIN_MODEL_PIPELINE, ON_END_MODEL_PIPELINE
from .model_event import ON_BEGIN_RECONSTRUCT_RATINGS, ON_END_RECONSTRUCT_RATINGS
from .model_event import ON_BEGIN_TEST_MODEL, ON_END_TEST_MODEL
from .model_event import ON_BEGIN_TRAIN_MODEL, ON_END_TRAIN_MODEL
from .model_event import ON_BEGIN_MODEL, ON_END_MODEL
from .model_event import ModelPipelineEventArgs, ModelEventArgs


class ModelPipeline(CorePipeline, metaclass=ABCMeta):
    """Model Pipeline to run computations for algorithms from a specific API.

    Wraps the common functionality that applies to all models, regardless of their type.
    Loading the train and test set is only done once each time the pipeline is run.
    After these sets are done loading, the pipeline loops
    through all specified models and executes the following steps:

    1) create the output directory.
    2) create the model.
    3) save the model's creation settings.
    4) train the model using the train set.
    5) test the model using the test set.

    After all models are trained and tested, the computed rating files are updated
    with the original ratings from the train and test set.

    Abstract methods:

    load_test_set_users
    test_model_ratings

    Public methods:

    run
    """

    def __init__(
            self,
            algo_factory: Factory,
            data_transition: DataTransition,
            event_dispatcher: EventDispatcher):
        """Construct the model pipeline.

        Args:
            algo_factory: factory of available algorithms for this API.
            data_transition: data input.
            event_dispatcher: used to dispatch model/IO events when running the pipeline.
        """
        CorePipeline.__init__(self, event_dispatcher)
        self.algo_factory = algo_factory
        self.data_transition = data_transition
        self.tested_models = {}

        self.train_set_matrix = None
        self.test_set_users = None

    def run(self,
            output_dir: str,
            models_config: List[ModelConfig],
            is_running: Callable[[], bool],
            **kwargs) -> List[str]:
        """Run the entire pipeline from beginning to end.

        Effectively runs all computations of the specified models.

        Args:
            output_dir: the path of the directory to store the output.
            models_config: list of ModelConfig objects to compute.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Keyword Args:
            num_threads(int): the max number of threads an algorithm can use.
            num_items(int): the number of item recommendations to produce, only
                needed when running the pipeline for recommender algorithms.
            rated_items_filter(bool): whether to filter already rated items when
                producing item recommendations.

        Raises:
            FileNotFoundError: when the train and/or test set fails to load.

        Returns:
            a list of model directories where computation results are stored.
        """
        result_dirs = []
        if not is_running():
            return result_dirs

        self.event_dispatcher.dispatch(ModelPipelineEventArgs(
            ON_BEGIN_MODEL_PIPELINE,
            self.algo_factory.get_name(),
            models_config
        ))

        start = time.time()

        # this can raise a FileNotFoundError, effectively aborting the pipeline
        self.load_train_set_matrix()
        if not is_running():
            return result_dirs

        # this can raise a FileNotFoundError, effectively aborting the pipeline
        self.load_test_set_users()
        if not is_running():
            return result_dirs

        for model in models_config:
            # verify that the specified model is available
            if not self.algo_factory.is_obj_available(model.name):
                self.event_dispatcher.dispatch(ErrorEventArgs(
                    ON_FAILURE_ERROR,
                    'Failure: algorithm is not available: ' +
                    self.algo_factory.get_name() + ' ' + model.name
                ))
                continue

            # create model output dir
            model_dir = self.create_model_output_dir(
                output_dir,
                model.name
            )

            # attempt to run the model computation, discarding the
            # output directory of any model run that raises an error
            try:
                self.run_model(
                    model_dir,
                    model,
                    is_running,
                    **kwargs
                )
            except (ArithmeticError, MemoryError, RuntimeError) as err:
                self.event_dispatcher.dispatch(ErrorEventArgs(
                    ON_RAISE_ERROR,
                    type(err).__name__ + ': trying to run model ' +
                    self.algo_factory.get_name() + ' ' + model.name
                ))
                delete_dir(model_dir, self.event_dispatcher)
                continue

            result_dirs.append(model_dir)
            if not is_running():
                return result_dirs

        # free up some memory because everything is trained and tested
        self.train_set_matrix = None
        self.test_set_users = None

        self.reconstruct_ratings(result_dirs, is_running)

        end = time.time()

        self.event_dispatcher.dispatch(ModelPipelineEventArgs(
            ON_END_MODEL_PIPELINE,
            self.algo_factory.get_name(),
            models_config
        ), elapsed_time=end - start)

        return result_dirs

    def run_model(
            self,
            model_dir: str,
            model_config: ModelConfig,
            is_running: Callable[[], bool],
            **kwargs) -> None:
        """Run the model computation for the specified model configuration.

        Args:
            model_dir: the path of the directory where the computed ratings can be stored.
            model_config: the algorithm model configuration.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Keyword Args:
            num_threads(int): the max number of threads an algorithm can use.
            num_items(int): the number of item recommendations to produce, only
                needed when running the pipeline for recommender algorithms.
            rated_items_filter(bool): whether to filter already rated items when
                producing item recommendations.

        Raises:
            ArithmeticError: possibly raised by a model on construction, training or testing.
            MemoryError: possibly raised by a model on construction, training or testing.
            RuntimeError: possibly raised by a model on construction, training or testing.
        """
        model, start = self.begin_model(
            model_config.name,
            model_config.params,
            model_dir,
            **kwargs
        )
        if not is_running():
            return

        self.train_and_test_model(model, model_dir, is_running, **kwargs)
        if not is_running():
            return

        self.end_model(model, start)

    def begin_model(
            self,
            model_name: str,
            model_params: Dict[str, Any],
            model_dir: str,
            **kwargs) -> Tuple[BaseAlgorithm, float]:
        """Prepare the model computation.

        Creates the model according to the specified name and parameters,
        and saves the model's creation settings in the model directory.

        Args:
            model_name: name of the model's algorithm.
            model_params: parameters of the algorithm.
            model_dir: the path of the directory where the computed ratings can be stored.

        Keyword Args:
            num_threads(int): the max number of threads an algorithm can use.
            rated_items_filter(bool): whether to filter already rated items when
                producing item recommendations.

        Raises:
            ArithmeticError: possibly raised by a model on construction.
            MemoryError: possibly raised by a model on construction.
            RuntimeError: possibly raised by a model on construction.

        Returns:
            model: the created model according to the specified name and parameters.
            start: the time when the model computation started.
        """
        start = time.time()

        self.event_dispatcher.dispatch(ModelEventArgs(
            ON_BEGIN_MODEL,
            model_name,
            model_params
        ))

        # attempt to create model
        kwargs['rating_type'] = self.data_transition.get_rating_type()
        model = self.algo_factory.create(
            model_name,
            model_params,
            **kwargs
        )

        # create settings file
        create_json(
            os.path.join(model_dir, 'settings.json'),
            model.get_params(),
            self.event_dispatcher,
            indent=4
        )

        return model, start

    def create_model_output_dir(self, output_dir: str, model_name: str) -> str:
        """Create the output directory for a model.

        Args:
            output_dir: the path of the directory to store the output.
            model_name: name of the model's algorithm.

        Returns:
            the path of the directory where the model's computed ratings can be stored.
        """
        if self.tested_models.get(model_name) is None:
            # initialize model name counter
            self.tested_models[model_name] = 0

        return create_dir(self.get_model_output_dir(output_dir, model_name), self.event_dispatcher)

    def get_model_output_dir(self, output_dir: str, model_name: str) -> str:
        """Get the model output directory path for the specified model name.

        Args:
            output_dir: the path of the directory to store the output.
            model_name: name of the model's algorithm.

        Returns:
            the path of the directory where the model's computed ratings can be stored.
        """
        index = self.tested_models[model_name]
        return os.path.join(
            output_dir,
            self.algo_factory.get_name() + '_' + model_name + '_' + str(index)
        )

    def end_model(self, model: BaseAlgorithm, start: float) -> None:
        """Finalize the model computation.

        Updates the number of tested models so that additional
        computations remain unique for this model.

        Args:
            model: the model that finished.
            start: the time when the model computation started.
        """
        self.tested_models[model.get_name()] += 1

        end = time.time()

        self.event_dispatcher.dispatch(ModelEventArgs(
            ON_END_MODEL,
            model.get_name(),
            model.get_params()
        ), elapsed_time=end - start)

    def on_load_train_set_matrix(self) -> Matrix:
        """Load the train set matrix that all models can use for training.

        The default train set matrix of the model pipeline is a dataframe.
        Derived classes are allowed to override this function to return a different type of matrix.

        Returns:
            the loaded train set matrix dataframe.
        """
        return Matrix(self.data_transition.train_set_path)

    def load_train_set_matrix(self) -> None:
        """Load the train set matrix that all models can use for training.

        Raises:
            FileNotFoundError: when the train set file is not found.
        """
        self.event_dispatcher.dispatch(DataframeEventArgs(
            ON_BEGIN_LOAD_TRAIN_SET,
            self.data_transition.train_set_path,
            'model train set matrix'
        ))

        start = time.time()

        try:
            self.train_set_matrix = self.on_load_train_set_matrix()
        except FileNotFoundError as err:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_RAISE_ERROR,
                'FileNotFoundError: raised while trying to load the matrix train set from ' +
                self.data_transition.train_set_path
            ))
            raise err

        end = time.time()

        self.event_dispatcher.dispatch(DataframeEventArgs(
            ON_END_LOAD_TRAIN_SET,
            self.data_transition.train_set_path,
            'model train set matrix'
        ), elapsed_time=end - start)

    def load_train_set_dataframe(self) -> pd.DataFrame:
        """Load the train set as a dataframe.

        Raises:
            FileNotFoundError: when the train set file is not found.

        Returns:
            the loaded train set dataframe.
        """
        return self.read_dataframe(
            self.data_transition.train_set_path,
            'data train set',
            ON_BEGIN_LOAD_TRAIN_SET,
            ON_END_LOAD_TRAIN_SET,
            names=['user', 'item', 'rating']
        )

    def load_test_set_dataframe(self, test_name: str = 'data test set') -> pd.DataFrame:
        """Load the test set as a dataframe.

        Args:
            test_name: name of the test set dataframe to dispatch in the dataframe event.

        Raises:
            FileNotFoundError: when the test set file is not found.

        Returns:
            the loaded test set dataframe.
        """
        return self.read_dataframe(
            self.data_transition.test_set_path,
            test_name,
            ON_BEGIN_LOAD_TEST_SET,
            ON_END_LOAD_TEST_SET,
            names=['user', 'item', 'rating']
        )

    @abstractmethod
    def load_test_set_users(self) -> None:
        """Load the test set users that all models can use for testing.

        Raises:
            FileNotFoundError: when the test set file is not found.
        """
        raise NotImplementedError()

    def reconstruct_ratings(
            self,
            result_dirs: List[str],
            is_running: Callable[[], bool]) -> None:
        """Reconstruct the original ratings for all the computed model ratings.

        Args:
            result_dirs: a list of directories that contain a computed rating file.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.
        """
        if not is_running() or len(result_dirs) == 0:
            return

        # TODO should probably move this code to a separate pipeline
        ratings_dataframe = pd.concat([
            self.load_train_set_dataframe(),
            self.load_test_set_dataframe()
        ])

        for model_dir in result_dirs:
            if not is_running():
                return

            result_file_path = os.path.join(model_dir, MODEL_RATINGS_FILE)

            self.event_dispatcher.dispatch(FileEventArgs(
                ON_BEGIN_RECONSTRUCT_RATINGS,
                result_file_path
            ))

            start = time.time()

            result = pd.read_csv(result_file_path, sep='\t')
            result = pd.merge(result, ratings_dataframe, how='left', on=['user', 'item'])
            result.to_csv(result_file_path, sep='\t', header=True, index=False)

            end = time.time()

            self.event_dispatcher.dispatch(FileEventArgs(
                ON_END_RECONSTRUCT_RATINGS,
                result_file_path
            ), elapsed_time=end - start)

    def test_model(
            self,
            model: BaseAlgorithm,
            model_dir: str,
            is_running: Callable[[], bool],
            **kwargs) -> None:
        """Test the specified model using the test set.

        This function wraps the event dispatching and functionality
        that both predictor and recommender models have in common.

        Args:
            model: the model that needs to be tested.
            model_dir: the path of the directory where the computed ratings can be stored.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Keyword Args:
            num_items(int): the number of item recommendations to produce, only
                needed when running the pipeline for recommender algorithms.

        Raises:
            ArithmeticError: possibly raised by a model on testing.
            MemoryError: possibly raised by a model on testing.
            RuntimeError: possibly raised by a model on testing.
        """
        self.event_dispatcher.dispatch(ModelEventArgs(
            ON_BEGIN_TEST_MODEL,
            model.get_name(),
            model.get_params()
        ))

        start = time.time()

        result_file_path = os.path.join(model_dir, MODEL_RATINGS_FILE)
        start_index = 0
        while start_index < len(self.test_set_users):
            if not is_running():
                return

            user_batch = self.test_set_users[start_index:start_index + MODEL_USER_BATCH_SIZE]
            ratings = self.test_model_ratings(model, user_batch, **kwargs)
            if not is_running():
                return

            self.write_dataframe(result_file_path, ratings, start_index == 0)
            start_index += MODEL_USER_BATCH_SIZE

        end = time.time()

        self.event_dispatcher.dispatch(ModelEventArgs(
            ON_END_TEST_MODEL,
            model.get_name(),
            model.get_params()
        ), elapsed_time=end - start)

    @abstractmethod
    def test_model_ratings(
            self,
            model: BaseAlgorithm,
            user_batch: List[int],
            **kwargs) -> pd.DataFrame:
        """Test the specified model for rating predictions or recommendations.

        Args:
            model: the model that needs to be tested.
            user_batch: the user batch to compute model ratings for.

        Keyword Args:
            num_items(int): the number of item recommendations to produce, only
                needed when running the pipeline for recommender algorithms.

        Raises:
            ArithmeticError: possibly raised by a model on testing.
            MemoryError: possibly raised by a model on testing.
            RuntimeError: possibly raised by a model on testing.

        Returns:
            a dataframe containing the computed model ratings.
        """
        raise NotImplementedError()

    def train_model(self, model: BaseAlgorithm) -> None:
        """Train the specified model using the train set.

        Args:
            model: the model that needs to be trained.

        Raises:
            ArithmeticError: possibly raised by a model on training.
            MemoryError: possibly raised by a model on training.
            RuntimeError: possibly raised by a model on training.
        """
        self.event_dispatcher.dispatch(ModelEventArgs(
            ON_BEGIN_TRAIN_MODEL,
            model.get_name(),
            model.get_params()
        ))

        start = time.time()

        model.train(self.train_set_matrix)

        end = time.time()

        self.event_dispatcher.dispatch(ModelEventArgs(
            ON_END_TRAIN_MODEL,
            model.get_name(),
            model.get_params()
        ), elapsed_time=end - start)

    def train_and_test_model(
            self,
            model: BaseAlgorithm,
            model_dir: str,
            is_running: Callable[[], bool],
            **kwargs) -> None:
        """Train and test the specified model.

        Several errors can be raised during the execution of both training and
        testing the model, namely ArithmeticError, MemoryError and RuntimeError.

        Args:
            model: the model that needs to be trained and tested.
            model_dir: the path of the directory where the computed ratings can be stored.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Keyword Args:
            num_items(int): the number of item recommendations to produce, only
                needed when running the pipeline for recommender algorithms.

        Raises:
            ArithmeticError: possibly raised by a model on training or testing.
            MemoryError: possibly raised by a model on training or testing.
            RuntimeError: possibly raised by a model on training or testing.
        """
        try:
            self.train_model(model)
        except (ArithmeticError, MemoryError, RuntimeError) as err:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_RAISE_ERROR,
                'Error: raised while training model ' +
                self.algo_factory.get_name() + ' ' + model.get_name()
            ))
            # raise again so the model run aborts
            raise err

        try:
            self.test_model(model, model_dir, is_running, **kwargs)
        except (ArithmeticError, MemoryError, RuntimeError) as err:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_RAISE_ERROR,
                'Error: raised while testing model ' +
                self.algo_factory.get_name() + ' ' + model.get_name()
            ))
            # raise again so the model run aborts
            raise err
255        """Prepare the model computation.
256
257        Resolves a unique output directory for the model computation
258        and creates the model.
259
260        Args:
261            model_name: name of the model's algorithm.
262            model_params: parameters of the algorithm.
263            model_dir: the path of the directory where the computed ratings can be stored.
264
265        Keyword Args:
266            num_threads(int): the max number of threads an algorithm can use.
267            rated_items_filter(bool): whether to filter already rated items when
268                producing item recommendations.
269
270        Raises:
271            ArithmeticError: possibly raised by a model on construction.
272            MemoryError: possibly raised by a model on construction.
273            RuntimeError: possibly raised by a model on construction.
274
275        Returns:
276            model: the created model according to the specified name and parameters.
277            start: the time when the model computation started.
278        """
279        start = time.time()
280
281        self.event_dispatcher.dispatch(ModelEventArgs(
282            ON_BEGIN_MODEL,
283            model_name,
284            model_params
285        ))
286
287        # attempt to create model
288        kwargs['rating_type'] = self.data_transition.get_rating_type()
289        model = self.algo_factory.create(
290            model_name,
291            model_params,
292            **kwargs
293        )
294
295        # create settings file
296        create_json(
297            os.path.join(model_dir, 'settings.json'),
298            model.get_params(),
299            self.event_dispatcher,
300            indent=4
301        )
302
303        return model, start
304
305    def create_model_output_dir(self, output_dir: str, model_name: str) -> str:
306        """Create the output directory for a model.
307
308        Args:
309            output_dir: the path of the directory to store the output.
310            model_name: name of the model's algorithm.
311
312        Returns:
313            the path of the directory where the model's computed ratings can be stored.
314        """
315        if self.tested_models.get(model_name) is None:
316            # initialize model name counter
317            self.tested_models[model_name] = 0
318
319        return create_dir(self.get_model_output_dir(output_dir, model_name), self.event_dispatcher)
320
321    def get_model_output_dir(self, output_dir: str, model_name: str) -> str:
322        """Get the model output directory path for the specified model name.
323
324        Args:
325            output_dir: the path of the directory to store the output.
326            model_name: name of the model's algorithm.
327
328        Returns:
329            the path of the directory where the model's computed ratings can be stored.
330        """
331        index = self.tested_models[model_name]
332        return os.path.join(
333            output_dir,
334            self.algo_factory.get_name() + '_' + model_name + '_' + str(index)
335        )
336
337    def end_model(self, model: BaseAlgorithm, start: float) -> None:
338        """Finalize the model computation.
339
340        Updates the number of tested models so that additional
341        computations remain unique for this model.
342
343        Args:
344            model: the model that finished.
345            start: the time when the model computation started.
346        """
347        self.tested_models[model.get_name()] += 1
348
349        end = time.time()
350
351        self.event_dispatcher.dispatch(ModelEventArgs(
352            ON_END_MODEL,
353            model.get_name(),
354            model.get_params()
355        ), elapsed_time=end - start)
356
357    def on_load_train_set_matrix(self) -> Matrix:
358        """Load the train set matrix that all models can use for training.
359
360        The default train set matrix of the model pipeline is a dataframe.
361        Derived classes are allowed to override this function to return a different type of matrix.
362
363        Returns:
364            the loaded train set matrix dataframe.
365        """
366        return Matrix(self.data_transition.train_set_path)
367
368    def load_train_set_matrix(self) -> None:
369        """Load the train set matrix that all models can use for training.
370
371        Raises:
372            FileNotFoundError: when the train set file is not found.
373        """
374        self.event_dispatcher.dispatch(DataframeEventArgs(
375            ON_BEGIN_LOAD_TRAIN_SET,
376            self.data_transition.train_set_path,
377            'model train set matrix'
378        ))
379
380        start = time.time()
381
382        try:
383            self.train_set_matrix = self.on_load_train_set_matrix()
384        except FileNotFoundError as err:
385            self.event_dispatcher.dispatch(ErrorEventArgs(
386                ON_RAISE_ERROR,
387                'FileNotFoundError: raised while trying to load the matrix train set from ' +
388                self.data_transition.train_set_path
389            ))
390            raise err
391
392
393        end = time.time()
394
395        self.event_dispatcher.dispatch(DataframeEventArgs(
396            ON_END_LOAD_TRAIN_SET,
397            self.data_transition.train_set_path,
398            'model train set matrix'
399        ), elapsed_time=end - start)
400
401    def load_train_set_dataframe(self) -> pd.DataFrame:
402        """Load the train set as a dataframe.
403
404        Raises:
405            FileNotFoundError: when the train set file is not found.
406
407        Returns:
408            the loaded train set dataframe.
409        """
410        return self.read_dataframe(
411            self.data_transition.train_set_path,
412            'data train set',
413            ON_BEGIN_LOAD_TRAIN_SET,
414            ON_END_LOAD_TRAIN_SET,
415            names=['user', 'item', 'rating']
416        )
417
418    def load_test_set_dataframe(self, test_name: str = 'data test set') -> pd.DataFrame:
419        """Load the test set as a dataframe.
420
421        Args:
422            test_name: name of the test set dataframe to dispatch in the dataframe event.
423
424        Raises:
425            FileNotFoundError: when the test set file is not found.
426
427        Returns:
428            the loaded test set dataframe.
429        """
430        return self.read_dataframe(
431            self.data_transition.test_set_path,
432            test_name,
433            ON_BEGIN_LOAD_TEST_SET,
434            ON_END_LOAD_TEST_SET,
435            names=['user', 'item', 'rating']
436        )
437
438    @abstractmethod
439    def load_test_set_users(self) -> None:
440        """Load the test set users that all models can use for testing.
441
442        Raises:
443            FileNotFoundError: when the test set file is not found.
444        """
445        raise NotImplementedError()
446
447    def reconstruct_ratings(
448            self,
449            result_dirs: List[str],
450            is_running: Callable[[], bool]) -> None:
451        """Reconstruct the original ratings for all computed model ratings.
452
453        Args:
454            result_dirs: a list of directories that contain a computed rating file.
455            is_running: function that returns whether the pipeline
456                is still running. Stops early when False is returned.
457        """
458        if not is_running() or len(result_dirs) == 0:
459            return
460
461        # TODO should probably move this code to a separate pipeline
462        ratings_dataframe = pd.concat([
463            self.load_train_set_dataframe(),
464            self.load_test_set_dataframe()
465        ])
466
467        for model_dir in result_dirs:
468            if not is_running():
469                return
470
471            result_file_path = os.path.join(model_dir, MODEL_RATINGS_FILE)
472
473            self.event_dispatcher.dispatch(FileEventArgs(
474                ON_BEGIN_RECONSTRUCT_RATINGS,
475                result_file_path
476            ))
477
478            start = time.time()
479
480            result = pd.read_csv(result_file_path, sep='\t')
481            result = pd.merge(result, ratings_dataframe, how='left', on=['user', 'item'])
482            result.to_csv(result_file_path, sep='\t', header=True, index=False)
483
484            end = time.time()
485
486            self.event_dispatcher.dispatch(FileEventArgs(
487                ON_END_RECONSTRUCT_RATINGS,
488                result_file_path
489            ), elapsed_time=end - start)
490
491    def test_model(
492            self,
493            model: BaseAlgorithm,
494            model_dir: str,
495            is_running: Callable[[], bool],
496            **kwargs) -> None:
497        """Test the specified model using the test set.
498
499        This function wraps the event dispatching and functionality
500        that both predictor and recommender models have in common.
501
502        Args:
503            model: the model that needs to be tested.
504            model_dir: the path of the directory where the computed ratings can be stored.
505            is_running: function that returns whether the pipeline
506                is still running. Stops early when False is returned.
507
508        Keyword Args:
509            num_items(int): the number of item recommendations to produce, only
510                needed when running the pipeline for recommender algorithms.
511
512        Raises:
513            ArithmeticError: possibly raised by a model on testing.
514            MemoryError: possibly raised by a model on testing.
515            RuntimeError: possibly raised by a model on testing.
516        """
517        self.event_dispatcher.dispatch(ModelEventArgs(
518            ON_BEGIN_TEST_MODEL,
519            model.get_name(),
520            model.get_params()
521        ))
522
523        start = time.time()
524
525        result_file_path = os.path.join(model_dir, MODEL_RATINGS_FILE)
526        start_index = 0
527        while start_index < len(self.test_set_users):
528            if not is_running():
529                return
530
531            user_batch = self.test_set_users[start_index : start_index + MODEL_USER_BATCH_SIZE]
532            ratings = self.test_model_ratings(model, user_batch, **kwargs)
533            if not is_running():
534                return
535
536            self.write_dataframe(result_file_path, ratings, start_index == 0)
537            start_index += MODEL_USER_BATCH_SIZE
538
539        end = time.time()
540
541        self.event_dispatcher.dispatch(ModelEventArgs(
542            ON_END_TEST_MODEL,
543            model.get_name(),
544            model.get_params()
545        ), elapsed_time=end - start)
546
547    @abstractmethod
548    def test_model_ratings(
549            self,
550            model: BaseAlgorithm,
551            user_batch: List[int],
552            **kwargs) -> pd.DataFrame:
553        """Test the specified model for rating predictions or recommendations.
554
555        Args:
556            model: the model that needs to be tested.
557            user_batch: the user batch to compute model ratings for.
558
559        Keyword Args:
560            num_items(int): the number of item recommendations to produce, only
561                needed when running the pipeline for recommender algorithms.
562
563        Raises:
564            ArithmeticError: possibly raised by a model on testing.
565            MemoryError: possibly raised by a model on testing.
566            RuntimeError: possibly raised by a model on testing.
567
568        Returns:
569            a dataframe containing the computed model ratings.
570        """
571        raise NotImplementedError()
572
573    def train_model(self, model: BaseAlgorithm) -> None:
574        """Train the specified model using the train set.
575
576        Args:
577            model: the model that needs to be trained.
578
579        Raises:
580            ArithmeticError: possibly raised by a model on training.
581            MemoryError: possibly raised by a model on training.
582            RuntimeError: possibly raised by a model on training.
583        """
584        self.event_dispatcher.dispatch(ModelEventArgs(
585            ON_BEGIN_TRAIN_MODEL,
586            model.get_name(),
587            model.get_params()
588        ))
589
590        start = time.time()
591
592        model.train(self.train_set_matrix)
593
594        end = time.time()
595
596        self.event_dispatcher.dispatch(ModelEventArgs(
597            ON_END_TRAIN_MODEL,
598            model.get_name(),
599            model.get_params()
600        ), elapsed_time=end - start)
601
602    def train_and_test_model(
603            self,
604            model: BaseAlgorithm,
605            model_dir: str,
606            is_running: Callable[[], bool],
607            **kwargs) -> None:
608        """Train and test the specified model.
609
610        Several possible errors can be raised during the execution of both training
611        and testing the model: namely ArithmeticError, MemoryError and RuntimeError.
612
613        Args:
614            model: the model that needs to be trained.
615            model_dir: the path of the directory where the computed ratings can be stored.
616            is_running: function that returns whether the pipeline
617                is still running. Stops early when False is returned.
618
619        Raises:
620            ArithmeticError: possibly raised by a model on training or testing.
621            MemoryError: possibly raised by a model on training or testing.
622            RuntimeError: possibly raised by a model on training or testing.
623
624        Keyword Args:
625            num_items(int): the number of item recommendations to produce, only
626                needed when running the pipeline for recommender algorithms.
627        """
628        try:
629            self.train_model(model)
630        except (ArithmeticError, MemoryError, RuntimeError) as err:
631            self.event_dispatcher.dispatch(ErrorEventArgs(
632                ON_RAISE_ERROR,
633                'Error: raised while training model ' +
634                self.algo_factory.get_name() + ' ' + model.get_name()
635            ))
636            # raise again so the model run aborts
637            raise err
638
639        try:
640            self.test_model(model, model_dir, is_running, **kwargs)
641        except (ArithmeticError, MemoryError, RuntimeError) as err:
642            self.event_dispatcher.dispatch(ErrorEventArgs(
643                ON_RAISE_ERROR,
644                'Error: raised while testing model ' +
645                self.algo_factory.get_name() + ' ' + model.get_name()
646            ))
647            # raise again so the model run aborts
648            raise err

Model Pipeline to run computations for algorithms from a specific API.

Wraps the common functionality that applies to all models regardless of their type. Loading the train and test set is done only once each time the pipeline is run. After both sets are loaded, the pipeline loops through all specified models and executes the following steps:

1) create the output directory.
2) create the model.
3) save the model's creation settings.
4) train the model using the train set.
5) test the model using the test set.

After all models are trained and tested, the computed rating files are updated with the original ratings from the train and test set.
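The rating reconstruction mentioned above amounts to a left merge of the original ratings onto each computed rating file. A standalone sketch with toy data (column names follow the pipeline's 'user', 'item', 'rating' convention; 'score' is a hypothetical computed column):

```python
import pandas as pd

# Toy stand-ins for the train set, test set, and a model's computed ratings;
# the real pipeline reads these from tab-separated files.
train_set = pd.DataFrame({'user': [1, 1], 'item': [10, 11], 'rating': [4.0, 3.0]})
test_set = pd.DataFrame({'user': [2], 'item': [10], 'rating': [5.0]})
computed = pd.DataFrame({'user': [1, 2], 'item': [11, 10], 'score': [0.9, 0.7]})

# Concatenate the original ratings and left-merge them onto the computed
# ratings on the (user, item) pair, mirroring reconstruct_ratings.
originals = pd.concat([train_set, test_set])
result = pd.merge(computed, originals, how='left', on=['user', 'item'])
```

Each computed row gains the original rating for its (user, item) pair; pairs absent from the original sets would get NaN.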

Abstract methods:

    load_test_set_users
    test_model_ratings

Public methods:

    run
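The shape of the two abstract members can be sketched as follows. This is a hypothetical stub (the class name, user ids, and constant ratings are invented for illustration); a real concrete pipeline would derive from ModelPipeline and query the actual test set and model:

```python
import pandas as pd

class ToyPredictionPipeline:
    """Hypothetical sketch of the two members a concrete pipeline overrides."""

    def load_test_set_users(self) -> None:
        # e.g. the unique users present in the test set dataframe
        self.test_set_users = [1, 2, 3]

    def test_model_ratings(self, model, user_batch, **kwargs) -> pd.DataFrame:
        # a predictor would query the model here; this stub returns constants
        return pd.DataFrame({'user': user_batch, 'item': 0, 'rating': 1.0})
```

The pipeline's run loop only relies on these two members plus the shared train/test loading, so this is all a subclass needs to provide.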

ModelPipeline( algo_factory: src.fairreckitlib.core.config.config_factories.Factory, data_transition: src.fairreckitlib.data.data_transition.DataTransition, event_dispatcher: src.fairreckitlib.core.events.event_dispatcher.EventDispatcher)
69    def __init__(
70            self,
71            algo_factory: Factory,
72            data_transition: DataTransition,
73            event_dispatcher: EventDispatcher):
74        """Construct the model pipeline.
75
76        Args:
77            algo_factory: factory of available algorithms for this API.
78            data_transition: data input.
79            event_dispatcher: used to dispatch model/IO events when running the pipeline.
80        """
81        CorePipeline.__init__(self, event_dispatcher)
82        self.algo_factory = algo_factory
83        self.data_transition = data_transition
84        self.tested_models = {}
85
86        self.train_set_matrix = None
87        self.test_set_users = None

def run( self, output_dir: str, models_config: List[src.fairreckitlib.model.pipeline.model_config.ModelConfig], is_running: Callable[[], bool], **kwargs) -> List[str]:
 89    def run(self,
 90            output_dir: str,
 91            models_config: List[ModelConfig],
 92            is_running: Callable[[], bool],
 93            **kwargs) -> List[str]:
 94        """Run the entire pipeline from beginning to end.
 95
 96        Effectively running all computations of the specified models.
 97
 98        Args:
 99            output_dir: the path of the directory to store the output.
100            models_config: list of ModelConfig objects to compute.
101            is_running: function that returns whether the pipeline
102                is still running. Stops early when False is returned.
103
104        Keyword Args:
105            num_threads(int): the max number of threads an algorithm can use.
106            num_items(int): the number of item recommendations to produce, only
107                needed when running the pipeline for recommender algorithms.
108            rated_items_filter(bool): whether to filter already rated items when
109                producing item recommendations.
110
111        Raises:
112            FileNotFoundError: when the train and/or test set fails to load.
113
114        Returns:
115            a list of model directories where computation results are stored.
116        """
117        result_dirs = []
118        if not is_running():
119            return result_dirs
120
121        self.event_dispatcher.dispatch(ModelPipelineEventArgs(
122            ON_BEGIN_MODEL_PIPELINE,
123            self.algo_factory.get_name(),
124            models_config
125        ))
126
127        start = time.time()
128
129        # this can raise a FileNotFoundError, effectively aborting the pipeline
130        self.load_train_set_matrix()
131        if not is_running():
132            return result_dirs
133
134        # this can raise a FileNotFoundError, effectively aborting the pipeline
135        self.load_test_set_users()
136        if not is_running():
137            return result_dirs
138
139        for model in models_config:
140            # verify that the specified model is available
141            if not self.algo_factory.is_obj_available(model.name):
142                self.event_dispatcher.dispatch(ErrorEventArgs(
143                    ON_FAILURE_ERROR,
144                    'Failure: algorithm is not available: ' +
145                    self.algo_factory.get_name() + ' ' + model.name
146                ))
147                continue
148
149            # create model output dir
150            model_dir = self.create_model_output_dir(
151                output_dir,
152                model.name
153            )
154
155            # attempt to run the model computation
156            try:
157                self.run_model(
158                    model_dir,
159                    model,
160                    is_running,
161                    **kwargs
162                )
163            except ArithmeticError:
164                self.event_dispatcher.dispatch(ErrorEventArgs(
165                    ON_RAISE_ERROR,
166                    'ArithmeticError: trying to run model ' +
167                    self.algo_factory.get_name() + ' ' + model.name
168                ))
169                delete_dir(model_dir, self.event_dispatcher)
170                continue
171            except MemoryError:
172                self.event_dispatcher.dispatch(ErrorEventArgs(
173                    ON_RAISE_ERROR,
174                    'MemoryError: trying to run model ' +
175                    self.algo_factory.get_name() + ' ' + model.name
176                ))
177                delete_dir(model_dir, self.event_dispatcher)
178                continue
179            except RuntimeError:
180                self.event_dispatcher.dispatch(ErrorEventArgs(
181                    ON_RAISE_ERROR,
182                    'RuntimeError: trying to run model ' +
183                    self.algo_factory.get_name() + ' ' + model.name
184                ))
185                delete_dir(model_dir, self.event_dispatcher)
186                continue
187
188            result_dirs.append(model_dir)
189            if not is_running():
190                return result_dirs
191
192        # free up some memory because everything is trained and tested
193        self.train_set_matrix = None
194        self.test_set_users = None
195
196        self.reconstruct_ratings(result_dirs, is_running)
197
198        end = time.time()
199
200        self.event_dispatcher.dispatch(ModelPipelineEventArgs(
201            ON_END_MODEL_PIPELINE,
202            self.algo_factory.get_name(),
203            models_config
204        ), elapsed_time=end - start)
205
206        return result_dirs
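The is_running parameter can be any zero-argument callable. One simple, thread-safe implementation is a cancellation flag (the commented-out run(...) call is illustrative; the variable names are assumptions):

```python
import threading

# Event set by another thread (e.g. a UI) to request cancellation.
cancelled = threading.Event()

def is_running() -> bool:
    # the pipeline stops early as soon as this returns False
    return not cancelled.is_set()

# pipeline.run(output_dir, models_config, is_running, num_threads=1)
```

Because the pipeline polls is_running between models, between batches, and around training/testing, setting the event stops the computation at the next checkpoint rather than mid-batch.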

def run_model( self, model_dir: str, model_config: src.fairreckitlib.model.pipeline.model_config.ModelConfig, is_running: Callable[[], bool], **kwargs) -> None:

Run the model computation for the specified model configuration.

Args:
    model_dir: the path of the directory where the computed ratings can be stored.
    model_config: the algorithm model configuration.
    is_running: function that returns whether the pipeline is still running.
        Stops early when False is returned.

Keyword Args:
    num_threads(int): the max number of threads an algorithm can use.
    num_items(int): the number of item recommendations to produce, only
        needed when running the pipeline for recommender algorithms.
    rated_items_filter(bool): whether to filter already rated items when
        producing item recommendations.

Raises:
    ArithmeticError: possibly raised by a model on construction, training or testing.
    MemoryError: possibly raised by a model on construction, training or testing.
    RuntimeError: possibly raised by a model on construction, training or testing.

def begin_model( self, model_name: str, model_params: Dict[str, Any], model_dir: str, **kwargs) -> Tuple[src.fairreckitlib.model.algorithms.base_algorithm.BaseAlgorithm, float]:

Prepare the model computation.

Resolves a unique output directory for the model computation and creates the model.

Args:
    model_name: name of the model's algorithm.
    model_params: parameters of the algorithm.
    model_dir: the path of the directory where the computed ratings can be stored.

Keyword Args:
    num_threads(int): the max number of threads an algorithm can use.
    rated_items_filter(bool): whether to filter already rated items when
        producing item recommendations.

Raises:
    ArithmeticError: possibly raised by a model on construction.
    MemoryError: possibly raised by a model on construction.
    RuntimeError: possibly raised by a model on construction.

Returns:
    model: the created model according to the specified name and parameters.
    start: the time when the model computation started.
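Persisting the creation settings boils down to serializing the model's parameters as indented JSON next to the ratings file. A minimal sketch, assuming hypothetical parameter names and using a temporary directory in place of the real model directory:

```python
import json
import os
import tempfile

# Hypothetical algorithm parameters; begin_model obtains these
# from the created model via model.get_params().
params = {'num_factors': 10, 'num_iterations': 20}

model_dir = tempfile.mkdtemp()
settings_path = os.path.join(model_dir, 'settings.json')
with open(settings_path, 'w', encoding='utf-8') as file:
    # indent=4 reproduces the settings file layout used by the pipeline
    json.dump(params, file, indent=4)
```

In the library itself this is handled by the create_json IO helper, which additionally dispatches a file event.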

def create_model_output_dir(self, output_dir: str, model_name: str) -> str:

Create the output directory for a model.

Args:
    output_dir: the path of the directory to store the output.
    model_name: name of the model's algorithm.

Returns:
    the path of the directory where the model's computed ratings can be stored.

def get_model_output_dir(self, output_dir: str, model_name: str) -> str:

Get the model output directory path for the specified model name.

Args: output_dir: the path of the directory to store the output. model_name: name of the model's algorithm.

Returns: the path of the directory where the model's computed ratings can be stored.
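The directory name is composed from the algorithm factory's API name, the model name, and the model's run index. A minimal sketch of that composition, with `api_name` and `index` as hypothetical stand-ins for `self.algo_factory.get_name()` and `self.tested_models[model_name]`:

```python
import os

def get_model_output_dir(output_dir: str, api_name: str, model_name: str, index: int) -> str:
    """Compose the model output directory path as '<api_name>_<model_name>_<index>'."""
    return os.path.join(output_dir, api_name + '_' + model_name + '_' + str(index))

# The index increments per finished computation, so repeated runs of the
# same algorithm land in distinct directories.
first_run = get_model_output_dir('results', 'LensKit', 'ItemKNN', 0)
second_run = get_model_output_dir('results', 'LensKit', 'ItemKNN', 1)
```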

def end_model( self, model: src.fairreckitlib.model.algorithms.base_algorithm.BaseAlgorithm, start: float) -> None:
337    def end_model(self, model: BaseAlgorithm, start: float) -> None:
338        """Finalize the model computation.
339
340        Updates the number of tested models so that additional
341        computations remain unique for this model.
342
343        Args:
344            model: the model that finished.
345            start: the time when the model computation started.
346        """
347        self.tested_models[model.get_name()] += 1
348
349        end = time.time()
350
351        self.event_dispatcher.dispatch(ModelEventArgs(
352            ON_END_MODEL,
353            model.get_name(),
354            model.get_params()
355        ), elapsed_time=end - start)

Finalize the model computation.

Updates the number of tested models so that additional computations remain unique for this model.

Args: model: the model that finished. start: the time when the model computation started.

def on_load_train_set_matrix(self) -> src.fairreckitlib.model.algorithms.matrix.Matrix:
357    def on_load_train_set_matrix(self) -> Matrix:
358        """Load the train set matrix that all models can use for training.
359
360        The default train set matrix of the model pipeline is a dataframe.
361        Derived classes are allowed to override this function to return a different type of matrix.
362
363        Returns:
364            the loaded train set matrix dataframe.
365        """
366        return Matrix(self.data_transition.train_set_path)

Load the train set matrix that all models can use for training.

The default train set matrix of the model pipeline is a dataframe. Derived classes are allowed to override this function to return a different type of matrix.

Returns: the loaded train set matrix dataframe.

def load_train_set_matrix(self) -> None:
368    def load_train_set_matrix(self) -> None:
369        """Load the train set matrix that all models can use for training.
370
371        Raises:
372            FileNotFoundError: when the train set file is not found.
373        """
374        self.event_dispatcher.dispatch(DataframeEventArgs(
375            ON_BEGIN_LOAD_TRAIN_SET,
376            self.data_transition.train_set_path,
377            'model train set matrix'
378        ))
379
380        start = time.time()
381
382        try:
383            self.train_set_matrix = self.on_load_train_set_matrix()
384        except FileNotFoundError as err:
385            self.event_dispatcher.dispatch(ErrorEventArgs(
386                ON_RAISE_ERROR,
387                'FileNotFoundError: raised while trying to load the matrix train set from ' +
388                self.data_transition.train_set_path
389            ))
390            raise err
391
392
393        end = time.time()
394
395        self.event_dispatcher.dispatch(DataframeEventArgs(
396            ON_END_LOAD_TRAIN_SET,
397            self.data_transition.train_set_path,
398            'model train set matrix'
399        ), elapsed_time=end - start)

Load the train set matrix that all models can use for training.

Raises: FileNotFoundError: when the train set file is not found.

def load_train_set_dataframe(self) -> pandas.core.frame.DataFrame:
401    def load_train_set_dataframe(self) -> pd.DataFrame:
402        """Load the train set as a dataframe.
403
404        Raises:
405            FileNotFoundError: when the train set file is not found.
406
407        Returns:
408            the loaded train set dataframe.
409        """
410        return self.read_dataframe(
411            self.data_transition.train_set_path,
412            'data train set',
413            ON_BEGIN_LOAD_TRAIN_SET,
414            ON_END_LOAD_TRAIN_SET,
415            names=['user', 'item', 'rating']
416        )

Load the train set as a dataframe.

Raises: FileNotFoundError: when the train set file is not found.

Returns: the loaded train set dataframe.

def load_test_set_dataframe(self, test_name: str = 'data test set') -> pandas.core.frame.DataFrame:
418    def load_test_set_dataframe(self, test_name: str='data test set') -> pd.DataFrame:
419        """Load the test set as a dataframe.
420
421        Args:
422            test_name: name of the test set dataframe to dispatch in the dataframe event.
423
424        Raises:
425            FileNotFoundError: when the test set file is not found.
426
427        Returns:
428            the loaded test set dataframe.
429        """
430        return self.read_dataframe(
431            self.data_transition.test_set_path,
432            test_name,
433            ON_BEGIN_LOAD_TEST_SET,
434            ON_END_LOAD_TEST_SET,
435            names=['user', 'item', 'rating']
436        )

Load the test set as a dataframe.

Args: test_name: name of the test set dataframe to dispatch in the dataframe event.

Raises: FileNotFoundError: when the test set file is not found.

Returns: the loaded test set dataframe.
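Both loaders pass `names=['user', 'item', 'rating']` through to `read_dataframe`, which suggests the train and test sets are stored as headerless, tab-separated files. Assuming `read_dataframe` ultimately forwards these arguments to `pandas.read_csv`, loading such a file directly would look roughly like:

```python
import io

import pandas as pd

# A headerless, tab-separated rating file, as the sets are assumed to be stored.
raw = io.StringIO('1\t10\t4.0\n1\t20\t3.5\n2\t10\t5.0\n')

# 'names' labels the columns since the file itself carries no header row.
ratings = pd.read_csv(raw, sep='\t', header=None, names=['user', 'item', 'rating'])
```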

@abstractmethod
def load_test_set_users(self) -> None:
438    @abstractmethod
439    def load_test_set_users(self) -> None:
440        """Load the test set users that all models can use for testing.
441
442        Raises:
443            FileNotFoundError: when the test set file is not found.
444        """
445        raise NotImplementedError()

Load the test set users that all models can use for testing.

Raises: FileNotFoundError: when the test set file is not found.

def reconstruct_ratings(self, result_dirs: List[str], is_running: Callable[[], bool]) -> None:
447    def reconstruct_ratings(
448            self,
449            result_dirs: List[str],
450            is_running: Callable[[], bool]) -> None:
451        """Reconstruct the original ratings for all the computed model ratings.
452
453        Args:
454            result_dirs: a list of directories that contain a computed rating file.
455            is_running: function that returns whether the pipeline
456                is still running. Stops early when False is returned.
457        """
458        if not is_running() or len(result_dirs) == 0:
459            return
460
461        # TODO should probably move this code to a separate pipeline
462        ratings_dataframe = pd.concat([
463            self.load_train_set_dataframe(),
464            self.load_test_set_dataframe()
465        ])
466
467        for model_dir in result_dirs:
468            if not is_running():
469                return
470
471            result_file_path = os.path.join(model_dir, MODEL_RATINGS_FILE)
472
473            self.event_dispatcher.dispatch(FileEventArgs(
474                ON_BEGIN_RECONSTRUCT_RATINGS,
475                result_file_path
476            ))
477
478            start = time.time()
479
480            result = pd.read_csv(result_file_path, sep='\t')
481            result = pd.merge(result, ratings_dataframe, how='left', on=['user', 'item'])
482            result.to_csv(result_file_path, sep='\t', header=True, index=False)
483
484            end = time.time()
485
486            self.event_dispatcher.dispatch(FileEventArgs(
487                ON_END_RECONSTRUCT_RATINGS,
488                result_file_path
489            ), elapsed_time=end - start)

Reconstruct the original ratings for all the computed model ratings.

Args: result_dirs: a list of directories that contain a computed rating file. is_running: function that returns whether the pipeline is still running. Stops early when False is returned.
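The reconstruction step concatenates the train and test sets into one ratings dataframe, then left-joins each result file against it on `(user, item)`. A small self-contained sketch of that merge, using made-up data in place of the files on disk:

```python
import pandas as pd

# Original ratings: stands in for the concatenated train and test sets.
ratings_dataframe = pd.DataFrame({
    'user':   [1, 1, 2],
    'item':   [10, 20, 10],
    'rating': [4.0, 3.5, 5.0],
})

# A computed result file: user/item pairs with a model score.
result = pd.DataFrame({
    'user':  [1, 2],
    'item':  [20, 10],
    'score': [0.9, 0.7],
})

# The left join keeps every computed row and attaches the original rating
# where a matching (user, item) pair exists.
result = pd.merge(result, ratings_dataframe, how='left', on=['user', 'item'])
```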

def test_model( self, model: src.fairreckitlib.model.algorithms.base_algorithm.BaseAlgorithm, model_dir: str, is_running: Callable[[], bool], **kwargs) -> None:
491    def test_model(
492            self,
493            model: BaseAlgorithm,
494            model_dir: str,
495            is_running: Callable[[], bool],
496            **kwargs) -> None:
497        """Test the specified model using the test set.
498
499        This function wraps the event dispatching and functionality
500        that both predictor and recommender models have in common.
501
502        Args:
503            model: the model that needs to be tested.
504            model_dir: the path of the directory where the computed ratings can be stored.
505            is_running: function that returns whether the pipeline
506                is still running. Stops early when False is returned.
507
508        Keyword Args:
509            num_items(int): the number of item recommendations to produce, only
510                needed when running the pipeline for recommender algorithms.
511
512        Raises:
513            ArithmeticError: possibly raised by a model on testing.
514            MemoryError: possibly raised by a model on testing.
515            RuntimeError: possibly raised by a model on testing.
516        """
517        self.event_dispatcher.dispatch(ModelEventArgs(
518            ON_BEGIN_TEST_MODEL,
519            model.get_name(),
520            model.get_params()
521        ))
522
523        start = time.time()
524
525        result_file_path = os.path.join(model_dir, MODEL_RATINGS_FILE)
526        start_index = 0
527        while start_index < len(self.test_set_users):
528            if not is_running():
529                return
530
531            user_batch = self.test_set_users[start_index : start_index + MODEL_USER_BATCH_SIZE]
532            ratings = self.test_model_ratings(model, user_batch, **kwargs)
533            if not is_running():
534                return
535
536            self.write_dataframe(result_file_path, ratings, start_index == 0)
537            start_index += MODEL_USER_BATCH_SIZE
538
539        end = time.time()
540
541        self.event_dispatcher.dispatch(ModelEventArgs(
542            ON_END_TEST_MODEL,
543            model.get_name(),
544            model.get_params()
545        ), elapsed_time=end - start)

Test the specified model using the test set.

This function wraps the event dispatching and functionality that both predictor and recommender models have in common.

Args: model: the model that needs to be tested. model_dir: the path of the directory where the computed ratings can be stored. is_running: function that returns whether the pipeline is still running. Stops early when False is returned.

Keyword Args: num_items(int): the number of item recommendations to produce, only needed when running the pipeline for recommender algorithms.

Raises: ArithmeticError: possibly raised by a model on testing. MemoryError: possibly raised by a model on testing. RuntimeError: possibly raised by a model on testing.
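The test loop slices the test set users into fixed-size batches and appends each batch's ratings to the result file, writing the header only for the first batch. The slicing logic alone, with small stand-in values for `self.test_set_users` and `MODEL_USER_BATCH_SIZE`:

```python
# Hypothetical stand-ins for the pipeline's test users and batch size constant.
test_set_users = list(range(10))
MODEL_USER_BATCH_SIZE = 4

batches = []
start_index = 0
while start_index < len(test_set_users):
    # Python slicing past the end is safe, so the last batch may be shorter.
    user_batch = test_set_users[start_index : start_index + MODEL_USER_BATCH_SIZE]
    batches.append(user_batch)
    # In the pipeline, ratings for this batch are computed and written here;
    # the header flag is (start_index == 0), i.e. first batch only.
    start_index += MODEL_USER_BATCH_SIZE
```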

@abstractmethod
def test_model_ratings( self, model: src.fairreckitlib.model.algorithms.base_algorithm.BaseAlgorithm, user_batch: List[int], **kwargs) -> pandas.core.frame.DataFrame:
547    @abstractmethod
548    def test_model_ratings(
549            self,
550            model: BaseAlgorithm,
551            user_batch: List[int],
552            **kwargs) -> pd.DataFrame:
553        """Test the specified model for rating predictions or recommendations.
554
555        Args:
556            model: the model that needs to be tested.
557            user_batch: the user batch to compute model ratings for.
558
559        Keyword Args:
560            num_items(int): the number of item recommendations to produce, only
561                needed when running the pipeline for recommender algorithms.
562
563        Raises:
564            ArithmeticError: possibly raised by a model on testing.
565            MemoryError: possibly raised by a model on testing.
566            RuntimeError: possibly raised by a model on testing.
567
568        Returns:
569            a dataframe containing the computed model ratings.
570        """
571        raise NotImplementedError()

Test the specified model for rating predictions or recommendations.

Args: model: the model that needs to be tested. user_batch: the user batch to compute model ratings for.

Keyword Args: num_items(int): the number of item recommendations to produce, only needed when running the pipeline for recommender algorithms.

Raises: ArithmeticError: possibly raised by a model on testing. MemoryError: possibly raised by a model on testing. RuntimeError: possibly raised by a model on testing.

Returns: a dataframe containing the computed model ratings.

def train_model( self, model: src.fairreckitlib.model.algorithms.base_algorithm.BaseAlgorithm) -> None:
573    def train_model(self, model: BaseAlgorithm) -> None:
574        """Train the specified model using the train set.
575
576        Args:
577            model: the model that needs to be trained.
578
579        Raises:
580            ArithmeticError: possibly raised by a model on training.
581            MemoryError: possibly raised by a model on training.
582            RuntimeError: possibly raised by a model on training.
583        """
584        self.event_dispatcher.dispatch(ModelEventArgs(
585            ON_BEGIN_TRAIN_MODEL,
586            model.get_name(),
587            model.get_params()
588        ))
589
590        start = time.time()
591
592        model.train(self.train_set_matrix)
593
594        end = time.time()
595
596        self.event_dispatcher.dispatch(ModelEventArgs(
597            ON_END_TRAIN_MODEL,
598            model.get_name(),
599            model.get_params()
600        ), elapsed_time=end - start)

Train the specified model using the train set.

Args: model: the model that needs to be trained.

Raises: ArithmeticError: possibly raised by a model on training. MemoryError: possibly raised by a model on training. RuntimeError: possibly raised by a model on training.

def train_and_test_model( self, model: src.fairreckitlib.model.algorithms.base_algorithm.BaseAlgorithm, model_dir: str, is_running: Callable[[], bool], **kwargs) -> None:
602    def train_and_test_model(
603            self,
604            model: BaseAlgorithm,
605            model_dir: str,
606            is_running: Callable[[], bool],
607            **kwargs) -> None:
608        """Train and test the specified model.
609
610        Several possible errors can be raised during the execution of both training and
611        testing the model: namely ArithmeticError, MemoryError and RuntimeError.
612
613        Args:
614            model: the model that needs to be trained and tested.
615            model_dir: the path of the directory where the computed ratings can be stored.
616            is_running: function that returns whether the pipeline
617                is still running. Stops early when False is returned.
618
619        Raises:
620            ArithmeticError: possibly raised by a model on training or testing.
621            MemoryError: possibly raised by a model on training or testing.
622            RuntimeError: possibly raised by a model on training or testing.
623
624        Keyword Args:
625            num_items(int): the number of item recommendations to produce, only
626                needed when running the pipeline for recommender algorithms.
627        """
628        try:
629            self.train_model(model)
630        except (ArithmeticError, MemoryError, RuntimeError) as err:
631            self.event_dispatcher.dispatch(ErrorEventArgs(
632                ON_RAISE_ERROR,
633                'Error: raised while training model ' +
634                self.algo_factory.get_name() + ' ' + model.get_name()
635            ))
636            # raise again so the model run aborts
637            raise err
638
639        try:
640            self.test_model(model, model_dir, is_running, **kwargs)
641        except (ArithmeticError, MemoryError, RuntimeError) as err:
642            self.event_dispatcher.dispatch(ErrorEventArgs(
643                ON_RAISE_ERROR,
644                'Error: raised while testing model ' +
645                self.algo_factory.get_name() + ' ' + model.get_name()
646            ))
647            # raise again so the model run aborts
648            raise err

Train and test the specified model.

Several possible errors can be raised during the execution of both training and testing the model, namely ArithmeticError, MemoryError and RuntimeError.

Args: model: the model that needs to be trained and tested. model_dir: the path of the directory where the computed ratings can be stored. is_running: function that returns whether the pipeline is still running. Stops early when False is returned.

Raises: ArithmeticError: possibly raised by a model on training or testing. MemoryError: possibly raised by a model on training or testing. RuntimeError: possibly raised by a model on training or testing.

Keyword Args: num_items(int): the number of item recommendations to produce, only needed when running the pipeline for recommender algorithms.
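Both the training and testing calls follow the same pattern: catch the known error types, dispatch an error event, then re-raise so the model run aborts. A minimal sketch of that pattern, with a list-backed stub standing in for `event_dispatcher.dispatch(ErrorEventArgs(...))` and a deliberately failing `train`:

```python
# Stub dispatcher: records messages instead of dispatching ErrorEventArgs.
errors = []

def dispatch_error(message: str) -> None:
    errors.append(message)

def train(model_name: str) -> None:
    # Hypothetical failure: a model raising one of the documented error types.
    raise MemoryError('model too large')

aborted = False
try:
    try:
        train('ItemKNN')
    except (ArithmeticError, MemoryError, RuntimeError):
        dispatch_error('Error: raised while training model ItemKNN')
        raise  # raise again so the model run aborts
except MemoryError:
    # A caller higher up (e.g. the model run) observes the re-raised error.
    aborted = True
```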