src.fairreckitlib.evaluation.pipeline.evaluation_pipeline

This module contains base functionality of the complete evaluation pipeline.

Classes:

EvaluationPipeline: class that runs multiple metric computations for a specific evaluation set.

Functions:

add_evaluation_to_file: append computed evaluation to a result file.

This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)

  1"""This module contains base functionality of the complete evaluation pipeline.
  2
  3Classes:
  4
  5    EvaluationPipeline: class that runs multiple metric computations for a specific evaluation set.
  6
  7Functions:
  8
  9    add_evaluation_to_file: append computed evaluation to a result file.
 10
 11This program has been developed by students from the bachelor Computer Science at
 12Utrecht University within the Software Project course.
 13© Copyright Utrecht University (Department of Information and Computing Sciences)
 14"""
 15
 16import os
 17import time
 18from typing import Callable, List, Optional
 19
 20from ...core.config.config_factories import Factory, GroupFactory, resolve_factory
 21from ...core.core_constants import KEY_NAME, KEY_PARAMS
 22from ...core.events.event_dispatcher import EventDispatcher
 23from ...core.events.event_error import ON_FAILURE_ERROR, ON_RAISE_ERROR, ErrorEventArgs
 24from ...core.io.io_create import create_json
 25from ...core.io.io_utility import load_json, save_json
 26from ...core.pipeline.core_pipeline import CorePipeline
 27from ...data.filter.filter_config import DataSubsetConfig
 28from ...data.filter.filter_event import FilterDataframeEventArgs
 29from ...data.filter.filter_passes import filter_from_filter_passes
 30from ...data.set.dataset import Dataset
 31from ..metrics.metric_base import BaseMetric
 32from ..metrics.metric_constants import KEY_METRIC_EVALUATION, KEY_METRIC_SUBGROUP
 33from ..evaluation_sets import EvaluationSetPaths, EvaluationSets
 34from .evaluation_config import MetricConfig
 35from .evaluation_event import EvaluationPipelineEventArgs, MetricEventArgs
 36from .evaluation_event import ON_BEGIN_EVAL_PIPELINE, ON_END_EVAL_PIPELINE
 37from .evaluation_event import ON_BEGIN_EVAL_METRIC, ON_END_EVAL_METRIC
 38from .evaluation_event import ON_BEGIN_FILTER_RECS, ON_END_FILTER_RECS
 39from .evaluation_event import ON_BEGIN_LOAD_TRAIN_SET, ON_END_LOAD_TRAIN_SET
 40from .evaluation_event import ON_BEGIN_LOAD_TEST_SET, ON_END_LOAD_TEST_SET
 41from .evaluation_event import ON_BEGIN_LOAD_RATING_SET, ON_END_LOAD_RATING_SET
 42from .evaluation_event import ON_BEGIN_METRIC, ON_END_METRIC
 43
 44
 45class EvaluationPipeline(CorePipeline):
 46    """Evaluation Pipeline to run metric computations for ratings related to a dataset.
 47
 48    The pipeline is intended to be used multiple times on different computed rating files
 49    that are all associated to a specific dataset.
 50    Loading the evaluation sets is done each time for every metric configuration, so that
 51    they can be filtered individually on subgroups before computing the actual evaluation.
 52    For each metric it executes the following steps:
 53
 54    1) create the metric.
 55    2) load the evaluation sets.
 56    3) filter the evaluation sets (optional).
 57    4) compute the evaluation of the metric.
 58    5) store evaluations in an overview file.
 59
 60    Public methods:
 61
 62    run
 63    """
 64
 65    def __init__(
 66            self,
 67            dataset: Dataset,
 68            data_filter_factory: GroupFactory,
 69            metric_category_factory: GroupFactory,
 70            event_dispatcher: EventDispatcher):
 71        """Construct the evaluation pipeline.
 72
 73        Args:
 74            dataset: the dataset that is associated with the evaluation of the rating sets.
 75            data_filter_factory: the factory with available filters for all dataset-matrix pairs.
 76            metric_category_factory: the metric category factory with available metric factories.
 77            event_dispatcher: used to dispatch model/IO events when running the pipeline.
 78        """
 79        CorePipeline.__init__(self, event_dispatcher)
 80        self.dataset = dataset
 81        self.data_filter_factory = data_filter_factory
 82        self.metric_category_factory = metric_category_factory
 83
 84    def run(self,
 85            output_path: str,
 86            eval_set_paths: EvaluationSetPaths,
 87            metric_config_list: List[MetricConfig],
 88            is_running: Callable[[], bool],
 89            **kwargs) -> None:
 90        """Run the entire pipeline from beginning to end.
 91
 92        Effectively running all computations of the specified metrics.
 93        All the specified metric configurations that have a subgroup are expected
 94        to be related to the dataset that was used to construct the pipeline.
 95
 96        Args:
 97            output_path: the path of the json file to store the output.
 98            eval_set_paths: the file paths of the evaluation sets.
 99            metric_config_list: list of MetricConfig objects to compute.
100            is_running: function that returns whether the pipeline
101                is still running. Stops early when False is returned.
102
103        Keyword Args:
104            reserved for future use
105        """
106        self.event_dispatcher.dispatch(EvaluationPipelineEventArgs(
107            ON_BEGIN_EVAL_PIPELINE,
108            metric_config_list
109        ))
110
111        start = time.time()
112
113        # Create evaluations file
114        create_json(
115            output_path,
116            {'evaluations': []},
117            self.event_dispatcher,
118            indent=4
119        )
120
121        for metric_config in metric_config_list:
122            if not is_running():
123                return
124
125            metric_factory = resolve_factory(
126                metric_config.name,
127                self.metric_category_factory
128            )
129            if metric_factory is None:
130                self.event_dispatcher.dispatch(ErrorEventArgs(
131                    ON_FAILURE_ERROR,
132                    'Failure: to resolve metric factory for metric \'' + metric_config.name + '\''
133                ))
134                continue
135
136            try:
137                self.run_metric(
138                    output_path,
139                    metric_factory,
140                    eval_set_paths,
141                    metric_config,
142                    **kwargs
143                )
144            except ArithmeticError:
145                self.event_dispatcher.dispatch(ErrorEventArgs(
146                    ON_RAISE_ERROR,
147                    'ArithmeticError: trying to run metric ' + metric_config.name
148                ))
149                continue
150            except FileNotFoundError:
151                self.event_dispatcher.dispatch(ErrorEventArgs(
152                    ON_RAISE_ERROR,
153                    'FileNotFoundError: trying to run metric ' + metric_config.name
154                ))
155                continue
156            except MemoryError:
157                self.event_dispatcher.dispatch(ErrorEventArgs(
158                    ON_RAISE_ERROR,
159                    'MemoryError: trying to run metric ' + metric_config.name
160                ))
161                continue
162            except RuntimeError:
163                self.event_dispatcher.dispatch(ErrorEventArgs(
164                    ON_RAISE_ERROR,
165                    'RuntimeError: trying to run metric ' + metric_config.name
166                ))
167                continue
168
169        end = time.time()
170
171        self.event_dispatcher.dispatch(EvaluationPipelineEventArgs(
172            ON_END_EVAL_PIPELINE,
173            metric_config_list
174        ), elapsed_time=end - start)
175
176    def run_metric(
177            self,
178            output_path: str,
179            metric_factory: Factory,
180            eval_set_paths: EvaluationSetPaths,
181            metric_config: MetricConfig,
182            **kwargs) -> None:
183        """Run the evaluation computation for the specified metric configuration.
184
185        Args:
186            output_path: the path of the json file to store the output.
187            metric_factory: the factory that contains the specified metric.
188            eval_set_paths: the file paths of the evaluation sets.
189            metric_config: the metric evaluation configuration.
190
191        Raises:
192            ArithmeticError: possibly raised by a metric on construction or evaluation.
193            MemoryError: possibly raised by a metric on construction or evaluation.
194            RuntimeError: possibly raised by a metric on construction or evaluation.
195            FileNotFoundError: when the file of one of the evaluation sets does not exist.
196
197        Keyword Args:
198            reserved for future use
199        """
200        self.event_dispatcher.dispatch(MetricEventArgs(
201            ON_BEGIN_METRIC,
202            metric_config
203        ))
204
205        start = time.time()
206
207        metric = metric_factory.create(
208            metric_config.name,
209            metric_config.params,
210            **kwargs
211        )
212
213        # this can raise a FileNotFoundError, effectively aborting the pipeline
214        eval_sets = self.load_evaluation_sets(
215            eval_set_paths,
216            metric.requires_train_set,
217            metric.requires_test_set
218        )
219
220        eval_sets = self.filter_set_rows(
221            os.path.split(output_path)[0],
222            eval_sets,
223            metric_config.subgroup
224        )
225
226        evaluation = self.compute_metric_evaluation(
227            metric,
228            eval_sets,
229            metric_config
230        )
231
232        add_evaluation_to_file(output_path, evaluation, metric_config)
233
234        end = time.time()
235
236        self.event_dispatcher.dispatch(MetricEventArgs(
237            ON_END_METRIC,
238            metric_config
239        ), elapsed_time=end - start)
240
241    def load_evaluation_sets(
242            self,
243            eval_set_paths: EvaluationSetPaths,
244            train_set_required: bool,
245            test_set_required: bool) -> EvaluationSets:
246        """Load the required evaluation sets.
247
248        Args:
249            eval_set_paths: the file paths of the evaluation sets.
250            train_set_required: whether the train set is required for the evaluation.
251            test_set_required: whether the test set is required for the evaluation.
252
253        Raises:
254            FileNotFoundError: when the file of one of the evaluation sets does not exist.
255
256        Returns:
257            the loaded evaluation sets.
258        """
259        rating_set = self.read_dataframe(
260            eval_set_paths.ratings_path,
261            'rating set',
262            ON_BEGIN_LOAD_RATING_SET,
263            ON_END_LOAD_RATING_SET,
264        )
265
266        train_set = None if not train_set_required else self.read_dataframe(
267            eval_set_paths.train_path,
268            'train set',
269            ON_BEGIN_LOAD_TRAIN_SET,
270            ON_END_LOAD_TRAIN_SET,
271            names=['user', 'item', 'rating']
272        )
273
274        test_set = None if not test_set_required else self.read_dataframe(
275            eval_set_paths.test_path,
276            'test set',
277            ON_BEGIN_LOAD_TEST_SET,
278            ON_END_LOAD_TEST_SET,
279            names=['user', 'item', 'rating']
280        )
281
282        return EvaluationSets(rating_set, train_set, test_set)
283
284    def filter_set_rows(
285            self,
286            output_dir: str,
287            eval_sets: EvaluationSets,
288            subgroup: Optional[DataSubsetConfig]) -> EvaluationSets:
289        """Filter the evaluation set rows for the specified subgroup.
290
291        The subset is created by applying multiple filter passes to the evaluation sets
292        individually. These filter passes are then combined to form the resulting sets.
293
294        Args:
295            eval_sets: the evaluation sets to filter.
296            subgroup: the subgroup to create of the evaluation sets.
297
298        Returns:
299            the filtered evaluation sets.
300        """
301        # Early exit, because no filtering is needed.
302        if subgroup is None or len(subgroup.filter_passes) == 0:
303            return eval_sets
304
305        self.event_dispatcher.dispatch(FilterDataframeEventArgs(
306            ON_BEGIN_FILTER_RECS,
307            subgroup
308        ))
309
310        start = time.time()
311
312        # Filter for each dataframe in eval_sets.
313        filter_factory = self.data_filter_factory
314        if eval_sets.train is not None:
315            eval_sets.train = filter_from_filter_passes(
316                self, output_dir, eval_sets.train, subgroup, filter_factory)
317        if eval_sets.test is not None:
318            eval_sets.test = filter_from_filter_passes(
319                self, output_dir, eval_sets.test, subgroup, filter_factory)
320        eval_sets.ratings = filter_from_filter_passes(
321                self, output_dir, eval_sets.ratings, subgroup, filter_factory)
322        end = time.time()
323        self.event_dispatcher.dispatch(FilterDataframeEventArgs(
324            ON_END_FILTER_RECS,
325            subgroup
326        ), elapsed_time=end - start)
327
328        return eval_sets
329
330    def compute_metric_evaluation(
331            self,
332            metric: BaseMetric,
333            eval_sets: EvaluationSets,
334            metric_config: MetricConfig) -> float:
335        """Compute the evaluation for the specified metric on the specified sets.
336
337        Args:
338            metric: the metric to use for computing the evaluation.
339            eval_sets: the evaluation sets to compute the performance of.
340            metric_config: the metric configuration that is associated with the metric.
341
342        Raises:
343            ArithmeticError: possibly raised by a metric on evaluation.
344            MemoryError: possibly raised by a metric on evaluation.
345            RuntimeError: possibly raised by a metric on evaluation.
346
347        Returns:
348            the computed evaluation of the metric.
349        """
350        self.event_dispatcher.dispatch(MetricEventArgs(
351            ON_BEGIN_EVAL_METRIC,
352            metric_config
353        ))
354
355        start = time.time()
356
357        evaluation = metric.evaluate(eval_sets)
358
359        end = time.time()
360
361        self.event_dispatcher.dispatch(MetricEventArgs(
362            ON_END_EVAL_METRIC,
363            metric_config
364        ), elapsed_time=end - start)
365
366        return evaluation
367
368
def add_evaluation_to_file(
        file_path: str,
        evaluation_value: float,
        metric_config: MetricConfig) -> None:
    """Add an evaluation result to the list in the overview file.

    Args:
        file_path: the path to the evaluations overview file.
        evaluation_value: the evaluation result.
        metric_config: the metric configuration used for the evaluation.
    """
    # A missing subgroup is stored as an empty mapping.
    if metric_config.subgroup is None:
        subgroup_format = {}
    else:
        subgroup_format = metric_config.subgroup.to_yml_format()

    entry = {
        KEY_NAME: metric_config.name,
        KEY_PARAMS: metric_config.params,
        KEY_METRIC_EVALUATION: {
            'value': evaluation_value,
            KEY_METRIC_SUBGROUP: subgroup_format,
        },
    }

    # Read-modify-write the overview file so previous evaluations are kept.
    contents = load_json(file_path)
    contents['evaluations'].append(entry)
    save_json(file_path, contents, indent=4)
