src.fairreckitlib.model.pipeline.recommendation_pipeline_elliot

This module contains a model pipeline that recommends items using the Elliot framework.

Classes:

RecommendationPipelineElliot: can batch recommendations from multiple elliot models.

Deprecated:

from elliot.run import run_experiment

class RecommendationPipelineElliot(RecommendationPipeline):
    ...
    def train_and_test_model(
            self,
            model: ElliotRecommender,
            model_dir: str,
            is_running: Callable[[], bool],
            **kwargs) -> str:
        ...
        create_yml(yml_path, data, self.event_dispatcher)

        run_experiment(yml_path)

        delete_dir(temp_dir, self.event_dispatcher)
        ...
    ...

This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)

  1"""This module contains a model pipeline that recommends items using the Elliot framework.
  2
  3Classes:
  4
  5    RecommendationPipelineElliot: can batch recommendations from multiple elliot models.
  6
  7Deprecated:
  8
  9from elliot.run import run_experiment
 10
 11class RecommendationPipelineElliot(RecommendationPipeline):
 12    ...
 13    def train_and_test_model(
 14            self,
 15            model: ElliotRecommender,
 16            model_dir: str,
 17            is_running: Callable[[], bool],
 18            **kwargs) -> str:
 19        ...
 20        create_yml(yml_path, data, self.event_dispatcher)
 21
 22        run_experiment(yml_path)
 23
 24        delete_dir(temp_dir, self.event_dispatcher)
 25        ...
 26    ...
 27
 28This program has been developed by students from the bachelor Computer Science at
 29Utrecht University within the Software Project course.
 30© Copyright Utrecht University (Department of Information and Computing Sciences)
 31"""
 32
 33import os
 34from typing import Callable
 35
 36import numpy as np
 37import pandas as pd
 38
 39from ...core.core_constants import MODEL_RATINGS_FILE
 40from ...core.io.io_create import create_dir, create_yml
 41from ...core.io.io_delete import delete_dir, delete_file
 42from ..algorithms.elliot.elliot_recommender import ElliotRecommender
 43from .recommendation_pipeline import RecommendationPipeline
 44
 45
 46class RecommendationPipelineElliot(RecommendationPipeline):
 47    """Recommendation Pipeline implementation for the Elliot framework."""
 48
 49    def train_and_test_model(
 50            self,
 51            model: ElliotRecommender,
 52            model_dir: str,
 53            is_running: Callable[[], bool],
 54            **kwargs) -> None:
 55        """Train and test the specified model.
 56
 57        Convert the model configuration into a yml file that is accepted by the framework.
 58        Feed it to the framework to obtain results, clear unwanted artifacts and modify the
 59        ratings file so that it conforms to the standard convention.
 60
 61        Args:
 62            model: the model that needs to be trained.
 63            model_dir: the path of the directory where the computed ratings can be stored.
 64            is_running: function that returns whether the pipeline
 65                is still running. Stops early when False is returned.
 66
 67        Keyword Args:
 68            num_items(int): the number of item recommendations to produce.
 69
 70        Raises:
 71            ArithmeticError: possibly raised by a model on training or testing.
 72            MemoryError: possibly raised by a model on training or testing.
 73            RuntimeError: possibly raised by a model on training or testing.
 74        """
 75        params = model.get_params()
 76        params['meta'] = {'verbose': False, 'save_recs': True, 'save_weights': False}
 77
 78        top_k = kwargs['num_items']
 79
 80        temp_dir = create_dir(os.path.join(model_dir, 'temp'), self.event_dispatcher)
 81        yml_path = os.path.join(temp_dir, 'config.yml')
 82
 83        data = {
 84            'experiment': {
 85                'dataset': 'df',
 86                'data_config': {
 87                    'strategy': 'fixed',
 88                    'train_path': os.path.join('..', '..', 'train_set.tsv'),
 89                    'test_path': os.path.join('..', '..', 'test_set.tsv'),
 90                },
 91                'top_k': top_k,
 92                'models': {
 93                    model.get_name(): params
 94                },
 95                'evaluation': {
 96                    'simple_metrics': ['Precision']
 97                },
 98                'path_output_rec_result': model_dir,
 99                'path_output_rec_weight': temp_dir,
100                'path_output_rec_performance': temp_dir
101            }
102        }
103
104        create_yml(yml_path, data, self.event_dispatcher)
105
106        # run_experiment(yml_path)
107
108        delete_dir(temp_dir, self.event_dispatcher)
109        if params.get('epochs'):
110            # remove everything so that only the final epochs file remains
111            self.clear_unused_epochs(params['epochs'], model_dir)
112
113        self.reconstruct_rank_column(model_dir, top_k)
114
115    def clear_unused_epochs(self, num_epochs: int, model_dir: str) -> None:
116        """Clear unused epochs from the model output directory.
117
118        Recommenders with an 'epochs' parameter will generate computed ratings
119        for each epoch. Only the final epoch is needed.
120
121        Args:
122            num_epochs: the number of epochs that was run by the algorithm.
123            model_dir: the directory where the computed ratings are stored.
124        """
125        used_epoch = 'it=' + str(num_epochs)
126        for file in os.listdir(model_dir):
127            file_name = os.fsdecode(file)
128            # skip model settings json
129            if 'settings.json' in file_name:
130                continue
131
132            file_path = os.path.join(model_dir, file_name)
133
134            if used_epoch not in file_name:
135                delete_file(file_path, self.event_dispatcher)
136
137    def reconstruct_rank_column(self, model_dir: str, top_k: int) -> None:
138        """Reconstruct the rank column in the result file that the framework generated.
139
140        Args:
141            model_dir: the directory where the computed ratings are stored.
142            top_k: the topK that was used to compute the ratings.
143        """
144        result_file_path = self.rename_result(model_dir)
145        result = pd.read_csv(
146            result_file_path,
147            sep='\t',
148            header=None,
149            names=['user', 'item', 'score']
150        )
151
152        # create topK ranking array
153        row_count = len(result)
154        ranks = np.zeros(row_count)
155        for i in range(row_count):
156            ranks[i] = i % top_k + 1
157
158        # add rank column
159        result['rank'] = ranks
160        result['rank'] = result['rank'].astype(int)
161
162        # overwrite result
163        result[['rank', 'user', 'item', 'score']].to_csv(
164            result_file_path,
165            sep='\t',
166            header=True,
167            index=False
168        )
169
170    @staticmethod
171    def rename_result(model_dir: str) -> str:
172        """Rename the computed ratings file to be consistent with other pipelines.
173
174        Args:
175            model_dir: the directory where the computed ratings are stored.
176
177        Returns:
178            the file path of the result after renaming.
179        """
180        for file in os.listdir(model_dir):
181            file_name = os.fsdecode(file)
182            # skip the model settings json
183            if '.tsv' not in file_name:
184                continue
185
186            src_path = os.path.join(model_dir, file_name)
187            dst_path = os.path.join(model_dir, MODEL_RATINGS_FILE)
188
189            os.rename(src_path, dst_path)
190
191            return dst_path
 47class RecommendationPipelineElliot(RecommendationPipeline):
 48    """Recommendation Pipeline implementation for the Elliot framework."""
 49
 50    def train_and_test_model(
 51            self,
 52            model: ElliotRecommender,
 53            model_dir: str,
 54            is_running: Callable[[], bool],
 55            **kwargs) -> None:
 56        """Train and test the specified model.
 57
 58        Convert the model configuration into a yml file that is accepted by the framework.
 59        Feed it to the framework to obtain results, clear unwanted artifacts and modify the
 60        ratings file so that it conforms to the standard convention.
 61
 62        Args:
 63            model: the model that needs to be trained.
 64            model_dir: the path of the directory where the computed ratings can be stored.
 65            is_running: function that returns whether the pipeline
 66                is still running. Stops early when False is returned.
 67
 68        Keyword Args:
 69            num_items(int): the number of item recommendations to produce.
 70
 71        Raises:
 72            ArithmeticError: possibly raised by a model on training or testing.
 73            MemoryError: possibly raised by a model on training or testing.
 74            RuntimeError: possibly raised by a model on training or testing.
 75        """
 76        params = model.get_params()
 77        params['meta'] = {'verbose': False, 'save_recs': True, 'save_weights': False}
 78
 79        top_k = kwargs['num_items']
 80
 81        temp_dir = create_dir(os.path.join(model_dir, 'temp'), self.event_dispatcher)
 82        yml_path = os.path.join(temp_dir, 'config.yml')
 83
 84        data = {
 85            'experiment': {
 86                'dataset': 'df',
 87                'data_config': {
 88                    'strategy': 'fixed',
 89                    'train_path': os.path.join('..', '..', 'train_set.tsv'),
 90                    'test_path': os.path.join('..', '..', 'test_set.tsv'),
 91                },
 92                'top_k': top_k,
 93                'models': {
 94                    model.get_name(): params
 95                },
 96                'evaluation': {
 97                    'simple_metrics': ['Precision']
 98                },
 99                'path_output_rec_result': model_dir,
100                'path_output_rec_weight': temp_dir,
101                'path_output_rec_performance': temp_dir
102            }
103        }
104
105        create_yml(yml_path, data, self.event_dispatcher)
106
107        # run_experiment(yml_path)
108
109        delete_dir(temp_dir, self.event_dispatcher)
110        if params.get('epochs'):
111            # remove everything so that only the final epochs file remains
112            self.clear_unused_epochs(params['epochs'], model_dir)
113
114        self.reconstruct_rank_column(model_dir, top_k)
115
116    def clear_unused_epochs(self, num_epochs: int, model_dir: str) -> None:
117        """Clear unused epochs from the model output directory.
118
119        Recommenders with an 'epochs' parameter will generate computed ratings
120        for each epoch. Only the final epoch is needed.
121
122        Args:
123            num_epochs: the number of epochs that was run by the algorithm.
124            model_dir: the directory where the computed ratings are stored.
125        """
126        used_epoch = 'it=' + str(num_epochs)
127        for file in os.listdir(model_dir):
128            file_name = os.fsdecode(file)
129            # skip model settings json
130            if 'settings.json' in file_name:
131                continue
132
133            file_path = os.path.join(model_dir, file_name)
134
135            if used_epoch not in file_name:
136                delete_file(file_path, self.event_dispatcher)
137
138    def reconstruct_rank_column(self, model_dir: str, top_k: int) -> None:
139        """Reconstruct the rank column in the result file that the framework generated.
140
141        Args:
142            model_dir: the directory where the computed ratings are stored.
143            top_k: the topK that was used to compute the ratings.
144        """
145        result_file_path = self.rename_result(model_dir)
146        result = pd.read_csv(
147            result_file_path,
148            sep='\t',
149            header=None,
150            names=['user', 'item', 'score']
151        )
152
153        # create topK ranking array
154        row_count = len(result)
155        ranks = np.zeros(row_count)
156        for i in range(row_count):
157            ranks[i] = i % top_k + 1
158
159        # add rank column
160        result['rank'] = ranks
161        result['rank'] = result['rank'].astype(int)
162
163        # overwrite result
164        result[['rank', 'user', 'item', 'score']].to_csv(
165            result_file_path,
166            sep='\t',
167            header=True,
168            index=False
169        )
170
171    @staticmethod
172    def rename_result(model_dir: str) -> str:
173        """Rename the computed ratings file to be consistent with other pipelines.
174
175        Args:
176            model_dir: the directory where the computed ratings are stored.
177
178        Returns:
179            the file path of the result after renaming.
180        """
181        for file in os.listdir(model_dir):
182            file_name = os.fsdecode(file)
183            # skip the model settings json
184            if '.tsv' not in file_name:
185                continue
186
187            src_path = os.path.join(model_dir, file_name)
188            dst_path = os.path.join(model_dir, MODEL_RATINGS_FILE)
189
190            os.rename(src_path, dst_path)
191
192            return dst_path

