src.fairreckitlib.model.pipeline.model_pipeline
This module contains base functionality of the complete model pipeline.
Classes:
ModelPipeline: class that batches multiple model computations for a specific API.
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)
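The `ModelPipeline.run` loop below isolates failures per model: a model that raises is skipped (and its partial output removed) so the rest of the batch still completes. A standalone sketch of that control flow, with a hypothetical `fake_compute` standing in for the real model computation:

```python
from typing import Callable, List

def run_models(model_names: List[str],
               compute: Callable[[str], str],
               is_running: Callable[[], bool]) -> List[str]:
    """Run each model, skipping failures so one bad model cannot abort the batch."""
    result_dirs = []
    for name in model_names:
        if not is_running():
            return result_dirs  # cooperative early stop, mirroring is_running in run()
        try:
            result_dirs.append(compute(name))
        except (ArithmeticError, MemoryError, RuntimeError):
            # the real pipeline also dispatches an error event and
            # deletes the model's partial output directory here
            continue
    return result_dirs

def fake_compute(name: str) -> str:
    # stand-in for run_model; 'broken' simulates a failing algorithm
    if name == 'broken':
        raise RuntimeError('model failed')
    return name + '_0'

completed = run_models(['algo_a', 'broken', 'algo_b'], fake_compute, lambda: True)
```

Only the three exception types the pipeline documents are caught; anything else still aborts the whole run.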
"""This module contains base functionality of the complete model pipeline.

Classes:

    ModelPipeline: class that batches multiple model computations for a specific API.

This program has been developed by students from the bachelor Computer Science at
Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

from abc import ABCMeta, abstractmethod
import os
import time
from typing import Any, Callable, Dict, List, Tuple

import pandas as pd

from ...core.config.config_factories import Factory
from ...core.core_constants import MODEL_RATINGS_FILE, MODEL_USER_BATCH_SIZE
from ...core.events.event_dispatcher import EventDispatcher
from ...core.events.event_error import ON_FAILURE_ERROR, ON_RAISE_ERROR, ErrorEventArgs
from ...core.io.event_io import DataframeEventArgs, FileEventArgs
from ...core.io.io_create import create_dir, create_json
from ...core.io.io_delete import delete_dir
from ...core.pipeline.core_pipeline import CorePipeline
from ...data.data_transition import DataTransition
from ..algorithms.base_algorithm import BaseAlgorithm
from ..algorithms.matrix import Matrix
from .model_config import ModelConfig
from .model_event import ON_BEGIN_LOAD_TEST_SET, ON_END_LOAD_TEST_SET
from .model_event import ON_BEGIN_LOAD_TRAIN_SET, ON_END_LOAD_TRAIN_SET
from .model_event import ON_BEGIN_MODEL_PIPELINE, ON_END_MODEL_PIPELINE
from .model_event import ON_BEGIN_RECONSTRUCT_RATINGS, ON_END_RECONSTRUCT_RATINGS
from .model_event import ON_BEGIN_TEST_MODEL, ON_END_TEST_MODEL
from .model_event import ON_BEGIN_TRAIN_MODEL, ON_END_TRAIN_MODEL
from .model_event import ON_BEGIN_MODEL, ON_END_MODEL
from .model_event import ModelPipelineEventArgs, ModelEventArgs


class ModelPipeline(CorePipeline, metaclass=ABCMeta):
    """Model Pipeline to run computations for algorithms from a specific API.

    Wraps the common functionality that applies to all models regardless of their type.
    Loading the train and test set is only done once each time the pipeline is run.
    After these sets are done loading, the pipeline loops
    through all specified models and executes the following steps:

    1) create the output directory.
    2) create the model.
    3) save the model's creation settings.
    4) train the model using the train set.
    5) test the model using the test set.

    After all models are trained and tested, the computed rating files are updated
    with the original ratings from the train and test set.

    Abstract methods:

        load_test_set_users
        test_model_ratings

    Public methods:

        run
    """

    def __init__(
            self,
            algo_factory: Factory,
            data_transition: DataTransition,
            event_dispatcher: EventDispatcher):
        """Construct the model pipeline.

        Args:
            algo_factory: factory of available algorithms for this API.
            data_transition: data input.
            event_dispatcher: used to dispatch model/IO events when running the pipeline.
        """
        CorePipeline.__init__(self, event_dispatcher)
        self.algo_factory = algo_factory
        self.data_transition = data_transition
        self.tested_models = {}

        self.train_set_matrix = None
        self.test_set_users = None

    def run(self,
            output_dir: str,
            models_config: List[ModelConfig],
            is_running: Callable[[], bool],
            **kwargs) -> List[str]:
        """Run the entire pipeline from beginning to end.

        Effectively running all computations of the specified models.

        Args:
            output_dir: the path of the directory to store the output.
            models_config: list of ModelConfig objects to compute.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Keyword Args:
            num_threads(int): the max number of threads an algorithm can use.
            num_items(int): the number of item recommendations to produce, only
                needed when running the pipeline for recommender algorithms.
            rated_items_filter(bool): whether to filter already rated items when
                producing item recommendations.

        Raises:
            FileNotFoundError: when the train and/or test set fails to load.

        Returns:
            a list of model directories where computation results are stored.
        """
        result_dirs = []
        if not is_running():
            return result_dirs

        self.event_dispatcher.dispatch(ModelPipelineEventArgs(
            ON_BEGIN_MODEL_PIPELINE,
            self.algo_factory.get_name(),
            models_config
        ))

        start = time.time()

        # this can raise a FileNotFoundError, effectively aborting the pipeline
        self.load_train_set_matrix()
        if not is_running():
            return result_dirs

        # this can raise a FileNotFoundError, effectively aborting the pipeline
        self.load_test_set_users()
        if not is_running():
            return result_dirs

        for model in models_config:
            # verify that the specified model is available
            if not self.algo_factory.is_obj_available(model.name):
                self.event_dispatcher.dispatch(ErrorEventArgs(
                    ON_FAILURE_ERROR,
                    'Failure: algorithm is not available: ' +
                    self.algo_factory.get_name() + ' ' + model.name
                ))
                continue

            # create model output dir
            model_dir = self.create_model_output_dir(
                output_dir,
                model.name
            )

            # attempt to run the model computation
            try:
                self.run_model(
                    model_dir,
                    model,
                    is_running,
                    **kwargs
                )
            except ArithmeticError:
                self.event_dispatcher.dispatch(ErrorEventArgs(
                    ON_RAISE_ERROR,
                    'ArithmeticError: trying to run model ' +
                    self.algo_factory.get_name() + ' ' + model.name
                ))
                delete_dir(model_dir, self.event_dispatcher)
                continue
            except MemoryError:
                self.event_dispatcher.dispatch(ErrorEventArgs(
                    ON_RAISE_ERROR,
                    'MemoryError: trying to run model ' +
                    self.algo_factory.get_name() + ' ' + model.name
                ))
                delete_dir(model_dir, self.event_dispatcher)
                continue
            except RuntimeError:
                self.event_dispatcher.dispatch(ErrorEventArgs(
                    ON_RAISE_ERROR,
                    'RuntimeError: trying to run model ' +
                    self.algo_factory.get_name() + ' ' + model.name
                ))
                delete_dir(model_dir, self.event_dispatcher)
                continue

            result_dirs.append(model_dir)
            if not is_running():
                return result_dirs

        # free up some memory because everything is trained and tested
        self.train_set_matrix = None
        self.test_set_users = None

        self.reconstruct_ratings(result_dirs, is_running)

        end = time.time()

        self.event_dispatcher.dispatch(ModelPipelineEventArgs(
            ON_END_MODEL_PIPELINE,
            self.algo_factory.get_name(),
            models_config
        ), elapsed_time=end - start)

        return result_dirs

    def run_model(
            self,
            model_dir: str,
            model_config: ModelConfig,
            is_running: Callable[[], bool],
            **kwargs) -> None:
        """Run the model computation for the specified model configuration.

        Args:
            model_dir: the path of the directory where the computed ratings can be stored.
            model_config: the algorithm model configuration.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Keyword Args:
            num_threads(int): the max number of threads an algorithm can use.
            num_items(int): the number of item recommendations to produce, only
                needed when running the pipeline for recommender algorithms.
            rated_items_filter(bool): whether to filter already rated items when
                producing item recommendations.

        Raises:
            ArithmeticError: possibly raised by a model on construction, training or testing.
            MemoryError: possibly raised by a model on construction, training or testing.
            RuntimeError: possibly raised by a model on construction, training or testing.
        """
        model, start = self.begin_model(
            model_config.name,
            model_config.params,
            model_dir,
            **kwargs
        )
        if not is_running():
            return

        self.train_and_test_model(model, model_dir, is_running, **kwargs)
        if not is_running():
            return

        self.end_model(model, start)

    def begin_model(
            self,
            model_name: str,
            model_params: Dict[str, Any],
            model_dir: str,
            **kwargs) -> Tuple[BaseAlgorithm, float]:
        """Prepare the model computation.

        Resolves the output directory to create for the model computation,
        so that it is unique, and creates the model.

        Args:
            model_name: name of the model's algorithm.
            model_params: parameters of the algorithm.
            model_dir: the path of the directory where the computed ratings can be stored.

        Keyword Args:
            num_threads(int): the max number of threads an algorithm can use.
            rated_items_filter(bool): whether to filter already rated items when
                producing item recommendations.

        Raises:
            ArithmeticError: possibly raised by a model on construction.
            MemoryError: possibly raised by a model on construction.
            RuntimeError: possibly raised by a model on construction.

        Returns:
            model: the created model according to the specified name and parameters.
            start: the time when the model computation started.
        """
        start = time.time()

        self.event_dispatcher.dispatch(ModelEventArgs(
            ON_BEGIN_MODEL,
            model_name,
            model_params
        ))

        # attempt to create model
        kwargs['rating_type'] = self.data_transition.get_rating_type()
        model = self.algo_factory.create(
            model_name,
            model_params,
            **kwargs
        )

        # create settings file
        create_json(
            os.path.join(model_dir, 'settings.json'),
            model.get_params(),
            self.event_dispatcher,
            indent=4
        )

        return model, start

    def create_model_output_dir(self, output_dir: str, model_name: str) -> str:
        """Create the output directory for a model.

        Args:
            output_dir: the path of the directory to store the output.
            model_name: name of the model's algorithm.

        Returns:
            the path of the directory where the model's computed ratings can be stored.
        """
        if self.tested_models.get(model_name) is None:
            # initialize model name counter
            self.tested_models[model_name] = 0

        return create_dir(self.get_model_output_dir(output_dir, model_name), self.event_dispatcher)

    def get_model_output_dir(self, output_dir: str, model_name: str) -> str:
        """Get the model output directory path for the specified model name.

        Args:
            output_dir: the path of the directory to store the output.
            model_name: name of the model's algorithm.

        Returns:
            the path of the directory where the model's computed ratings can be stored.
        """
        index = self.tested_models[model_name]
        return os.path.join(
            output_dir,
            self.algo_factory.get_name() + '_' + model_name + '_' + str(index)
        )

    def end_model(self, model: BaseAlgorithm, start: float) -> None:
        """Finalize the model computation.

        Updates the number of tested models so that additional
        computations remain unique for this model.

        Args:
            model: the model that finished.
            start: the time when the model computation started.
        """
        self.tested_models[model.get_name()] += 1

        end = time.time()

        self.event_dispatcher.dispatch(ModelEventArgs(
            ON_END_MODEL,
            model.get_name(),
            model.get_params()
        ), elapsed_time=end - start)

    def on_load_train_set_matrix(self) -> Matrix:
        """Load the train set matrix that all models can use for training.

        The default train set matrix of the model pipeline is a dataframe.
        Derived classes are allowed to override this function to return a different type of matrix.

        Returns:
            the loaded train set matrix.
        """
        return Matrix(self.data_transition.train_set_path)

    def load_train_set_matrix(self) -> None:
        """Load the train set matrix that all models can use for training.

        Raises:
            FileNotFoundError: when the train set file is not found.
        """
        self.event_dispatcher.dispatch(DataframeEventArgs(
            ON_BEGIN_LOAD_TRAIN_SET,
            self.data_transition.train_set_path,
            'model train set matrix'
        ))

        start = time.time()

        try:
            self.train_set_matrix = self.on_load_train_set_matrix()
        except FileNotFoundError as err:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_RAISE_ERROR,
                'FileNotFoundError: raised while trying to load the train set matrix from ' +
                self.data_transition.train_set_path
            ))
            raise err

        end = time.time()

        self.event_dispatcher.dispatch(DataframeEventArgs(
            ON_END_LOAD_TRAIN_SET,
            self.data_transition.train_set_path,
            'model train set matrix'
        ), elapsed_time=end - start)

    def load_train_set_dataframe(self) -> pd.DataFrame:
        """Load the train set as a dataframe.

        Raises:
            FileNotFoundError: when the train set file is not found.

        Returns:
            the loaded train set dataframe.
        """
        return self.read_dataframe(
            self.data_transition.train_set_path,
            'data train set',
            ON_BEGIN_LOAD_TRAIN_SET,
            ON_END_LOAD_TRAIN_SET,
            names=['user', 'item', 'rating']
        )

    def load_test_set_dataframe(self, test_name: str = 'data test set') -> pd.DataFrame:
        """Load the test set as a dataframe.

        Args:
            test_name: name of the test set dataframe to dispatch in the dataframe event.

        Raises:
            FileNotFoundError: when the test set file is not found.

        Returns:
            the loaded test set dataframe.
        """
        return self.read_dataframe(
            self.data_transition.test_set_path,
            test_name,
            ON_BEGIN_LOAD_TEST_SET,
            ON_END_LOAD_TEST_SET,
            names=['user', 'item', 'rating']
        )

    @abstractmethod
    def load_test_set_users(self) -> None:
        """Load the test set users that all models can use for testing.

        Raises:
            FileNotFoundError: when the test set file is not found.
        """
        raise NotImplementedError()

    def reconstruct_ratings(
            self,
            result_dirs: List[str],
            is_running: Callable[[], bool]) -> None:
        """Reconstruct the original ratings for all the computed model ratings.

        Args:
            result_dirs: a list of directories that contain a computed rating file.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.
        """
        if not is_running() or len(result_dirs) == 0:
            return

        # TODO should probably move this code to a separate pipeline
        ratings_dataframe = pd.concat([
            self.load_train_set_dataframe(),
            self.load_test_set_dataframe()
        ])

        for model_dir in result_dirs:
            if not is_running():
                return

            result_file_path = os.path.join(model_dir, MODEL_RATINGS_FILE)

            self.event_dispatcher.dispatch(FileEventArgs(
                ON_BEGIN_RECONSTRUCT_RATINGS,
                result_file_path
            ))

            start = time.time()

            result = pd.read_csv(result_file_path, sep='\t')
            result = pd.merge(result, ratings_dataframe, how='left', on=['user', 'item'])
            result.to_csv(result_file_path, sep='\t', header=True, index=False)

            end = time.time()

            self.event_dispatcher.dispatch(FileEventArgs(
                ON_END_RECONSTRUCT_RATINGS,
                result_file_path
            ), elapsed_time=end - start)

    def test_model(
            self,
            model: BaseAlgorithm,
            model_dir: str,
            is_running: Callable[[], bool],
            **kwargs) -> None:
        """Test the specified model using the test set.

        This function wraps the event dispatching and functionality
        that both predictor and recommender models have in common.

        Args:
            model: the model that needs to be tested.
            model_dir: the path of the directory where the computed ratings can be stored.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Keyword Args:
            num_items(int): the number of item recommendations to produce, only
                needed when running the pipeline for recommender algorithms.

        Raises:
            ArithmeticError: possibly raised by a model on testing.
            MemoryError: possibly raised by a model on testing.
            RuntimeError: possibly raised by a model on testing.
        """
        self.event_dispatcher.dispatch(ModelEventArgs(
            ON_BEGIN_TEST_MODEL,
            model.get_name(),
            model.get_params()
        ))

        start = time.time()

        result_file_path = os.path.join(model_dir, MODEL_RATINGS_FILE)
        start_index = 0
        while start_index < len(self.test_set_users):
            if not is_running():
                return

            user_batch = self.test_set_users[start_index:start_index + MODEL_USER_BATCH_SIZE]
            ratings = self.test_model_ratings(model, user_batch, **kwargs)
            if not is_running():
                return

            self.write_dataframe(result_file_path, ratings, start_index == 0)
            start_index += MODEL_USER_BATCH_SIZE

        end = time.time()

        self.event_dispatcher.dispatch(ModelEventArgs(
            ON_END_TEST_MODEL,
            model.get_name(),
            model.get_params()
        ), elapsed_time=end - start)

    @abstractmethod
    def test_model_ratings(
            self,
            model: BaseAlgorithm,
            user_batch: List[int],
            **kwargs) -> pd.DataFrame:
        """Test the specified model for rating predictions or recommendations.

        Args:
            model: the model that needs to be tested.
            user_batch: the user batch to compute model ratings for.

        Keyword Args:
            num_items(int): the number of item recommendations to produce, only
                needed when running the pipeline for recommender algorithms.

        Raises:
            ArithmeticError: possibly raised by a model on testing.
            MemoryError: possibly raised by a model on testing.
            RuntimeError: possibly raised by a model on testing.

        Returns:
            a dataframe containing the computed model ratings.
        """
        raise NotImplementedError()

    def train_model(self, model: BaseAlgorithm) -> None:
        """Train the specified model using the train set.

        Args:
            model: the model that needs to be trained.

        Raises:
            ArithmeticError: possibly raised by a model on training.
            MemoryError: possibly raised by a model on training.
            RuntimeError: possibly raised by a model on training.
        """
        self.event_dispatcher.dispatch(ModelEventArgs(
            ON_BEGIN_TRAIN_MODEL,
            model.get_name(),
            model.get_params()
        ))

        start = time.time()

        model.train(self.train_set_matrix)

        end = time.time()

        self.event_dispatcher.dispatch(ModelEventArgs(
            ON_END_TRAIN_MODEL,
            model.get_name(),
            model.get_params()
        ), elapsed_time=end - start)

    def train_and_test_model(
            self,
            model: BaseAlgorithm,
            model_dir: str,
            is_running: Callable[[], bool],
            **kwargs) -> None:
        """Train and test the specified model.

        Several possible errors can be raised during the execution of both training and
        testing the model: namely ArithmeticError, MemoryError and RuntimeError.

        Args:
            model: the model that needs to be trained.
            model_dir: the path of the directory where the computed ratings can be stored.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Raises:
            ArithmeticError: possibly raised by a model on training or testing.
            MemoryError: possibly raised by a model on training or testing.
            RuntimeError: possibly raised by a model on training or testing.

        Keyword Args:
            num_items(int): the number of item recommendations to produce, only
                needed when running the pipeline for recommender algorithms.
        """
        try:
            self.train_model(model)
        except (ArithmeticError, MemoryError, RuntimeError) as err:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_RAISE_ERROR,
                'Error: raised while training model ' +
                self.algo_factory.get_name() + ' ' + model.get_name()
            ))
            # raise again so the model run aborts
            raise err

        try:
            self.test_model(model, model_dir, is_running, **kwargs)
        except (ArithmeticError, MemoryError, RuntimeError) as err:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_RAISE_ERROR,
                'Error: raised while testing model ' +
                self.algo_factory.get_name() + ' ' + model.get_name()
            ))
            # raise again so the model run aborts
            raise err
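The `reconstruct_ratings` step boils down to a left merge of each computed rating file with the concatenated train/test ratings on the `(user, item)` key. A standalone sketch with made-up data:

```python
import pandas as pd

# Made-up original ratings (train + test concatenated) and computed scores.
original = pd.DataFrame({
    'user': [1, 1, 2],
    'item': [10, 11, 10],
    'rating': [4.0, 3.0, 5.0],
})
computed = pd.DataFrame({
    'user': [1, 2],
    'item': [10, 10],
    'score': [0.9, 0.8],
})

# The left merge keeps every computed row and attaches the original rating
# where the (user, item) pair exists, mirroring the pipeline's merge call.
result = pd.merge(computed, original, how='left', on=['user', 'item'])
```

Pairs that only occur in the computed file would get a `NaN` rating, which is why `how='left'` (rather than an inner join) preserves the full set of computed rows.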
516 """ 517 self.event_dispatcher.dispatch(ModelEventArgs( 518 ON_BEGIN_TEST_MODEL, 519 model.get_name(), 520 model.get_params() 521 )) 522 523 start = time.time() 524 525 result_file_path = os.path.join(model_dir, MODEL_RATINGS_FILE) 526 start_index = 0 527 while start_index < len(self.test_set_users): 528 if not is_running(): 529 return 530 531 user_batch = self.test_set_users[start_index : start_index + MODEL_USER_BATCH_SIZE] 532 ratings = self.test_model_ratings(model, user_batch, **kwargs) 533 if not is_running(): 534 return 535 536 self.write_dataframe(result_file_path, ratings, start_index == 0) 537 start_index += MODEL_USER_BATCH_SIZE 538 539 end = time.time() 540 541 self.event_dispatcher.dispatch(ModelEventArgs( 542 ON_END_TEST_MODEL, 543 model.get_name(), 544 model.get_params() 545 ), elapsed_time=end - start) 546 547 @abstractmethod 548 def test_model_ratings( 549 self, 550 model: BaseAlgorithm, 551 user_batch: List[int], 552 **kwargs) -> pd.DataFrame: 553 """Test the specified model for rating predictions or recommendations. 554 555 Args: 556 model: the model that needs to be tested. 557 user_batch: the user batch to compute model ratings for. 558 559 Keyword Args: 560 num_items(int): the number of item recommendations to produce, only 561 needed when running the pipeline for recommender algorithms. 562 563 Raises: 564 ArithmeticError: possibly raised by a model on testing. 565 MemoryError: possibly raised by a model on testing. 566 RuntimeError: possibly raised by a model on testing. 567 568 Returns: 569 a dataframe containing the computed model ratings. 570 """ 571 raise NotImplementedError() 572 573 def train_model(self, model: BaseAlgorithm) -> None: 574 """Train the specified model using the train set. 575 576 Args: 577 model: the model that needs to be trained. 578 579 Raises: 580 ArithmeticError: possibly raised by a model on training. 581 MemoryError: possibly raised by a model on training. 
582 RuntimeError: possibly raised by a model on training. 583 """ 584 self.event_dispatcher.dispatch(ModelEventArgs( 585 ON_BEGIN_TRAIN_MODEL, 586 model.get_name(), 587 model.get_params() 588 )) 589 590 start = time.time() 591 592 model.train(self.train_set_matrix) 593 594 end = time.time() 595 596 self.event_dispatcher.dispatch(ModelEventArgs( 597 ON_END_TRAIN_MODEL, 598 model.get_name(), 599 model.get_params() 600 ), elapsed_time=end - start) 601 602 def train_and_test_model( 603 self, 604 model: BaseAlgorithm, 605 model_dir: str, 606 is_running: Callable[[], bool], 607 **kwargs) -> None: 608 """Train and test the specified model. 609 610 Several possible errors can be raised during the executing of both training and 611 testing the model: namely ArithmeticError, MemoryError and RuntimeError. 612 613 Args: 614 model: the model that needs to be trained. 615 model_dir: the path of the directory where the computed ratings can be stored. 616 is_running: function that returns whether the pipeline 617 is still running. Stops early when False is returned. 618 619 Raises: 620 ArithmeticError: possibly raised by a model on training or testing. 621 MemoryError: possibly raised by a model on training or testing. 622 RuntimeError: possibly raised by a model on training or testing. 623 624 Keyword Args: 625 num_items(int): the number of item recommendations to produce, only 626 needed when running the pipeline for recommender algorithms. 
627 """ 628 try: 629 self.train_model(model) 630 except (ArithmeticError, MemoryError, RuntimeError) as err: 631 self.event_dispatcher.dispatch(ErrorEventArgs( 632 ON_RAISE_ERROR, 633 'Error: raised while training model ' + 634 self.algo_factory.get_name() + ' ' + model.get_name() 635 )) 636 # raise again so the model run aborts 637 raise err 638 639 try: 640 self.test_model(model, model_dir, is_running, **kwargs) 641 except (ArithmeticError, MemoryError, RuntimeError) as err: 642 self.event_dispatcher.dispatch(ErrorEventArgs( 643 ON_RAISE_ERROR, 644 'Error: raised while testing model ' + 645 self.algo_factory.get_name() + ' ' + model.get_name() 646 )) 647 # raise again so the model run aborts 648 raise err
Model Pipeline to run computations for algorithms from a specific API.
Wraps the common functionality that applies to all models regardless of their type. Loading the train and test set is done only once each time the pipeline is run. After these sets have been loaded, the pipeline loops through all specified models and executes the following steps:
1) create the output directory.
2) create the model.
3) save the model's creation settings.
4) train the model using the train set.
5) test the model using the test set.
After all models are trained and tested the computed rating files are updated with the original ratings from the train and test set.
Abstract methods:
load_test_set_users
test_model_ratings
Public methods:
run
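The five per-model steps above can be sketched in isolation. Everything below (the `ToyModel` class, `run_toy_pipeline`) is a hypothetical stand-in, not part of fairreckitlib; it only mirrors the create / save-settings / train / test sequence of the real pipeline.

```python
import json
import os
import tempfile

class ToyModel:
    """Hypothetical stand-in for a BaseAlgorithm implementation."""
    def __init__(self, name, params):
        self.name, self.params = name, params
    def train(self, train_set):
        self.mean = sum(train_set) / len(train_set)
    def test(self, users):
        return {user: self.mean for user in users}

def run_toy_pipeline(output_dir, models_config, train_set, test_users):
    """Mirror the five pipeline steps for each configured model."""
    result_dirs = []
    for name, params in models_config:
        model_dir = os.path.join(output_dir, name + '_0')   # 1) create the output directory
        os.makedirs(model_dir)
        model = ToyModel(name, params)                      # 2) create the model
        with open(os.path.join(model_dir, 'settings.json'), 'w', encoding='utf-8') as file:
            json.dump(model.params, file, indent=4)         # 3) save the creation settings
        model.train(train_set)                              # 4) train using the train set
        model.test(test_users)                              # 5) test using the test set
        result_dirs.append(model_dir)
    return result_dirs

with tempfile.TemporaryDirectory() as tmp_dir:
    dirs = run_toy_pipeline(tmp_dir, [('mean', {'seed': 1})], [1.0, 3.0], [10, 20])
print(len(dirs))  # one result directory per configured model
```

The real pipeline additionally dispatches begin/end events around each step and rebuilds the original ratings afterwards; those parts are omitted here.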
def __init__(
        self,
        algo_factory: Factory,
        data_transition: DataTransition,
        event_dispatcher: EventDispatcher):
    """Construct the model pipeline.

    Args:
        algo_factory: factory of available algorithms for this API.
        data_transition: data input.
        event_dispatcher: used to dispatch model/IO events when running the pipeline.
    """
    CorePipeline.__init__(self, event_dispatcher)
    self.algo_factory = algo_factory
    self.data_transition = data_transition
    self.tested_models = {}

    self.train_set_matrix = None
    self.test_set_users = None

Construct the model pipeline.

Args:
    algo_factory: factory of available algorithms for this API.
    data_transition: data input.
    event_dispatcher: used to dispatch model/IO events when running the pipeline.
def run(self,
        output_dir: str,
        models_config: List[ModelConfig],
        is_running: Callable[[], bool],
        **kwargs) -> List[str]:
    """Run the entire pipeline from beginning to end.

    Effectively running all computations of the specified models.

    Args:
        output_dir: the path of the directory to store the output.
        models_config: list of ModelConfig objects to compute.
        is_running: function that returns whether the pipeline
            is still running. Stops early when False is returned.

    Keyword Args:
        num_threads(int): the max number of threads an algorithm can use.
        num_items(int): the number of item recommendations to produce, only
            needed when running the pipeline for recommender algorithms.
        rated_items_filter(bool): whether to filter already rated items when
            producing item recommendations.

    Raises:
        FileNotFoundError: when the train and/or test set fails to load.

    Returns:
        a list of model directories where computation results are stored.
    """
    result_dirs = []
    if not is_running():
        return result_dirs

    self.event_dispatcher.dispatch(ModelPipelineEventArgs(
        ON_BEGIN_MODEL_PIPELINE,
        self.algo_factory.get_name(),
        models_config
    ))

    start = time.time()

    # this can raise a FileNotFoundError, effectively aborting the pipeline
    self.load_train_set_matrix()
    if not is_running():
        return result_dirs

    # this can raise a FileNotFoundError, effectively aborting the pipeline
    self.load_test_set_users()
    if not is_running():
        return result_dirs

    for model in models_config:
        # verify that the specified model is available
        if not self.algo_factory.is_obj_available(model.name):
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: algorithm is not available: ' +
                self.algo_factory.get_name() + ' ' + model.name
            ))
            continue

        # create model output dir
        model_dir = self.create_model_output_dir(
            output_dir,
            model.name
        )

        # attempt to run the model computation
        try:
            self.run_model(
                model_dir,
                model,
                is_running,
                **kwargs
            )
        except ArithmeticError:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_RAISE_ERROR,
                'ArithmeticError: trying to run model ' +
                self.algo_factory.get_name() + ' ' + model.name
            ))
            delete_dir(model_dir, self.event_dispatcher)
            continue
        except MemoryError:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_RAISE_ERROR,
                'MemoryError: trying to run model ' +
                self.algo_factory.get_name() + ' ' + model.name
            ))
            delete_dir(model_dir, self.event_dispatcher)
            continue
        except RuntimeError:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_RAISE_ERROR,
                'RuntimeError: trying to run model ' +
                self.algo_factory.get_name() + ' ' + model.name
            ))
            delete_dir(model_dir, self.event_dispatcher)
            continue

        result_dirs.append(model_dir)
        if not is_running():
            return result_dirs

    # free up some memory because everything is trained and tested
    self.train_set_matrix = None
    self.test_set_users = None

    self.reconstruct_ratings(result_dirs, is_running)

    end = time.time()

    self.event_dispatcher.dispatch(ModelPipelineEventArgs(
        ON_END_MODEL_PIPELINE,
        self.algo_factory.get_name(),
        models_config
    ), elapsed_time=end - start)

    return result_dirs

Run the entire pipeline from beginning to end.

Effectively running all computations of the specified models.

Args:
    output_dir: the path of the directory to store the output.
    models_config: list of ModelConfig objects to compute.
    is_running: function that returns whether the pipeline is still running.
        Stops early when False is returned.

Keyword Args:
    num_threads(int): the max number of threads an algorithm can use.
    num_items(int): the number of item recommendations to produce, only
        needed when running the pipeline for recommender algorithms.
    rated_items_filter(bool): whether to filter already rated items when
        producing item recommendations.

Raises:
    FileNotFoundError: when the train and/or test set fails to load.

Returns:
    a list of model directories where computation results are stored.
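The `is_running` parameter implements cooperative cancellation: the pipeline checks it between steps and returns early once it reports False. A minimal sketch of that contract, with a hypothetical `compute_all` standing in for the pipeline loop:

```python
def compute_all(items, is_running):
    """Process items until done, or until is_running() reports False."""
    results = []
    for item in items:
        if not is_running():        # cooperative early stop, as in ModelPipeline.run
            return results
        results.append(item * item)
    return results

# a driver can flip this flag from another thread or a UI callback
running = {'flag': True}
full = compute_all([1, 2, 3], lambda: running['flag'])

running['flag'] = False
stopped = compute_all([1, 2, 3], lambda: running['flag'])
print(full, stopped)
```

As in `run`, cancellation is not an error: the partial result list is returned rather than an exception being raised.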
def run_model(
        self,
        model_dir: str,
        model_config: ModelConfig,
        is_running: Callable[[], bool],
        **kwargs) -> None:
    """Run the model computation for the specified model configuration.

    Args:
        model_dir: the path of the directory where the computed ratings can be stored.
        model_config: the algorithm model configuration.
        is_running: function that returns whether the pipeline
            is still running. Stops early when False is returned.

    Keyword Args:
        num_threads(int): the max number of threads an algorithm can use.
        num_items(int): the number of item recommendations to produce, only
            needed when running the pipeline for recommender algorithms.
        rated_items_filter(bool): whether to filter already rated items when
            producing item recommendations.

    Raises:
        ArithmeticError: possibly raised by a model on construction, training or testing.
        MemoryError: possibly raised by a model on construction, training or testing.
        RuntimeError: possibly raised by a model on construction, training or testing.
    """
    model, start = self.begin_model(
        model_config.name,
        model_config.params,
        model_dir,
        **kwargs
    )
    if not is_running():
        return

    self.train_and_test_model(model, model_dir, is_running, **kwargs)
    if not is_running():
        return

    self.end_model(model, start)

Run the model computation for the specified model configuration.

Args:
    model_dir: the path of the directory where the computed ratings can be stored.
    model_config: the algorithm model configuration.
    is_running: function that returns whether the pipeline is still running.
        Stops early when False is returned.

Keyword Args:
    num_threads(int): the max number of threads an algorithm can use.
    num_items(int): the number of item recommendations to produce, only
        needed when running the pipeline for recommender algorithms.
    rated_items_filter(bool): whether to filter already rated items when
        producing item recommendations.

Raises:
    ArithmeticError: possibly raised by a model on construction, training or testing.
    MemoryError: possibly raised by a model on construction, training or testing.
    RuntimeError: possibly raised by a model on construction, training or testing.
def begin_model(
        self,
        model_name: str,
        model_params: Dict[str, Any],
        model_dir: str,
        **kwargs) -> Tuple[BaseAlgorithm, float]:
    """Prepare the model computation.

    Resolves a unique output directory for the model computation
    and creates the model.

    Args:
        model_name: name of the model's algorithm.
        model_params: parameters of the algorithm.
        model_dir: the path of the directory where the computed ratings can be stored.

    Keyword Args:
        num_threads(int): the max number of threads an algorithm can use.
        rated_items_filter(bool): whether to filter already rated items when
            producing item recommendations.

    Raises:
        ArithmeticError: possibly raised by a model on construction.
        MemoryError: possibly raised by a model on construction.
        RuntimeError: possibly raised by a model on construction.

    Returns:
        model: the created model according to the specified name and parameters.
        start: the time when the model computation started.
    """
    start = time.time()

    self.event_dispatcher.dispatch(ModelEventArgs(
        ON_BEGIN_MODEL,
        model_name,
        model_params
    ))

    # attempt to create model
    kwargs['rating_type'] = self.data_transition.get_rating_type()
    model = self.algo_factory.create(
        model_name,
        model_params,
        **kwargs
    )

    # create settings file
    create_json(
        os.path.join(model_dir, 'settings.json'),
        model.get_params(),
        self.event_dispatcher,
        indent=4
    )

    return model, start

Prepare the model computation.

Resolves a unique output directory for the model computation and creates the model.

Args:
    model_name: name of the model's algorithm.
    model_params: parameters of the algorithm.
    model_dir: the path of the directory where the computed ratings can be stored.

Keyword Args:
    num_threads(int): the max number of threads an algorithm can use.
    rated_items_filter(bool): whether to filter already rated items when
        producing item recommendations.

Raises:
    ArithmeticError: possibly raised by a model on construction.
    MemoryError: possibly raised by a model on construction.
    RuntimeError: possibly raised by a model on construction.

Returns:
    model: the created model according to the specified name and parameters.
    start: the time when the model computation started.
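`begin_model` serializes the resolved model parameters to a `settings.json` file via `create_json` with `indent=4`. The effect can be sketched with the standard library alone (the `write_settings` helper and the parameter names are illustrative, not part of fairreckitlib):

```python
import json
import os
import tempfile

def write_settings(model_dir, params):
    """Sketch of the settings file begin_model writes for a model run."""
    path = os.path.join(model_dir, 'settings.json')
    with open(path, 'w', encoding='utf-8') as file:
        json.dump(params, file, indent=4)   # create_json is called with indent=4
    return path

with tempfile.TemporaryDirectory() as model_dir:
    path = write_settings(model_dir, {'learning_rate': 0.1, 'epochs': 10})
    with open(path, encoding='utf-8') as file:
        loaded = json.load(file)
print(loaded)
```

Persisting the resolved parameters next to the computed ratings makes each model run reproducible from its output directory alone.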
def create_model_output_dir(self, output_dir: str, model_name: str) -> str:
    """Create the output directory for a model.

    Args:
        output_dir: the path of the directory to store the output.
        model_name: name of the model's algorithm.

    Returns:
        the path of the directory where the model's computed ratings can be stored.
    """
    if self.tested_models.get(model_name) is None:
        # initialize model name counter
        self.tested_models[model_name] = 0

    return create_dir(self.get_model_output_dir(output_dir, model_name), self.event_dispatcher)

Create the output directory for a model.

Args:
    output_dir: the path of the directory to store the output.
    model_name: name of the model's algorithm.

Returns:
    the path of the directory where the model's computed ratings can be stored.
def get_model_output_dir(self, output_dir: str, model_name: str) -> str:
    """Get the model output directory path for the specified model name.

    Args:
        output_dir: the path of the directory to store the output.
        model_name: name of the model's algorithm.

    Returns:
        the path of the directory where the model's computed ratings can be stored.
    """
    index = self.tested_models[model_name]
    return os.path.join(
        output_dir,
        self.algo_factory.get_name() + '_' + model_name + '_' + str(index)
    )

Get the model output directory path for the specified model name.

Args:
    output_dir: the path of the directory to store the output.
    model_name: name of the model's algorithm.

Returns:
    the path of the directory where the model's computed ratings can be stored.
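The directory name follows the `<api>_<model>_<index>` scheme, where the index comes from the `tested_models` counter that `end_model` increments after each run. A standalone sketch (the `'LensKit'`/`'ItemKNN'` names are illustrative placeholders):

```python
import os

def model_output_dir(output_dir, api_name, model_name, tested_models):
    """Mirror the '<api>_<model>_<index>' directory naming scheme."""
    index = tested_models.get(model_name, 0)
    return os.path.join(output_dir, api_name + '_' + model_name + '_' + str(index))

tested_models = {}
first = model_output_dir('output', 'LensKit', 'ItemKNN', tested_models)
tested_models['ItemKNN'] = 1   # end_model increments the counter after each run
second = model_output_dir('output', 'LensKit', 'ItemKNN', tested_models)
print(first, second)
```

The per-name counter is what keeps repeated computations of the same algorithm (e.g. with different parameters) from overwriting each other's ratings.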
def end_model(self, model: BaseAlgorithm, start: float) -> None:
    """Finalize the model computation.

    Updates the number of tested models so that additional
    computations remain unique for this model.

    Args:
        model: the model that finished.
        start: the time when the model computation started.
    """
    self.tested_models[model.get_name()] += 1

    end = time.time()

    self.event_dispatcher.dispatch(ModelEventArgs(
        ON_END_MODEL,
        model.get_name(),
        model.get_params()
    ), elapsed_time=end - start)

Finalize the model computation.

Updates the number of tested models so that additional computations remain unique for this model.

Args:
    model: the model that finished.
    start: the time when the model computation started.
def on_load_train_set_matrix(self) -> Matrix:
    """Load the train set matrix that all models can use for training.

    The default train set matrix of the model pipeline is a dataframe.
    Derived classes are allowed to override this function to return a different type of matrix.

    Returns:
        the loaded train set matrix dataframe.
    """
    return Matrix(self.data_transition.train_set_path)

Load the train set matrix that all models can use for training.

The default train set matrix of the model pipeline is a dataframe. Derived classes are allowed to override this function to return a different type of matrix.

Returns:
    the loaded train set matrix dataframe.
def load_train_set_matrix(self) -> None:
    """Load the train set matrix that all models can use for training.

    Raises:
        FileNotFoundError: when the train set file is not found.
    """
    self.event_dispatcher.dispatch(DataframeEventArgs(
        ON_BEGIN_LOAD_TRAIN_SET,
        self.data_transition.train_set_path,
        'model train set matrix'
    ))

    start = time.time()

    try:
        self.train_set_matrix = self.on_load_train_set_matrix()
    except FileNotFoundError as err:
        self.event_dispatcher.dispatch(ErrorEventArgs(
            ON_RAISE_ERROR,
            'FileNotFoundError: raised while trying to load the matrix train set from ' +
            self.data_transition.train_set_path
        ))
        raise err

    end = time.time()

    self.event_dispatcher.dispatch(DataframeEventArgs(
        ON_END_LOAD_TRAIN_SET,
        self.data_transition.train_set_path,
        'model train set matrix'
    ), elapsed_time=end - start)

Load the train set matrix that all models can use for training.

Raises:
    FileNotFoundError: when the train set file is not found.
def load_train_set_dataframe(self) -> pd.DataFrame:
    """Load the train set as a dataframe.

    Raises:
        FileNotFoundError: when the train set file is not found.

    Returns:
        the loaded train set dataframe.
    """
    return self.read_dataframe(
        self.data_transition.train_set_path,
        'data train set',
        ON_BEGIN_LOAD_TRAIN_SET,
        ON_END_LOAD_TRAIN_SET,
        names=['user', 'item', 'rating']
    )

Load the train set as a dataframe.

Raises:
    FileNotFoundError: when the train set file is not found.

Returns:
    the loaded train set dataframe.
def load_test_set_dataframe(self, test_name: str = 'data test set') -> pd.DataFrame:
    """Load the test set as a dataframe.

    Args:
        test_name: name of the test set dataframe to dispatch in the dataframe event.

    Raises:
        FileNotFoundError: when the test set file is not found.

    Returns:
        the loaded test set dataframe.
    """
    return self.read_dataframe(
        self.data_transition.test_set_path,
        test_name,
        ON_BEGIN_LOAD_TEST_SET,
        ON_END_LOAD_TEST_SET,
        names=['user', 'item', 'rating']
    )

Load the test set as a dataframe.

Args:
    test_name: name of the test set dataframe to dispatch in the dataframe event.

Raises:
    FileNotFoundError: when the test set file is not found.

Returns:
    the loaded test set dataframe.
@abstractmethod
def load_test_set_users(self) -> None:
    """Load the test set users that all models can use for testing.

    Raises:
        FileNotFoundError: when the test set file is not found.
    """
    raise NotImplementedError()

Load the test set users that all models can use for testing.

Raises:
    FileNotFoundError: when the test set file is not found.
def reconstruct_ratings(
        self,
        result_dirs: List[str],
        is_running: Callable[[], bool]) -> None:
    """Reconstruct the original ratings for all the computed model ratings.

    Args:
        result_dirs: a list of directories that contain a computed rating file.
        is_running: function that returns whether the pipeline
            is still running. Stops early when False is returned.
    """
    if not is_running() or len(result_dirs) == 0:
        return

    # TODO should probably move this code to a separate pipeline
    ratings_dataframe = pd.concat([
        self.load_train_set_dataframe(),
        self.load_test_set_dataframe()
    ])

    for model_dir in result_dirs:
        if not is_running():
            return

        result_file_path = os.path.join(model_dir, MODEL_RATINGS_FILE)

        self.event_dispatcher.dispatch(FileEventArgs(
            ON_BEGIN_RECONSTRUCT_RATINGS,
            result_file_path
        ))

        start = time.time()

        result = pd.read_csv(result_file_path, sep='\t')
        result = pd.merge(result, ratings_dataframe, how='left', on=['user', 'item'])
        result.to_csv(result_file_path, sep='\t', header=True, index=False)

        end = time.time()

        self.event_dispatcher.dispatch(FileEventArgs(
            ON_END_RECONSTRUCT_RATINGS,
            result_file_path
        ), elapsed_time=end - start)

Reconstruct the original ratings for all the computed model ratings.

Args:
    result_dirs: a list of directories that contain a computed rating file.
    is_running: function that returns whether the pipeline is still running.
        Stops early when False is returned.
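The reconstruction step is a left merge of each computed rating file against the concatenated train and test sets on the `(user, item)` key. A minimal sketch of that merge (the `score` column name is illustrative; the actual column in the rating files depends on the algorithm type):

```python
import pandas as pd

# original ratings from the concatenated train and test sets
ratings = pd.DataFrame({
    'user': [1, 1, 2],
    'item': [10, 20, 10],
    'rating': [4.0, 3.0, 5.0],
})

# computed model output: one score per (user, item) pair
result = pd.DataFrame({
    'user': [1, 2],
    'item': [20, 10],
    'score': [0.9, 0.7],
})

# a left merge keeps every computed row and attaches the original rating
merged = pd.merge(result, ratings, how='left', on=['user', 'item'])
print(merged)
```

Because the merge is `how='left'`, computed pairs without an original rating survive with a missing (`NaN`) rating rather than being dropped.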
def test_model(
        self,
        model: BaseAlgorithm,
        model_dir: str,
        is_running: Callable[[], bool],
        **kwargs) -> None:
    """Test the specified model using the test set.

    This function wraps the event dispatching and functionality
    that both predictor and recommender models have in common.

    Args:
        model: the model that needs to be tested.
        model_dir: the path of the directory where the computed ratings can be stored.
        is_running: function that returns whether the pipeline
            is still running. Stops early when False is returned.

    Keyword Args:
        num_items(int): the number of item recommendations to produce, only
            needed when running the pipeline for recommender algorithms.

    Raises:
        ArithmeticError: possibly raised by a model on testing.
        MemoryError: possibly raised by a model on testing.
        RuntimeError: possibly raised by a model on testing.
    """
    self.event_dispatcher.dispatch(ModelEventArgs(
        ON_BEGIN_TEST_MODEL,
        model.get_name(),
        model.get_params()
    ))

    start = time.time()

    result_file_path = os.path.join(model_dir, MODEL_RATINGS_FILE)
    start_index = 0
    while start_index < len(self.test_set_users):
        if not is_running():
            return

        user_batch = self.test_set_users[start_index : start_index + MODEL_USER_BATCH_SIZE]
        ratings = self.test_model_ratings(model, user_batch, **kwargs)
        if not is_running():
            return

        self.write_dataframe(result_file_path, ratings, start_index == 0)
        start_index += MODEL_USER_BATCH_SIZE

    end = time.time()

    self.event_dispatcher.dispatch(ModelEventArgs(
        ON_END_TEST_MODEL,
        model.get_name(),
        model.get_params()
    ), elapsed_time=end - start)

Test the specified model using the test set.

This function wraps the event dispatching and functionality that both predictor and recommender models have in common.

Args:
    model: the model that needs to be tested.
    model_dir: the path of the directory where the computed ratings can be stored.
    is_running: function that returns whether the pipeline is still running.
        Stops early when False is returned.

Keyword Args:
    num_items(int): the number of item recommendations to produce, only
        needed when running the pipeline for recommender algorithms.

Raises:
    ArithmeticError: possibly raised by a model on testing.
    MemoryError: possibly raised by a model on testing.
    RuntimeError: possibly raised by a model on testing.
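Testing walks the test set users in fixed-size batches (`MODEL_USER_BATCH_SIZE`), writing each batch's ratings to the result file as it goes, so memory stays bounded and cancellation can take effect between batches. The slicing pattern in isolation (the batch size of 2 is only for illustration):

```python
USER_BATCH_SIZE = 2   # stand-in for MODEL_USER_BATCH_SIZE

def batch_users(test_set_users, batch_size):
    """Yield user batches the way test_model slices them."""
    start_index = 0
    while start_index < len(test_set_users):
        yield test_set_users[start_index : start_index + batch_size]
        start_index += batch_size

batches = list(batch_users([1, 2, 3, 4, 5], USER_BATCH_SIZE))
print(batches)  # the last batch may be smaller than the batch size
```

Note that `test_model` writes the header only for the first batch (`start_index == 0`) and appends for all later ones, so the result file stays a single valid table.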
@abstractmethod
def test_model_ratings(
        self,
        model: BaseAlgorithm,
        user_batch: List[int],
        **kwargs) -> pd.DataFrame:
    """Test the specified model for rating predictions or recommendations.

    Args:
        model: the model that needs to be tested.
        user_batch: the user batch to compute model ratings for.

    Keyword Args:
        num_items(int): the number of item recommendations to produce, only
            needed when running the pipeline for recommender algorithms.

    Raises:
        ArithmeticError: possibly raised by a model on testing.
        MemoryError: possibly raised by a model on testing.
        RuntimeError: possibly raised by a model on testing.

    Returns:
        a dataframe containing the computed model ratings.
    """
    raise NotImplementedError()

Test the specified model for rating predictions or recommendations.

Args:
    model: the model that needs to be tested.
    user_batch: the user batch to compute model ratings for.

Keyword Args:
    num_items(int): the number of item recommendations to produce, only
        needed when running the pipeline for recommender algorithms.

Raises:
    ArithmeticError: possibly raised by a model on testing.
    MemoryError: possibly raised by a model on testing.
    RuntimeError: possibly raised by a model on testing.

Returns:
    a dataframe containing the computed model ratings.
def train_model(self, model: BaseAlgorithm) -> None:
    """Train the specified model using the train set.

    Args:
        model: the model that needs to be trained.

    Raises:
        ArithmeticError: possibly raised by a model on training.
        MemoryError: possibly raised by a model on training.
        RuntimeError: possibly raised by a model on training.
    """
    self.event_dispatcher.dispatch(ModelEventArgs(
        ON_BEGIN_TRAIN_MODEL,
        model.get_name(),
        model.get_params()
    ))

    start = time.time()

    model.train(self.train_set_matrix)

    end = time.time()

    self.event_dispatcher.dispatch(ModelEventArgs(
        ON_END_TRAIN_MODEL,
        model.get_name(),
        model.get_params()
    ), elapsed_time=end - start)

Train the specified model using the train set.

Args:
    model: the model that needs to be trained.

Raises:
    ArithmeticError: possibly raised by a model on training.
    MemoryError: possibly raised by a model on training.
    RuntimeError: possibly raised by a model on training.
def train_and_test_model(
        self,
        model: BaseAlgorithm,
        model_dir: str,
        is_running: Callable[[], bool],
        **kwargs) -> None:
    """Train and test the specified model.

    Several possible errors can be raised during the execution of both training and
    testing the model: namely ArithmeticError, MemoryError and RuntimeError.

    Args:
        model: the model that needs to be trained.
        model_dir: the path of the directory where the computed ratings can be stored.
        is_running: function that returns whether the pipeline
            is still running. Stops early when False is returned.

    Raises:
        ArithmeticError: possibly raised by a model on training or testing.
        MemoryError: possibly raised by a model on training or testing.
        RuntimeError: possibly raised by a model on training or testing.

    Keyword Args:
        num_items(int): the number of item recommendations to produce, only
            needed when running the pipeline for recommender algorithms.
    """
    try:
        self.train_model(model)
    except (ArithmeticError, MemoryError, RuntimeError) as err:
        self.event_dispatcher.dispatch(ErrorEventArgs(
            ON_RAISE_ERROR,
            'Error: raised while training model ' +
            self.algo_factory.get_name() + ' ' + model.get_name()
        ))
        # raise again so the model run aborts
        raise err

    try:
        self.test_model(model, model_dir, is_running, **kwargs)
    except (ArithmeticError, MemoryError, RuntimeError) as err:
        self.event_dispatcher.dispatch(ErrorEventArgs(
            ON_RAISE_ERROR,
            'Error: raised while testing model ' +
            self.algo_factory.get_name() + ' ' + model.get_name()
        ))
        # raise again so the model run aborts
        raise err

Train and test the specified model.

Several possible errors can be raised during the execution of both training and testing the model: namely ArithmeticError, MemoryError and RuntimeError.

Args:
    model: the model that needs to be trained.
    model_dir: the path of the directory where the computed ratings can be stored.
    is_running: function that returns whether the pipeline is still running.
        Stops early when False is returned.

Raises:
    ArithmeticError: possibly raised by a model on training or testing.
    MemoryError: possibly raised by a model on training or testing.
    RuntimeError: possibly raised by a model on training or testing.

Keyword Args:
    num_items(int): the number of item recommendations to produce, only
        needed when running the pipeline for recommender algorithms.
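The error handling here follows a catch, report, re-raise pattern: the expected error classes are intercepted only to dispatch an error event, then re-raised so the caller (`run`) can abort the model run and delete its output directory. A standalone sketch (the `train_with_reporting` helper is hypothetical):

```python
def train_with_reporting(train, report_error):
    """Mirror the catch, report, and re-raise pattern of train_and_test_model."""
    try:
        train()
    except (ArithmeticError, MemoryError, RuntimeError):
        report_error('Error: raised while training model')
        raise   # re-raise so the surrounding model run aborts

def failing_train():
    raise RuntimeError('boom')

errors = []
try:
    train_with_reporting(failing_train, errors.append)
except RuntimeError:
    pass
print(errors)  # the error was reported before being re-raised
```

Catching only these three classes means unexpected exception types propagate untouched, which keeps genuine programming errors visible instead of being logged as model failures.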