class EvaluationPipeline(CorePipeline):
    """Evaluation Pipeline to run metric computations for ratings related to a dataset.

    The pipeline is intended to be used multiple times on different computed rating files
    that are all associated to a specific dataset.
    Loading the evaluation sets is done each time for every metric configuration, so that
    they can be filtered individually on subgroups before computing the actual evaluation.
    For each metric it executes the following steps:

    1) create the metric.
    2) load the evaluation sets.
    3) filter the evaluation sets (optional).
    4) compute the evaluation of the metric.
    5) store evaluations in an overview file.

    Public methods:

    run
    """

    def __init__(
            self,
            dataset: Dataset,
            data_filter_factory: GroupFactory,
            metric_category_factory: GroupFactory,
            event_dispatcher: EventDispatcher):
        """Construct the evaluation pipeline.

        Args:
            dataset: the dataset that is associated with the evaluation of the rating sets.
            data_filter_factory: the factory with available filters for all dataset-matrix pairs.
            metric_category_factory: the metric category factory with available metric factories.
            event_dispatcher: used to dispatch model/IO events when running the pipeline.
        """
        CorePipeline.__init__(self, event_dispatcher)
        self.dataset = dataset
        self.data_filter_factory = data_filter_factory
        self.metric_category_factory = metric_category_factory

    def run(self,
            output_path: str,
            eval_set_paths: EvaluationSetPaths,
            metric_config_list: List[MetricConfig],
            is_running: Callable[[], bool],
            **kwargs) -> None:
        """Run the entire pipeline from beginning to end.

        Effectively running all computations of the specified metrics.
        All the specified metric configurations that have a subgroup are expected
        to be related to the dataset that was used to construct the pipeline.

        Args:
            output_path: the path of the json file to store the output.
            eval_set_paths: the file paths of the evaluation sets.
            metric_config_list: list of MetricConfig objects to compute.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Keyword Args:
            reserved for future use
        """
        self.event_dispatcher.dispatch(EvaluationPipelineEventArgs(
            ON_BEGIN_EVAL_PIPELINE,
            metric_config_list
        ))

        start = time.time()

        # Create the (empty) evaluations overview file that each metric run appends to.
        create_json(
            output_path,
            {'evaluations': []},
            self.event_dispatcher,
            indent=4
        )

        for metric_config in metric_config_list:
            if not is_running():
                return

            metric_factory = resolve_factory(
                metric_config.name,
                self.metric_category_factory
            )
            if metric_factory is None:
                self.event_dispatcher.dispatch(ErrorEventArgs(
                    ON_FAILURE_ERROR,
                    'Failure: to resolve metric factory for metric \'' + metric_config.name + '\''
                ))
                continue

            try:
                self.run_metric(
                    output_path,
                    metric_factory,
                    eval_set_paths,
                    metric_config,
                    **kwargs
                )
            except (ArithmeticError, FileNotFoundError, MemoryError, RuntimeError) as err:
                # A failing metric should not abort the remaining metric computations.
                # The message intentionally mirrors the previous per-exception handlers:
                # '<ExceptionType>: trying to run metric <name>'.
                self.event_dispatcher.dispatch(ErrorEventArgs(
                    ON_RAISE_ERROR,
                    type(err).__name__ + ': trying to run metric ' + metric_config.name
                ))
                continue

        end = time.time()

        self.event_dispatcher.dispatch(EvaluationPipelineEventArgs(
            ON_END_EVAL_PIPELINE,
            metric_config_list
        ), elapsed_time=end - start)

    def run_metric(
            self,
            output_path: str,
            metric_factory: Factory,
            eval_set_paths: EvaluationSetPaths,
            metric_config: MetricConfig,
            **kwargs) -> None:
        """Run the evaluation computation for the specified metric configuration.

        Args:
            output_path: the path of the json file to store the output.
            metric_factory: the factory that contains the specified metric.
            eval_set_paths: the file paths of the evaluation sets.
            metric_config: the metric evaluation configuration.

        Raises:
            ArithmeticError: possibly raised by a metric on construction or evaluation.
            MemoryError: possibly raised by a metric on construction or evaluation.
            RuntimeError: possibly raised by a metric on construction or evaluation.
            FileNotFoundError: when the file of one of the evaluation sets does not exist.

        Keyword Args:
            reserved for future use
        """
        self.event_dispatcher.dispatch(MetricEventArgs(
            ON_BEGIN_METRIC,
            metric_config
        ))

        start = time.time()

        metric = metric_factory.create(
            metric_config.name,
            metric_config.params,
            **kwargs
        )

        # This can raise a FileNotFoundError, effectively aborting the pipeline.
        eval_sets = self.load_evaluation_sets(
            eval_set_paths,
            metric.requires_train_set,
            metric.requires_test_set
        )

        eval_sets = self.filter_set_rows(
            os.path.split(output_path)[0],
            eval_sets,
            metric_config.subgroup
        )

        evaluation = self.compute_metric_evaluation(
            metric,
            eval_sets,
            metric_config
        )

        add_evaluation_to_file(output_path, evaluation, metric_config)

        end = time.time()

        self.event_dispatcher.dispatch(MetricEventArgs(
            ON_END_METRIC,
            metric_config
        ), elapsed_time=end - start)

    def load_evaluation_sets(
            self,
            eval_set_paths: EvaluationSetPaths,
            train_set_required: bool,
            test_set_required: bool) -> EvaluationSets:
        """Load the required evaluation sets.

        Args:
            eval_set_paths: the file paths of the evaluation sets.
            train_set_required: whether the train set is required for the evaluation.
            test_set_required: whether the test set is required for the evaluation.

        Raises:
            FileNotFoundError: when the file of one of the evaluation sets does not exist.

        Returns:
            the loaded evaluation sets.
        """
        rating_set = self.read_dataframe(
            eval_set_paths.ratings_path,
            'rating set',
            ON_BEGIN_LOAD_RATING_SET,
            ON_END_LOAD_RATING_SET,
        )

        # The train and test sets are only loaded when the metric actually needs them.
        train_set = None if not train_set_required else self.read_dataframe(
            eval_set_paths.train_path,
            'train set',
            ON_BEGIN_LOAD_TRAIN_SET,
            ON_END_LOAD_TRAIN_SET,
            names=['user', 'item', 'rating']
        )

        test_set = None if not test_set_required else self.read_dataframe(
            eval_set_paths.test_path,
            'test set',
            ON_BEGIN_LOAD_TEST_SET,
            ON_END_LOAD_TEST_SET,
            names=['user', 'item', 'rating']
        )

        return EvaluationSets(rating_set, train_set, test_set)

    def filter_set_rows(
            self,
            output_dir: str,
            eval_sets: EvaluationSets,
            subgroup: Optional[DataSubsetConfig]) -> EvaluationSets:
        """Filter the evaluation set rows for the specified subgroup.

        The subset is created by applying multiple filter passes to the evaluation sets
        individually. These filter passes are then combined to form the resulting sets.

        Args:
            output_dir: the directory where the filter passes can store their output.
            eval_sets: the evaluation sets to filter.
            subgroup: the subgroup to create of the evaluation sets.

        Returns:
            the filtered evaluation sets.
        """
        # Early exit, because no filtering is needed.
        if subgroup is None or len(subgroup.filter_passes) == 0:
            return eval_sets

        self.event_dispatcher.dispatch(FilterDataframeEventArgs(
            ON_BEGIN_FILTER_RECS,
            subgroup
        ))

        start = time.time()

        # Filter each dataframe in eval_sets; train/test may be absent (None).
        filter_factory = self.data_filter_factory
        if eval_sets.train is not None:
            eval_sets.train = filter_from_filter_passes(
                self, output_dir, eval_sets.train, subgroup, filter_factory)
        if eval_sets.test is not None:
            eval_sets.test = filter_from_filter_passes(
                self, output_dir, eval_sets.test, subgroup, filter_factory)
        eval_sets.ratings = filter_from_filter_passes(
            self, output_dir, eval_sets.ratings, subgroup, filter_factory)

        end = time.time()

        self.event_dispatcher.dispatch(FilterDataframeEventArgs(
            ON_END_FILTER_RECS,
            subgroup
        ), elapsed_time=end - start)

        return eval_sets

    def compute_metric_evaluation(
            self,
            metric: BaseMetric,
            eval_sets: EvaluationSets,
            metric_config: MetricConfig) -> float:
        """Compute the evaluation for the specified metric on the specified sets.

        Args:
            metric: the metric to use for computing the evaluation.
            eval_sets: the evaluation sets to compute the performance of.
            metric_config: the metric configuration that is associated with the metric.

        Raises:
            ArithmeticError: possibly raised by a metric on evaluation.
            MemoryError: possibly raised by a metric on evaluation.
            RuntimeError: possibly raised by a metric on evaluation.

        Returns:
            the computed evaluation of the metric.
        """
        self.event_dispatcher.dispatch(MetricEventArgs(
            ON_BEGIN_EVAL_METRIC,
            metric_config
        ))

        start = time.time()

        evaluation = metric.evaluate(eval_sets)

        end = time.time()

        self.event_dispatcher.dispatch(MetricEventArgs(
            ON_END_EVAL_METRIC,
            metric_config
        ), elapsed_time=end - start)

        return evaluation