Recommendation Pipeline implementation for the Elliot framework.

def train_and_test_model( self, model: src.fairreckitlib.model.algorithms.elliot.elliot_recommender.ElliotRecommender, model_dir: str, is_running: Callable[[], bool], **kwargs) -> None:
 50    def train_and_test_model(
 51            self,
 52            model: ElliotRecommender,
 53            model_dir: str,
 54            is_running: Callable[[], bool],
 55            **kwargs) -> None:
 56        """Train and test the specified model.
 57
 58        Convert the model configuration into a yml file that is accepted by the framework.
 59        Feed it to the framework to obtain results, clear unwanted artifacts and modify the
 60        ratings file so that it conforms to the standard convention.
 61
 62        Args:
 63            model: the model that needs to be trained.
 64            model_dir: the path of the directory where the computed ratings can be stored.
 65            is_running: function that returns whether the pipeline
 66                is still running. Stops early when False is returned.
 67
 68        Keyword Args:
 69            num_items(int): the number of item recommendations to produce.
 70
 71        Raises:
 72            ArithmeticError: possibly raised by a model on training or testing.
 73            MemoryError: possibly raised by a model on training or testing.
 74            RuntimeError: possibly raised by a model on training or testing.
 75        """
 76        params = model.get_params()
 77        params['meta'] = {'verbose': False, 'save_recs': True, 'save_weights': False}
 78
 79        top_k = kwargs['num_items']
 80
 81        temp_dir = create_dir(os.path.join(model_dir, 'temp'), self.event_dispatcher)
 82        yml_path = os.path.join(temp_dir, 'config.yml')
 83
 84        data = {
 85            'experiment': {
 86                'dataset': 'df',
 87                'data_config': {
 88                    'strategy': 'fixed',
 89                    'train_path': os.path.join('..', '..', 'train_set.tsv'),
 90                    'test_path': os.path.join('..', '..', 'test_set.tsv'),
 91                },
 92                'top_k': top_k,
 93                'models': {
 94                    model.get_name(): params
 95                },
 96                'evaluation': {
 97                    'simple_metrics': ['Precision']
 98                },
 99                'path_output_rec_result': model_dir,
100                'path_output_rec_weight': temp_dir,
101                'path_output_rec_performance': temp_dir
102            }
103        }
104
105        create_yml(yml_path, data, self.event_dispatcher)
106
107        # run_experiment(yml_path)
108
109        delete_dir(temp_dir, self.event_dispatcher)
110        if params.get('epochs'):
111            # remove everything so that only the final epochs file remains
112            self.clear_unused_epochs(params['epochs'], model_dir)
113
114        self.reconstruct_rank_column(model_dir, top_k)

