src.fairreckitlib.evaluation.pipeline.evaluation_pipeline
This module contains base functionality of the complete evaluation pipeline.
Classes:
EvaluationPipeline: class that runs multiple metric computations for a specific evaluation set.
Functions:
add_evaluation_to_file: append computed evaluation to a result file.
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)
1"""This module contains base functionality of the complete evaluation pipeline. 2 3Classes: 4 5 EvaluationPipeline: class that runs multiple metric computations for a specific evaluation set. 6 7Functions: 8 9 add_evaluation_to_file: append computed evaluation to a result file. 10 11This program has been developed by students from the bachelor Computer Science at 12Utrecht University within the Software Project course. 13© Copyright Utrecht University (Department of Information and Computing Sciences) 14""" 15 16import os 17import time 18from typing import Callable, List, Optional 19 20from ...core.config.config_factories import Factory, GroupFactory, resolve_factory 21from ...core.core_constants import KEY_NAME, KEY_PARAMS 22from ...core.events.event_dispatcher import EventDispatcher 23from ...core.events.event_error import ON_FAILURE_ERROR, ON_RAISE_ERROR, ErrorEventArgs 24from ...core.io.io_create import create_json 25from ...core.io.io_utility import load_json, save_json 26from ...core.pipeline.core_pipeline import CorePipeline 27from ...data.filter.filter_config import DataSubsetConfig 28from ...data.filter.filter_event import FilterDataframeEventArgs 29from ...data.filter.filter_passes import filter_from_filter_passes 30from ...data.set.dataset import Dataset 31from ..metrics.metric_base import BaseMetric 32from ..metrics.metric_constants import KEY_METRIC_EVALUATION, KEY_METRIC_SUBGROUP 33from ..evaluation_sets import EvaluationSetPaths, EvaluationSets 34from .evaluation_config import MetricConfig 35from .evaluation_event import EvaluationPipelineEventArgs, MetricEventArgs 36from .evaluation_event import ON_BEGIN_EVAL_PIPELINE, ON_END_EVAL_PIPELINE 37from .evaluation_event import ON_BEGIN_EVAL_METRIC, ON_END_EVAL_METRIC 38from .evaluation_event import ON_BEGIN_FILTER_RECS, ON_END_FILTER_RECS 39from .evaluation_event import ON_BEGIN_LOAD_TRAIN_SET, ON_END_LOAD_TRAIN_SET 40from .evaluation_event import ON_BEGIN_LOAD_TEST_SET, ON_END_LOAD_TEST_SET 41from 
.evaluation_event import ON_BEGIN_LOAD_RATING_SET, ON_END_LOAD_RATING_SET 42from .evaluation_event import ON_BEGIN_METRIC, ON_END_METRIC 43 44 45class EvaluationPipeline(CorePipeline): 46 """Evaluation Pipeline to run metric computations for ratings related to a dataset. 47 48 The pipeline is intended to be used multiple times on different computed rating files 49 that are all associated to a specific dataset. 50 Loading the evaluation sets is done each time for every metric configuration, so that 51 they can be filtered individually on subgroups before computing the actual evaluation. 52 For each metric it executes the following steps: 53 54 1) create the metric. 55 2) load the evaluation sets. 56 3) filter the evaluation sets (optional). 57 4) compute the evaluation of the metric. 58 5) store evaluations in an overview file. 59 60 Public methods: 61 62 run 63 """ 64 65 def __init__( 66 self, 67 dataset: Dataset, 68 data_filter_factory: GroupFactory, 69 metric_category_factory: GroupFactory, 70 event_dispatcher: EventDispatcher): 71 """Construct the evaluation pipeline. 72 73 Args: 74 dataset: the dataset that is associated with the evaluation of the rating sets. 75 data_filter_factory: the factory with available filters for all dataset-matrix pairs. 76 metric_category_factory: the metric category factory with available metric factories. 77 event_dispatcher: used to dispatch model/IO events when running the pipeline. 78 """ 79 CorePipeline.__init__(self, event_dispatcher) 80 self.dataset = dataset 81 self.data_filter_factory = data_filter_factory 82 self.metric_category_factory = metric_category_factory 83 84 def run(self, 85 output_path: str, 86 eval_set_paths: EvaluationSetPaths, 87 metric_config_list: List[MetricConfig], 88 is_running: Callable[[], bool], 89 **kwargs) -> None: 90 """Run the entire pipeline from beginning to end. 91 92 Effectively running all computations of the specified metrics. 
93 All the specified metric configurations that have a subgroup are expected 94 to be related to the dataset that was used to construct the pipeline. 95 96 Args: 97 output_path: the path of the json file to store the output. 98 eval_set_paths: the file paths of the evaluation sets. 99 metric_config_list: list of MetricConfig objects to compute. 100 is_running: function that returns whether the pipeline 101 is still running. Stops early when False is returned. 102 103 Keyword Args: 104 reserved for future use 105 """ 106 self.event_dispatcher.dispatch(EvaluationPipelineEventArgs( 107 ON_BEGIN_EVAL_PIPELINE, 108 metric_config_list 109 )) 110 111 start = time.time() 112 113 # Create evaluations file 114 create_json( 115 output_path, 116 {'evaluations': []}, 117 self.event_dispatcher, 118 indent=4 119 ) 120 121 for metric_config in metric_config_list: 122 if not is_running(): 123 return 124 125 metric_factory = resolve_factory( 126 metric_config.name, 127 self.metric_category_factory 128 ) 129 if metric_factory is None: 130 self.event_dispatcher.dispatch(ErrorEventArgs( 131 ON_FAILURE_ERROR, 132 'Failure: to resolve metric factory for metric \'' + metric_config.name + '\'' 133 )) 134 continue 135 136 try: 137 self.run_metric( 138 output_path, 139 metric_factory, 140 eval_set_paths, 141 metric_config, 142 **kwargs 143 ) 144 except ArithmeticError: 145 self.event_dispatcher.dispatch(ErrorEventArgs( 146 ON_RAISE_ERROR, 147 'ArithmeticError: trying to run metric ' + metric_config.name 148 )) 149 continue 150 except FileNotFoundError: 151 self.event_dispatcher.dispatch(ErrorEventArgs( 152 ON_RAISE_ERROR, 153 'FileNotFoundError: trying to run metric ' + metric_config.name 154 )) 155 continue 156 except MemoryError: 157 self.event_dispatcher.dispatch(ErrorEventArgs( 158 ON_RAISE_ERROR, 159 'MemoryError: trying to run metric ' + metric_config.name 160 )) 161 continue 162 except RuntimeError: 163 self.event_dispatcher.dispatch(ErrorEventArgs( 164 ON_RAISE_ERROR, 165 
'RuntimeError: trying to run metric ' + metric_config.name 166 )) 167 continue 168 169 end = time.time() 170 171 self.event_dispatcher.dispatch(EvaluationPipelineEventArgs( 172 ON_END_EVAL_PIPELINE, 173 metric_config_list 174 ), elapsed_time=end - start) 175 176 def run_metric( 177 self, 178 output_path: str, 179 metric_factory: Factory, 180 eval_set_paths: EvaluationSetPaths, 181 metric_config: MetricConfig, 182 **kwargs) -> None: 183 """Run the evaluation computation for the specified metric configuration. 184 185 Args: 186 output_path: the path of the json file to store the output. 187 metric_factory: the factory that contains the specified metric. 188 eval_set_paths: the file paths of the evaluation sets. 189 metric_config: the metric evaluation configuration. 190 191 Raises: 192 ArithmeticError: possibly raised by a metric on construction or evaluation. 193 MemoryError: possibly raised by a metric on construction or evaluation. 194 RuntimeError: possibly raised by a metric on construction or evaluation. 195 FileNotFoundError: when the file of one of the evaluation sets does not exist. 
196 197 Keyword Args: 198 reserved for future use 199 """ 200 self.event_dispatcher.dispatch(MetricEventArgs( 201 ON_BEGIN_METRIC, 202 metric_config 203 )) 204 205 start = time.time() 206 207 metric = metric_factory.create( 208 metric_config.name, 209 metric_config.params, 210 **kwargs 211 ) 212 213 # this can raise a FileNotFoundError, effectively aborting the pipeline 214 eval_sets = self.load_evaluation_sets( 215 eval_set_paths, 216 metric.requires_train_set, 217 metric.requires_test_set 218 ) 219 220 eval_sets = self.filter_set_rows( 221 os.path.split(output_path)[0], 222 eval_sets, 223 metric_config.subgroup 224 ) 225 226 evaluation = self.compute_metric_evaluation( 227 metric, 228 eval_sets, 229 metric_config 230 ) 231 232 add_evaluation_to_file(output_path, evaluation, metric_config) 233 234 end = time.time() 235 236 self.event_dispatcher.dispatch(MetricEventArgs( 237 ON_END_METRIC, 238 metric_config 239 ), elapsed_time=end - start) 240 241 def load_evaluation_sets( 242 self, 243 eval_set_paths: EvaluationSetPaths, 244 train_set_required: bool, 245 test_set_required: bool) -> EvaluationSets: 246 """Load the required evaluation sets. 247 248 Args: 249 eval_set_paths: the file paths of the evaluation sets. 250 train_set_required: whether the train set is required for the evaluation. 251 test_set_required: whether the test set is required for the evaluation. 252 253 Raises: 254 FileNotFoundError: when the file of one of the evaluation sets does not exist. 255 256 Returns: 257 the loaded evaluation sets. 
258 """ 259 rating_set = self.read_dataframe( 260 eval_set_paths.ratings_path, 261 'rating set', 262 ON_BEGIN_LOAD_RATING_SET, 263 ON_END_LOAD_RATING_SET, 264 ) 265 266 train_set = None if not train_set_required else self.read_dataframe( 267 eval_set_paths.train_path, 268 'train set', 269 ON_BEGIN_LOAD_TRAIN_SET, 270 ON_END_LOAD_TRAIN_SET, 271 names=['user', 'item', 'rating'] 272 ) 273 274 test_set = None if not test_set_required else self.read_dataframe( 275 eval_set_paths.test_path, 276 'test set', 277 ON_BEGIN_LOAD_TEST_SET, 278 ON_END_LOAD_TEST_SET, 279 names=['user', 'item', 'rating'] 280 ) 281 282 return EvaluationSets(rating_set, train_set, test_set) 283 284 def filter_set_rows( 285 self, 286 output_dir: str, 287 eval_sets: EvaluationSets, 288 subgroup: Optional[DataSubsetConfig]) -> EvaluationSets: 289 """Filter the evaluation set rows for the specified subgroup. 290 291 The subset is created by applying multiple filter passes to the evaluation sets 292 individually. These filter passes are then combined to form the resulting sets. 293 294 Args: 295 eval_sets: the evaluation sets to filter. 296 subgroup: the subgroup to create of the evaluation sets. 297 298 Returns: 299 the filtered evaluation sets. 300 """ 301 # Early exit, because no filtering is needed. 302 if subgroup is None or len(subgroup.filter_passes) == 0: 303 return eval_sets 304 305 self.event_dispatcher.dispatch(FilterDataframeEventArgs( 306 ON_BEGIN_FILTER_RECS, 307 subgroup 308 )) 309 310 start = time.time() 311 312 # Filter for each dataframe in eval_sets. 
313 filter_factory = self.data_filter_factory 314 if eval_sets.train is not None: 315 eval_sets.train = filter_from_filter_passes( 316 self, output_dir, eval_sets.train, subgroup, filter_factory) 317 if eval_sets.test is not None: 318 eval_sets.test = filter_from_filter_passes( 319 self, output_dir, eval_sets.test, subgroup, filter_factory) 320 eval_sets.ratings = filter_from_filter_passes( 321 self, output_dir, eval_sets.ratings, subgroup, filter_factory) 322 end = time.time() 323 self.event_dispatcher.dispatch(FilterDataframeEventArgs( 324 ON_END_FILTER_RECS, 325 subgroup 326 ), elapsed_time=end - start) 327 328 return eval_sets 329 330 def compute_metric_evaluation( 331 self, 332 metric: BaseMetric, 333 eval_sets: EvaluationSets, 334 metric_config: MetricConfig) -> float: 335 """Compute the evaluation for the specified metric on the specified sets. 336 337 Args: 338 metric: the metric to use for computing the evaluation. 339 eval_sets: the evaluation sets to compute the performance of. 340 metric_config: the metric configuration that is associated with the metric. 341 342 Raises: 343 ArithmeticError: possibly raised by a metric on evaluation. 344 MemoryError: possibly raised by a metric on evaluation. 345 RuntimeError: possibly raised by a metric on evaluation. 346 347 Returns: 348 the computed evaluation of the metric. 349 """ 350 self.event_dispatcher.dispatch(MetricEventArgs( 351 ON_BEGIN_EVAL_METRIC, 352 metric_config 353 )) 354 355 start = time.time() 356 357 evaluation = metric.evaluate(eval_sets) 358 359 end = time.time() 360 361 self.event_dispatcher.dispatch(MetricEventArgs( 362 ON_END_EVAL_METRIC, 363 metric_config 364 ), elapsed_time=end - start) 365 366 return evaluation 367 368 369def add_evaluation_to_file( 370 file_path: str, 371 evaluation_value: float, 372 metric_config: MetricConfig) -> None: 373 """Add an evaluation result to the list in the overview file. 374 375 Args: 376 file_path: the path to the evaluations overview file. 
377 evaluation_value: the evaluation result. 378 metric_config: the metric configuration used for the evaluation. 379 """ 380 subgroup = {} if metric_config.subgroup is None else metric_config.subgroup.to_yml_format() 381 evaluation = {KEY_NAME: metric_config.name, 382 KEY_PARAMS: metric_config.params, 383 KEY_METRIC_EVALUATION:{'value': evaluation_value, KEY_METRIC_SUBGROUP: subgroup}} 384 385 evaluations = load_json(file_path) 386 evaluations['evaluations'].append(evaluation) 387 save_json(file_path, evaluations, indent=4)
Evaluation Pipeline to run metric computations for ratings related to a dataset.

The pipeline is intended to be used multiple times on different computed rating files that are all associated with a specific dataset. The evaluation sets are loaded anew for every metric configuration, so that they can be filtered individually on subgroups before the actual evaluation is computed. For each metric the pipeline executes the following steps:

1) create the metric.
2) load the evaluation sets.
3) filter the evaluation sets (optional).
4) compute the evaluation of the metric.
5) store the evaluation in an overview file.

Public methods:

    run
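The five steps above can be sketched as a minimal, self-contained loop. This is an illustrative stand-in (toy names, plain dicts instead of the library's factories and dataframes), not the library's API; its main point is that a failing metric is skipped rather than aborting the whole run, mirroring the try/except-continue structure in run():

```python
from typing import Callable, List

def run_metrics(metric_names: List[str],
                create_metric: Callable[[str], Callable[[dict], float]],
                eval_sets: dict,
                is_running: Callable[[], bool]) -> dict:
    """Illustrative per-metric loop: create, evaluate, collect results."""
    overview = {'evaluations': []}
    for name in metric_names:
        if not is_running():                    # cooperative early stop
            break
        try:
            metric = create_metric(name)        # 1) create the metric
            value = metric(eval_sets)           # 4) compute the evaluation
        except (ArithmeticError, RuntimeError):
            continue                            # skip this metric, keep going
        overview['evaluations'].append({'name': name, 'value': value})  # 5) store
    return overview

# Toy metrics: 'bad' raises (ZeroDivisionError is an ArithmeticError)
# to show that one failure does not abort the other computations.
metrics = {'mean': lambda s: sum(s['ratings']) / len(s['ratings']),
           'bad': lambda s: 1 / 0}
result = run_metrics(['mean', 'bad'], metrics.__getitem__,
                     {'ratings': [1.0, 2.0, 3.0]}, lambda: True)
```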
Construct the evaluation pipeline.
Args:

    dataset: the dataset that is associated with the evaluation of the rating sets.
    data_filter_factory: the factory with available filters for all dataset-matrix pairs.
    metric_category_factory: the metric category factory with available metric factories.
    event_dispatcher: used to dispatch model/IO events when running the pipeline.
Run the entire pipeline from beginning to end.
This effectively runs all computations of the specified metrics. All the specified metric configurations that have a subgroup are expected to be related to the dataset that was used to construct the pipeline.

Args:

    output_path: the path of the json file to store the output.
    eval_set_paths: the file paths of the evaluation sets.
    metric_config_list: list of MetricConfig objects to compute.
    is_running: function that returns whether the pipeline is still running. Stops early when False is returned.

Keyword Args:

    reserved for future use
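The is_running argument enables cooperative cancellation: run() checks it once per metric and returns early when it reports False. One simple way to supply it (an illustrative pattern, not part of the library) is a thread-safe flag whose method matches Callable[[], bool]:

```python
import threading

class CancellationFlag:
    """Thread-safe flag; is_running matches the Callable[[], bool] contract."""

    def __init__(self) -> None:
        self._event = threading.Event()

    def cancel(self) -> None:
        self._event.set()

    def is_running(self) -> bool:
        return not self._event.is_set()

flag = CancellationFlag()
processed = []
for item in ['a', 'b', 'c']:
    if not flag.is_running():   # checked once per iteration, as run() does per metric
        break
    processed.append(item)
    if item == 'b':
        flag.cancel()           # e.g. triggered from another (UI) thread
```

Work completed before the cancellation (here 'a' and 'b') is kept; the remaining items are never started.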
Run the evaluation computation for the specified metric configuration.
Args:

    output_path: the path of the json file to store the output.
    metric_factory: the factory that contains the specified metric.
    eval_set_paths: the file paths of the evaluation sets.
    metric_config: the metric evaluation configuration.

Raises:

    ArithmeticError: possibly raised by a metric on construction or evaluation.
    MemoryError: possibly raised by a metric on construction or evaluation.
    RuntimeError: possibly raised by a metric on construction or evaluation.
    FileNotFoundError: when the file of one of the evaluation sets does not exist.

Keyword Args:

    reserved for future use
Load the required evaluation sets.
Args:

    eval_set_paths: the file paths of the evaluation sets.
    train_set_required: whether the train set is required for the evaluation.
    test_set_required: whether the test set is required for the evaluation.

Raises:

    FileNotFoundError: when the file of one of the evaluation sets does not exist.

Returns:

    the loaded evaluation sets.
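The loading logic can be mimicked with a small container whose train/test members stay None when a metric does not require them, so that only the needed files are ever read. EvalSets, Frame, and load_sets below are hypothetical stand-ins for EvaluationSets, a pandas DataFrame, and the real read_dataframe:

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple

Frame = List[Tuple[int, int, float]]  # stand-in for a (user, item, rating) dataframe

@dataclass
class EvalSets:
    ratings: Frame
    train: Optional[Frame]
    test: Optional[Frame]

def load_sets(read: Callable[[str], Frame],
              train_required: bool, test_required: bool) -> EvalSets:
    # The rating set is always loaded; train/test only when the metric requires them.
    return EvalSets(
        ratings=read('ratings'),
        train=read('train') if train_required else None,
        test=read('test') if test_required else None,
    )

reads = []
def fake_read(name: str) -> Frame:
    reads.append(name)            # record which files were actually touched
    return [(0, 0, 1.0)]

sets = load_sets(fake_read, train_required=False, test_required=True)
```

Since the conditional guards the call itself, the train file is never opened when train_required is False.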
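# An illustrative sketch of the filter-pass idea behind filter_set_rows: each
# pass keeps a subset of rows, and the passes are combined to form the
# resulting subgroup. Combining by union is an assumption here; the actual
# semantics live in filter_from_filter_passes and the project's data filter
# factory. The Row type and lambda predicates are stand-ins. Note the real
# pipeline early-exits on an empty pass list instead of filtering everything
# away.

```python
from typing import Callable, Dict, List

Row = Dict[str, int]
FilterPass = Callable[[Row], bool]


def apply_filter_passes(rows: List[Row], passes: List[FilterPass]) -> List[Row]:
    # A row survives when at least one pass accepts it (union of the passes).
    return [row for row in rows if any(keep(row) for keep in passes)]
```
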
    def compute_metric_evaluation(
            self,
            metric: BaseMetric,
            eval_sets: EvaluationSets,
            metric_config: MetricConfig) -> float:
        """Compute the evaluation for the specified metric on the specified sets.

        Args:
            metric: the metric to use for computing the evaluation.
            eval_sets: the evaluation sets to compute the performance of.
            metric_config: the metric configuration that is associated with the metric.

        Raises:
            ArithmeticError: possibly raised by a metric on evaluation.
            MemoryError: possibly raised by a metric on evaluation.
            RuntimeError: possibly raised by a metric on evaluation.

        Returns:
            the computed evaluation of the metric.
        """
        self.event_dispatcher.dispatch(MetricEventArgs(
            ON_BEGIN_EVAL_METRIC,
            metric_config
        ))

        start = time.time()

        evaluation = metric.evaluate(eval_sets)

        end = time.time()

        self.event_dispatcher.dispatch(MetricEventArgs(
            ON_END_EVAL_METRIC,
            metric_config
        ), elapsed_time=end - start)

        return evaluation
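# The begin/end dispatch pattern in compute_metric_evaluation can be sketched
# as a small timing wrapper: dispatch a begin event, run the computation, then
# dispatch an end event carrying the elapsed time. The dispatch callable below
# is a stand-in for the project's EventDispatcher, which this sketch does not
# depend on.

```python
import time
from typing import Callable, TypeVar

T = TypeVar('T')


def timed_compute(compute: Callable[[], T],
                  dispatch: Callable[[str, float], None]) -> T:
    """Run compute() between begin/end events, reporting the elapsed time."""
    dispatch('begin', 0.0)
    start = time.time()
    result = compute()
    dispatch('end', time.time() - start)
    return result
```
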
def add_evaluation_to_file(
        file_path: str,
        evaluation_value: float,
        metric_config: MetricConfig) -> None:
    """Add an evaluation result to the list in the overview file.

    Args:
        file_path: the path to the evaluations overview file.
        evaluation_value: the evaluation result.
        metric_config: the metric configuration used for the evaluation.
    """
    subgroup = {} if metric_config.subgroup is None else metric_config.subgroup.to_yml_format()
    evaluation = {
        KEY_NAME: metric_config.name,
        KEY_PARAMS: metric_config.params,
        KEY_METRIC_EVALUATION: {'value': evaluation_value, KEY_METRIC_SUBGROUP: subgroup}
    }

    evaluations = load_json(file_path)
    evaluations['evaluations'].append(evaluation)
    save_json(file_path, evaluations, indent=4)
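# A minimal, self-contained sketch of the load-append-save pattern used by
# add_evaluation_to_file, written against the standard json module instead of
# the project's load_json/save_json helpers. The key names ('name', 'params',
# 'evaluation', 'subgroup') are illustrative stand-ins for the KEY_* constants.

```python
import json


def append_evaluation(file_path: str, name: str, params: dict, value: float) -> None:
    """Append one evaluation result to the overview file's 'evaluations' list."""
    with open(file_path, encoding='utf-8') as file:
        overview = json.load(file)

    overview['evaluations'].append({
        'name': name,
        'params': params,
        'evaluation': {'value': value, 'subgroup': {}},
    })

    with open(file_path, 'w', encoding='utf-8') as file:
        json.dump(overview, file, indent=4)
```
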