Evaluation Pipeline to run metric computations for ratings related to a dataset.

The pipeline is intended to be used multiple times on different computed rating files that are all associated to a specific dataset. Loading the evaluation sets is done each time for every metric configuration, so that they can be filtered individually on subgroups before computing the actual evaluation. For each metric it executes the following steps:

1) create the metric.
2) load the evaluation sets.
3) filter the evaluation sets (optional).
4) compute the evaluation of the metric.
5) store evaluations in an overview file.

Public methods:

run

EvaluationPipeline( dataset: src.fairreckitlib.data.set.dataset.Dataset, data_filter_factory: src.fairreckitlib.core.config.config_factories.GroupFactory, metric_category_factory: src.fairreckitlib.core.config.config_factories.GroupFactory, event_dispatcher: src.fairreckitlib.core.events.event_dispatcher.EventDispatcher)
66    def __init__(
67            self,
68            dataset: Dataset,
69            data_filter_factory: GroupFactory,
70            metric_category_factory: GroupFactory,
71            event_dispatcher: EventDispatcher):
72        """Construct the evaluation pipeline.
73
74        Args:
75            dataset: the dataset that is associated with the evaluation of the rating sets.
76            data_filter_factory: the factory with available filters for all dataset-matrix pairs.
77            metric_category_factory: the metric category factory with available metric factories.
78            event_dispatcher: used to dispatch model/IO events when running the pipeline.
79        """
80        CorePipeline.__init__(self, event_dispatcher)
81        self.dataset = dataset
82        self.data_filter_factory = data_filter_factory
83        self.metric_category_factory = metric_category_factory