Train and test the specified model.

Convert the model configuration into a yml file that is accepted by the framework. Feed it to the framework to obtain results, clear unwanted artifacts and modify the ratings file so that it conforms to the standard convention.

Args:
- model: the model that needs to be trained.
- model_dir: the path of the directory where the computed ratings can be stored.
- is_running: function that returns whether the pipeline is still running. Stops early when False is returned.

Keyword Args: num_items(int): the number of item recommendations to produce.

Raises:
- ArithmeticError: possibly raised by a model on training or testing.
- MemoryError: possibly raised by a model on training or testing.
- RuntimeError: possibly raised by a model on training or testing.

def clear_unused_epochs(self, num_epochs: int, model_dir: str) -> None:
116    def clear_unused_epochs(self, num_epochs: int, model_dir: str) -> None:
117        """Clear unused epochs from the model output directory.
118
119        Recommenders with an 'epochs' parameter will generate computed ratings
120        for each epoch. Only the final epoch is needed.
121
122        Args:
123            num_epochs: the number of epochs that was run by the algorithm.
124            model_dir: the directory where the computed ratings are stored.
125        """
126        used_epoch = 'it=' + str(num_epochs)
127        for file in os.listdir(model_dir):
128            file_name = os.fsdecode(file)
129            # skip model settings json
130            if 'settings.json' in file_name:
131                continue
132
133            file_path = os.path.join(model_dir, file_name)
134
135            if used_epoch not in file_name:
136                delete_file(file_path, self.event_dispatcher)