Construct the evaluation pipeline.

Args: dataset: the dataset that is associated with the evaluation of the rating sets. data_filter_factory: the factory with available filters for all dataset-matrix pairs. metric_category_factory: the metric category factory with available metric factories. event_dispatcher: used to dispatch model/IO events when running the pipeline.

def run( self, output_path: str, eval_set_paths: src.fairreckitlib.evaluation.evaluation_sets.EvaluationSetPaths, metric_config_list: List[src.fairreckitlib.evaluation.pipeline.evaluation_config.MetricConfig], is_running: Callable[[], bool], **kwargs) -> None:
 85    def run(self,
 86            output_path: str,
 87            eval_set_paths: EvaluationSetPaths,
 88            metric_config_list: List[MetricConfig],
 89            is_running: Callable[[], bool],
 90            **kwargs) -> None:
 91        """Run the entire pipeline from beginning to end.
 92
 93        Effectively running all computations of the specified metrics.
 94        All the specified metric configurations that have a subgroup are expected
 95        to be related to the dataset that was used to construct the pipeline.
 96
 97        Args:
 98            output_path: the path of the json file to store the output.
 99            eval_set_paths: the file paths of the evaluation sets.
100            metric_config_list: list of MetricConfig objects to compute.
101            is_running: function that returns whether the pipeline
102                is still running. Stops early when False is returned.
103
104        Keyword Args:
105            reserved for future use
106        """
107        self.event_dispatcher.dispatch(EvaluationPipelineEventArgs(
108            ON_BEGIN_EVAL_PIPELINE,
109            metric_config_list
110        ))
111
112        start = time.time()
113
114        # Create evaluations file
115        create_json(
116            output_path,
117            {'evaluations': []},
118            self.event_dispatcher,
119            indent=4
120        )
121
122        for metric_config in metric_config_list:
123            if not is_running():
124                return
125
126            metric_factory = resolve_factory(
127                metric_config.name,
128                self.metric_category_factory
129            )
130            if metric_factory is None:
131                self.event_dispatcher.dispatch(ErrorEventArgs(
132                    ON_FAILURE_ERROR,
133                    'Failure: to resolve metric factory for metric \'' + metric_config.name + '\''
134                ))
135                continue
136
137            try:
138                self.run_metric(
139                    output_path,
140                    metric_factory,
141                    eval_set_paths,
142                    metric_config,
143                    **kwargs
144                )
145            except ArithmeticError:
146                self.event_dispatcher.dispatch(ErrorEventArgs(
147                    ON_RAISE_ERROR,
148                    'ArithmeticError: trying to run metric ' + metric_config.name
149                ))
150                continue
151            except FileNotFoundError:
152                self.event_dispatcher.dispatch(ErrorEventArgs(
153                    ON_RAISE_ERROR,
154                    'FileNotFoundError: trying to run metric ' + metric_config.name
155                ))
156                continue
157            except MemoryError:
158                self.event_dispatcher.dispatch(ErrorEventArgs(
159                    ON_RAISE_ERROR,
160                    'MemoryError: trying to run metric ' + metric_config.name
161                ))
162                continue
163            except RuntimeError:
164                self.event_dispatcher.dispatch(ErrorEventArgs(
165                    ON_RAISE_ERROR,
166                    'RuntimeError: trying to run metric ' + metric_config.name
167                ))
168                continue
169
170        end = time.time()
171
172        self.event_dispatcher.dispatch(EvaluationPipelineEventArgs(
173            ON_END_EVAL_PIPELINE,
174            metric_config_list
175        ), elapsed_time=end - start)

Run the entire pipeline from beginning to end.

Effectively running all computations of the specified metrics. All the specified metric configurations that have a subgroup are expected to be related to the dataset that was used to construct the pipeline.

Args: output_path: the path of the json file to store the output. eval_set_paths: the file paths of the evaluation sets. metric_config_list: list of MetricConfig objects to compute. is_running: function that returns whether the pipeline is still running. Stops early when False is returned.

Keyword Args: reserved for future use

def run_metric( self, output_path: str, metric_factory: src.fairreckitlib.core.config.config_factories.Factory, eval_set_paths: src.fairreckitlib.evaluation.evaluation_sets.EvaluationSetPaths, metric_config: src.fairreckitlib.evaluation.pipeline.evaluation_config.MetricConfig, **kwargs) -> None:
177    def run_metric(
178            self,
179            output_path: str,
180            metric_factory: Factory,
181            eval_set_paths: EvaluationSetPaths,
182            metric_config: MetricConfig,
183            **kwargs) -> None:
184        """Run the evaluation computation for the specified metric configuration.
185
186        Args:
187            output_path: the path of the json file to store the output.
188            metric_factory: the factory that contains the specified metric.
189            eval_set_paths: the file paths of the evaluation sets.
190            metric_config: the metric evaluation configuration.
191
192        Raises:
193            ArithmeticError: possibly raised by a metric on construction or evaluation.
194            MemoryError: possibly raised by a metric on construction or evaluation.
195            RuntimeError: possibly raised by a metric on construction or evaluation.
196            FileNotFoundError: when the file of one of the evaluation sets does not exist.
197
198        Keyword Args:
199            reserved for future use
200        """
201        self.event_dispatcher.dispatch(MetricEventArgs(
202            ON_BEGIN_METRIC,
203            metric_config
204        ))
205
206        start = time.time()
207
208        metric = metric_factory.create(
209            metric_config.name,
210            metric_config.params,
211            **kwargs
212        )
213
214        # this can raise a FileNotFoundError, effectively aborting the pipeline
215        eval_sets = self.load_evaluation_sets(
216            eval_set_paths,
217            metric.requires_train_set,
218            metric.requires_test_set
219        )
220
221        eval_sets = self.filter_set_rows(
222            os.path.split(output_path)[0],
223            eval_sets,
224            metric_config.subgroup
225        )
226
227        evaluation = self.compute_metric_evaluation(
228            metric,
229            eval_sets,
230            metric_config
231        )
232
233        add_evaluation_to_file(output_path, evaluation, metric_config)
234
235        end = time.time()
236
237        self.event_dispatcher.dispatch(MetricEventArgs(
238            ON_END_METRIC,
239            metric_config
240        ), elapsed_time=end - start)