Clear unused epochs from the model output directory.

Recommenders with an 'epochs' parameter will generate computed ratings for each epoch. Only the final epoch is needed.

Args:
- num_epochs: the number of epochs that was run by the algorithm.
- model_dir: the directory where the computed ratings are stored.

def reconstruct_rank_column(self, model_dir: str, top_k: int) -> None:
138    def reconstruct_rank_column(self, model_dir: str, top_k: int) -> None:
139        """Reconstruct the rank column in the result file that the framework generated.
140
141        Args:
142            model_dir: the directory where the computed ratings are stored.
143            top_k: the topK that was used to compute the ratings.
144        """
145        result_file_path = self.rename_result(model_dir)
146        result = pd.read_csv(
147            result_file_path,
148            sep='\t',
149            header=None,
150            names=['user', 'item', 'score']
151        )
152
153        # create topK ranking array
154        row_count = len(result)
155        ranks = np.zeros(row_count)
156        for i in range(row_count):
157            ranks[i] = i % top_k + 1
158
159        # add rank column
160        result['rank'] = ranks
161        result['rank'] = result['rank'].astype(int)
162
163        # overwrite result
164        result[['rank', 'user', 'item', 'score']].to_csv(
165            result_file_path,
166            sep='\t',
167            header=True,
168            index=False
169        )

Reconstruct the rank column in the result file that the framework generated.

Args:
- model_dir: the directory where the computed ratings are stored.
- top_k: the topK that was used to compute the ratings.

@staticmethod
def rename_result(model_dir: str) -> str:
171    @staticmethod
172    def rename_result(model_dir: str) -> str:
173        """Rename the computed ratings file to be consistent with other pipelines.
174
175        Args:
176            model_dir: the directory where the computed ratings are stored.
177
178        Returns:
179            the file path of the result after renaming.
180        """
181        for file in os.listdir(model_dir):
182            file_name = os.fsdecode(file)
183            # skip the model settings json
184            if '.tsv' not in file_name:
185                continue
186
187            src_path = os.path.join(model_dir, file_name)
188            dst_path = os.path.join(model_dir, MODEL_RATINGS_FILE)
189
190            os.rename(src_path, dst_path)
191
192            return dst_path

Rename the computed ratings file to be consistent with other pipelines.

Args: model_dir: the directory where the computed ratings are stored.

Returns: the file path of the result after renaming.