Run the evaluation computation for the specified metric configuration.

Args: output_path: the path of the json file to store the output. metric_factory: the factory that contains the specified metric. eval_set_paths: the file paths of the evaluation sets. metric_config: the metric evaluation configuration.

Raises: ArithmeticError: possibly raised by a metric on construction or evaluation. MemoryError: possibly raised by a metric on construction or evaluation. RuntimeError: possibly raised by a metric on construction or evaluation. FileNotFoundError: when the file of one of the evaluation sets does not exist.

Keyword Args: reserved for future use

def load_evaluation_sets( self, eval_set_paths: src.fairreckitlib.evaluation.evaluation_sets.EvaluationSetPaths, train_set_required: bool, test_set_required: bool) -> src.fairreckitlib.evaluation.evaluation_sets.EvaluationSets:
242    def load_evaluation_sets(
243            self,
244            eval_set_paths: EvaluationSetPaths,
245            train_set_required: bool,
246            test_set_required: bool) -> EvaluationSets:
247        """Load the required evaluation sets.
248
249        Args:
250            eval_set_paths: the file paths of the evaluation sets.
251            train_set_required: whether the train set is required for the evaluation.
252            test_set_required: whether the test set is required for the evaluation.
253
254        Raises:
255            FileNotFoundError: when the file of one of the evaluation sets does not exist.
256
257        Returns:
258            the loaded evaluation sets.
259        """
260        rating_set = self.read_dataframe(
261            eval_set_paths.ratings_path,
262            'rating set',
263            ON_BEGIN_LOAD_RATING_SET,
264            ON_END_LOAD_RATING_SET,
265        )
266
267        train_set = None if not train_set_required else self.read_dataframe(
268            eval_set_paths.train_path,
269            'train set',
270            ON_BEGIN_LOAD_TRAIN_SET,
271            ON_END_LOAD_TRAIN_SET,
272            names=['user', 'item', 'rating']
273        )
274
275        test_set = None if not test_set_required else self.read_dataframe(
276            eval_set_paths.test_path,
277            'test set',
278            ON_BEGIN_LOAD_TEST_SET,
279            ON_END_LOAD_TEST_SET,
280            names=['user', 'item', 'rating']
281        )
282
283        return EvaluationSets(rating_set, train_set, test_set)

Load the required evaluation sets.

Args: eval_set_paths: the file paths of the evaluation sets. train_set_required: whether the train set is required for the evaluation. test_set_required: whether the test set is required for the evaluation.

Raises: FileNotFoundError: when the file of one of the evaluation sets does not exist.

Returns: the loaded evaluation sets.

def filter_set_rows( self, output_dir: str, eval_sets: src.fairreckitlib.evaluation.evaluation_sets.EvaluationSets, subgroup: Optional[src.fairreckitlib.data.filter.filter_config.DataSubsetConfig]) -> src.fairreckitlib.evaluation.evaluation_sets.EvaluationSets:
285    def filter_set_rows(
286            self,
287            output_dir: str,
288            eval_sets: EvaluationSets,
289            subgroup: Optional[DataSubsetConfig]) -> EvaluationSets:
290        """Filter the evaluation set rows for the specified subgroup.
291
292        The subset is created by applying multiple filter passes to the evaluation sets
293        individually. These filter passes are then combined to form the resulting sets.
294
295        Args:
296            eval_sets: the evaluation sets to filter.
297            subgroup: the subgroup to create of the evaluation sets.
298
299        Returns:
300            the filtered evaluation sets.
301        """
302        # Early exit, because no filtering is needed.
303        if subgroup is None or len(subgroup.filter_passes) == 0:
304            return eval_sets
305
306        self.event_dispatcher.dispatch(FilterDataframeEventArgs(
307            ON_BEGIN_FILTER_RECS,
308            subgroup
309        ))
310
311        start = time.time()
312
313        # Filter for each dataframe in eval_sets.
314        filter_factory = self.data_filter_factory
315        if eval_sets.train is not None:
316            eval_sets.train = filter_from_filter_passes(
317                self, output_dir, eval_sets.train, subgroup, filter_factory)
318        if eval_sets.test is not None:
319            eval_sets.test = filter_from_filter_passes(
320                self, output_dir, eval_sets.test, subgroup, filter_factory)
321        eval_sets.ratings = filter_from_filter_passes(
322                self, output_dir, eval_sets.ratings, subgroup, filter_factory)
323        end = time.time()
324        self.event_dispatcher.dispatch(FilterDataframeEventArgs(
325            ON_END_FILTER_RECS,
326            subgroup
327        ), elapsed_time=end - start)
328
329        return eval_sets

Filter the evaluation set rows for the specified subgroup.

The subset is created by applying multiple filter passes to the evaluation sets individually. These filter passes are then combined to form the resulting sets.

Args: output_dir: the directory where intermediate filter output is stored. eval_sets: the evaluation sets to filter. subgroup: the subgroup of the evaluation sets to create.

Returns: the filtered evaluation sets.

def compute_metric_evaluation( self, metric: src.fairreckitlib.evaluation.metrics.metric_base.BaseMetric, eval_sets: src.fairreckitlib.evaluation.evaluation_sets.EvaluationSets, metric_config: src.fairreckitlib.evaluation.pipeline.evaluation_config.MetricConfig) -> float:
331    def compute_metric_evaluation(
332            self,
333            metric: BaseMetric,
334            eval_sets: EvaluationSets,
335            metric_config: MetricConfig) -> float:
336        """Compute the evaluation for the specified metric on the specified sets.
337
338        Args:
339            metric: the metric to use for computing the evaluation.
340            eval_sets: the evaluation sets to compute the performance of.
341            metric_config: the metric configuration that is associated with the metric.
342
343        Raises:
344            ArithmeticError: possibly raised by a metric on evaluation.
345            MemoryError: possibly raised by a metric on evaluation.
346            RuntimeError: possibly raised by a metric on evaluation.
347
348        Returns:
349            the computed evaluation of the metric.
350        """
351        self.event_dispatcher.dispatch(MetricEventArgs(
352            ON_BEGIN_EVAL_METRIC,
353            metric_config
354        ))
355
356        start = time.time()
357
358        evaluation = metric.evaluate(eval_sets)
359
360        end = time.time()
361
362        self.event_dispatcher.dispatch(MetricEventArgs(
363            ON_END_EVAL_METRIC,
364            metric_config
365        ), elapsed_time=end - start)
366
367        return evaluation

Compute the evaluation for the specified metric on the specified sets.

Args: metric: the metric to use for computing the evaluation. eval_sets: the evaluation sets to compute the performance of. metric_config: the metric configuration that is associated with the metric.

Raises: ArithmeticError: possibly raised by a metric on evaluation. MemoryError: possibly raised by a metric on evaluation. RuntimeError: possibly raised by a metric on evaluation.

Returns: the computed evaluation of the metric.

def add_evaluation_to_file( file_path: str, evaluation_value: float, metric_config: src.fairreckitlib.evaluation.pipeline.evaluation_config.MetricConfig) -> None:
def add_evaluation_to_file(
        file_path: str,
        evaluation_value: float,
        metric_config: MetricConfig) -> None:
    """Add an evaluation result to the list in the overview file.

    Args:
        file_path: the path to the evaluations overview file.
        evaluation_value: the evaluation result.
        metric_config: the metric configuration used for the evaluation.
    """
    if metric_config.subgroup is None:
        subgroup = {}
    else:
        subgroup = metric_config.subgroup.to_yml_format()

    entry = {
        KEY_NAME: metric_config.name,
        KEY_PARAMS: metric_config.params,
        KEY_METRIC_EVALUATION: {
            'value': evaluation_value,
            KEY_METRIC_SUBGROUP: subgroup,
        },
    }

    # Load-modify-save: the overview file keeps a growing list of evaluations.
    file_contents = load_json(file_path)
    file_contents['evaluations'].append(entry)
    save_json(file_path, file_contents, indent=4)

Add an evaluation result to the list in the overview file.

Args: file_path: the path to the evaluations overview file. evaluation_value: the evaluation result. metric_config: the metric configuration used for the evaluation.