src.fairreckitlib.model.pipeline.recommendation_pipeline_elliot
This module contains a model pipeline that recommends items using the Elliot framework.
Classes:
RecommendationPipelineElliot: can batch recommendations from multiple Elliot models.
Deprecated:
from elliot.run import run_experiment
class RecommendationPipelineElliot(RecommendationPipeline): ... def train_and_test_model( self, model: ElliotRecommender, model_dir: str, is_running: Callable[[], bool], **kwargs) -> str: ... create_yml(yml_path, data, self.event_dispatcher)
run_experiment(yml_path)
delete_dir(temp_dir, self.event_dispatcher)
...
...
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)
"""This module contains a model pipeline that recommends items using the Elliot framework.

Classes:

    RecommendationPipelineElliot: can batch recommendations from multiple elliot models.

Deprecated:

from elliot.run import run_experiment

class RecommendationPipelineElliot(RecommendationPipeline):
    ...
    def train_and_test_model(
            self,
            model: ElliotRecommender,
            model_dir: str,
            is_running: Callable[[], bool],
            **kwargs) -> str:
        ...
        create_yml(yml_path, data, self.event_dispatcher)

        run_experiment(yml_path)

        delete_dir(temp_dir, self.event_dispatcher)
        ...
    ...

This program has been developed by students from the bachelor Computer Science at
Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

import os
from typing import Callable

import numpy as np
import pandas as pd

from ...core.core_constants import MODEL_RATINGS_FILE
from ...core.io.io_create import create_dir, create_yml
from ...core.io.io_delete import delete_dir, delete_file
from ..algorithms.elliot.elliot_recommender import ElliotRecommender
from .recommendation_pipeline import RecommendationPipeline


class RecommendationPipelineElliot(RecommendationPipeline):
    """Recommendation Pipeline implementation for the Elliot framework."""

    def train_and_test_model(
            self,
            model: ElliotRecommender,
            model_dir: str,
            is_running: Callable[[], bool],
            **kwargs) -> None:
        """Train and test the specified model.

        Convert the model configuration into a yml file that is accepted by the framework.
        Feed it to the framework to obtain results, clear unwanted artifacts and modify the
        ratings file so that it conforms to the standard convention.

        Args:
            model: the model that needs to be trained.
            model_dir: the path of the directory where the computed ratings can be stored.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Keyword Args:
            num_items(int): the number of item recommendations to produce.

        Raises:
            ArithmeticError: possibly raised by a model on training or testing.
            MemoryError: possibly raised by a model on training or testing.
            RuntimeError: possibly raised by a model on training or testing.
        """
        params = model.get_params()
        # Elliot meta options: quiet run, persist recommendations, discard weights.
        params['meta'] = {'verbose': False, 'save_recs': True, 'save_weights': False}

        top_k = kwargs['num_items']

        # Honor the documented contract: stop before doing any work when the
        # pipeline has been aborted (previously documented but never checked).
        if not is_running():
            return

        temp_dir = create_dir(os.path.join(model_dir, 'temp'), self.event_dispatcher)
        yml_path = os.path.join(temp_dir, 'config.yml')

        data = {
            'experiment': {
                'dataset': 'df',
                'data_config': {
                    'strategy': 'fixed',
                    # train/test sets are expected two directories above model_dir/temp
                    'train_path': os.path.join('..', '..', 'train_set.tsv'),
                    'test_path': os.path.join('..', '..', 'test_set.tsv'),
                },
                'top_k': top_k,
                'models': {
                    model.get_name(): params
                },
                'evaluation': {
                    'simple_metrics': ['Precision']
                },
                'path_output_rec_result': model_dir,
                'path_output_rec_weight': temp_dir,
                'path_output_rec_performance': temp_dir
            }
        }

        create_yml(yml_path, data, self.event_dispatcher)

        # NOTE(review): the framework invocation is disabled (see the module
        # docstring's 'Deprecated' section); only the configuration is produced.
        # run_experiment(yml_path)

        delete_dir(temp_dir, self.event_dispatcher)
        if params.get('epochs'):
            # remove everything so that only the final epochs file remains
            self.clear_unused_epochs(params['epochs'], model_dir)

        self.reconstruct_rank_column(model_dir, top_k)

    def clear_unused_epochs(self, num_epochs: int, model_dir: str) -> None:
        """Clear unused epochs from the model output directory.

        Recommenders with an 'epochs' parameter will generate computed ratings
        for each epoch. Only the final epoch is needed.

        Args:
            num_epochs: the number of epochs that was run by the algorithm.
            model_dir: the directory where the computed ratings are stored.
        """
        used_epoch = 'it=' + str(num_epochs)
        for file in os.listdir(model_dir):
            file_name = os.fsdecode(file)
            # skip model settings json
            if 'settings.json' in file_name:
                continue

            file_path = os.path.join(model_dir, file_name)

            # NOTE(review): substring match — a tag like 'it=5' would also match
            # 'it=50'; assumed safe for Elliot's file naming, confirm upstream.
            if used_epoch not in file_name:
                delete_file(file_path, self.event_dispatcher)

    def reconstruct_rank_column(self, model_dir: str, top_k: int) -> None:
        """Reconstruct the rank column in the result file that the framework generated.

        Args:
            model_dir: the directory where the computed ratings are stored.
            top_k: the topK that was used to compute the ratings.
        """
        result_file_path = self.rename_result(model_dir)
        result = pd.read_csv(
            result_file_path,
            sep='\t',
            header=None,
            names=['user', 'item', 'score']
        )

        # each consecutive block of top_k rows is one user's ranked list;
        # vectorized instead of the original per-row Python loop
        result['rank'] = np.arange(len(result)) % top_k + 1
        result['rank'] = result['rank'].astype(int)

        # overwrite result
        result[['rank', 'user', 'item', 'score']].to_csv(
            result_file_path,
            sep='\t',
            header=True,
            index=False
        )

    @staticmethod
    def rename_result(model_dir: str) -> str:
        """Rename the computed ratings file to be consistent with other pipelines.

        Args:
            model_dir: the directory where the computed ratings are stored.

        Returns:
            the file path of the result after renaming.

        Raises:
            FileNotFoundError: when no '.tsv' ratings file exists in model_dir.
        """
        for file in os.listdir(model_dir):
            file_name = os.fsdecode(file)
            # only the framework's '.tsv' ratings output is of interest
            if '.tsv' not in file_name:
                continue

            src_path = os.path.join(model_dir, file_name)
            dst_path = os.path.join(model_dir, MODEL_RATINGS_FILE)

            os.rename(src_path, dst_path)

            return dst_path

        # previously fell through returning None, violating the '-> str'
        # contract and crashing later in pd.read_csv; fail loudly instead
        raise FileNotFoundError('no .tsv ratings result file found in ' + model_dir)
class RecommendationPipelineElliot(RecommendationPipeline):
    """Recommendation Pipeline implementation for the Elliot framework."""

    def train_and_test_model(
            self,
            model: ElliotRecommender,
            model_dir: str,
            is_running: Callable[[], bool],
            **kwargs) -> None:
        """Train and test the specified model.

        Translate the model configuration into a yml file understood by the
        Elliot framework, feed it to the framework, remove unwanted artifacts
        and rewrite the ratings file into the standard convention.

        Args:
            model: the model that needs to be trained.
            model_dir: the path of the directory where the computed ratings can be stored.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Keyword Args:
            num_items(int): the number of item recommendations to produce.

        Raises:
            ArithmeticError: possibly raised by a model on training or testing.
            MemoryError: possibly raised by a model on training or testing.
            RuntimeError: possibly raised by a model on training or testing.
        """
        model_params = model.get_params()
        model_params['meta'] = {'verbose': False, 'save_recs': True, 'save_weights': False}

        num_items = kwargs['num_items']

        # a throwaway workspace that only holds the generated configuration
        workspace = create_dir(os.path.join(model_dir, 'temp'), self.event_dispatcher)
        config_path = os.path.join(workspace, 'config.yml')

        experiment_config = {
            'experiment': {
                'dataset': 'df',
                'data_config': {
                    'strategy': 'fixed',
                    'train_path': os.path.join('..', '..', 'train_set.tsv'),
                    'test_path': os.path.join('..', '..', 'test_set.tsv'),
                },
                'top_k': num_items,
                'models': {
                    model.get_name(): model_params
                },
                'evaluation': {
                    'simple_metrics': ['Precision']
                },
                'path_output_rec_result': model_dir,
                'path_output_rec_weight': workspace,
                'path_output_rec_performance': workspace
            }
        }

        create_yml(config_path, experiment_config, self.event_dispatcher)

        # run_experiment(config_path)

        delete_dir(workspace, self.event_dispatcher)
        if model_params.get('epochs'):
            # only the artifacts of the final epoch should survive
            self.clear_unused_epochs(model_params['epochs'], model_dir)

        self.reconstruct_rank_column(model_dir, num_items)

    def clear_unused_epochs(self, num_epochs: int, model_dir: str) -> None:
        """Clear unused epochs from the model output directory.

        Recommenders with an 'epochs' parameter will generate computed ratings
        for each epoch. Only the final epoch is needed.

        Args:
            num_epochs: the number of epochs that was run by the algorithm.
            model_dir: the directory where the computed ratings are stored.
        """
        final_epoch_tag = 'it=' + str(num_epochs)
        for entry in os.listdir(model_dir):
            entry_name = os.fsdecode(entry)
            # the model settings json is always kept
            if 'settings.json' in entry_name:
                continue

            # delete any artifact that does not belong to the final epoch
            if final_epoch_tag not in entry_name:
                delete_file(os.path.join(model_dir, entry_name), self.event_dispatcher)

    def reconstruct_rank_column(self, model_dir: str, top_k: int) -> None:
        """Reconstruct the rank column in the result file that the framework generated.

        Args:
            model_dir: the directory where the computed ratings are stored.
            top_k: the topK that was used to compute the ratings.
        """
        ratings_path = self.rename_result(model_dir)
        ratings = pd.read_csv(
            ratings_path,
            sep='\t',
            header=None,
            names=['user', 'item', 'score']
        )

        # every consecutive run of top_k rows forms one user's ranked list
        ratings['rank'] = [position % top_k + 1 for position in range(len(ratings))]
        ratings['rank'] = ratings['rank'].astype(int)

        # write the reordered columns back over the original file
        ratings[['rank', 'user', 'item', 'score']].to_csv(
            ratings_path,
            sep='\t',
            header=True,
            index=False
        )

    @staticmethod
    def rename_result(model_dir: str) -> str:
        """Rename the computed ratings file to be consistent with other pipelines.

        Args:
            model_dir: the directory where the computed ratings are stored.

        Returns:
            the file path of the result after renaming.
        """
        destination = os.path.join(model_dir, MODEL_RATINGS_FILE)
        for entry in os.listdir(model_dir):
            entry_name = os.fsdecode(entry)
            # the framework's ratings output is the only '.tsv' file present
            if '.tsv' in entry_name:
                os.rename(os.path.join(model_dir, entry_name), destination)
                return destination
Recommendation Pipeline implementation for the Elliot framework.
def train_and_test_model(
        self,
        model: ElliotRecommender,
        model_dir: str,
        is_running: Callable[[], bool],
        **kwargs) -> None:
    """Train and test the specified model.

    Convert the model configuration into a yml file that is accepted by the framework.
    Feed it to the framework to obtain results, clear unwanted artifacts and modify the
    ratings file so that it conforms to the standard convention.

    Args:
        model: the model that needs to be trained.
        model_dir: the path of the directory where the computed ratings can be stored.
        is_running: function that returns whether the pipeline
            is still running. Stops early when False is returned.

    Keyword Args:
        num_items(int): the number of item recommendations to produce.

    Raises:
        ArithmeticError: possibly raised by a model on training or testing.
        MemoryError: possibly raised by a model on training or testing.
        RuntimeError: possibly raised by a model on training or testing.
    """
    # NOTE(review): is_running is documented to stop the pipeline early but is
    # never checked in this body — confirm whether the check belongs here.
    params = model.get_params()
    # Elliot meta options: quiet run, persist recommendations, discard weights.
    params['meta'] = {'verbose': False, 'save_recs': True, 'save_weights': False}

    top_k = kwargs['num_items']

    # temporary workspace that only holds the generated Elliot configuration
    temp_dir = create_dir(os.path.join(model_dir, 'temp'), self.event_dispatcher)
    yml_path = os.path.join(temp_dir, 'config.yml')

    data = {
        'experiment': {
            'dataset': 'df',
            'data_config': {
                'strategy': 'fixed',
                # train/test sets are expected two directories above temp_dir
                'train_path': os.path.join('..', '..', 'train_set.tsv'),
                'test_path': os.path.join('..', '..', 'test_set.tsv'),
            },
            'top_k': top_k,
            'models': {
                model.get_name(): params
            },
            'evaluation': {
                'simple_metrics': ['Precision']
            },
            'path_output_rec_result': model_dir,
            'path_output_rec_weight': temp_dir,
            'path_output_rec_performance': temp_dir
        }
    }

    create_yml(yml_path, data, self.event_dispatcher)

    # NOTE(review): the framework invocation is disabled (module docstring
    # marks it 'Deprecated'); only the configuration file is produced.
    # run_experiment(yml_path)

    delete_dir(temp_dir, self.event_dispatcher)
    if params.get('epochs'):
        # remove everything so that only the final epochs file remains
        self.clear_unused_epochs(params['epochs'], model_dir)

    self.reconstruct_rank_column(model_dir, top_k)
Train and test the specified model.
Convert the model configuration into a yml file that is accepted by the framework. Feed it to the framework to obtain results, clear unwanted artifacts and modify the ratings file so that it conforms to the standard convention.
Args: model: the model that needs to be trained. model_dir: the path of the directory where the computed ratings can be stored. is_running: function that returns whether the pipeline is still running. Stops early when False is returned.
Keyword Args: num_items(int): the number of item recommendations to produce.
Raises: ArithmeticError: possibly raised by a model on training or testing. MemoryError: possibly raised by a model on training or testing. RuntimeError: possibly raised by a model on training or testing.
def clear_unused_epochs(self, num_epochs: int, model_dir: str) -> None:
    """Clear unused epochs from the model output directory.

    Recommenders with an 'epochs' parameter will generate computed ratings
    for each epoch. Only the final epoch is needed.

    Args:
        num_epochs: the number of epochs that was run by the algorithm.
        model_dir: the directory where the computed ratings are stored.
    """
    final_epoch_tag = 'it=' + str(num_epochs)
    for entry in os.listdir(model_dir):
        entry_name = os.fsdecode(entry)
        # the model settings json is always kept
        if 'settings.json' in entry_name:
            continue

        # drop every artifact that does not belong to the final epoch
        if final_epoch_tag not in entry_name:
            delete_file(os.path.join(model_dir, entry_name), self.event_dispatcher)
Clear unused epochs from the model output directory.
Recommenders with an 'epochs' parameter will generate computed ratings for each epoch. Only the final epoch is needed.
Args: num_epochs: the number of epochs that was run by the algorithm. model_dir: the directory where the computed ratings are stored.
def reconstruct_rank_column(self, model_dir: str, top_k: int) -> None:
    """Reconstruct the rank column in the result file that the framework generated.

    Args:
        model_dir: the directory where the computed ratings are stored.
        top_k: the topK that was used to compute the ratings.
    """
    ratings_path = self.rename_result(model_dir)
    ratings = pd.read_csv(
        ratings_path,
        sep='\t',
        header=None,
        names=['user', 'item', 'score']
    )

    # every consecutive run of top_k rows forms one user's ranked list
    ratings['rank'] = [position % top_k + 1 for position in range(len(ratings))]
    ratings['rank'] = ratings['rank'].astype(int)

    # write the reordered columns back over the original file
    ratings[['rank', 'user', 'item', 'score']].to_csv(
        ratings_path,
        sep='\t',
        header=True,
        index=False
    )
Reconstruct the rank column in the result file that the framework generated.
Args: model_dir: the directory where the computed ratings are stored. top_k: the topK that was used to compute the ratings.
@staticmethod
def rename_result(model_dir: str) -> str:
    """Rename the computed ratings file to be consistent with other pipelines.

    Args:
        model_dir: the directory where the computed ratings are stored.

    Returns:
        the file path of the result after renaming.

    Raises:
        FileNotFoundError: when no '.tsv' ratings file exists in model_dir.
    """
    for file in os.listdir(model_dir):
        file_name = os.fsdecode(file)
        # only the framework's '.tsv' ratings output is of interest
        if '.tsv' not in file_name:
            continue

        src_path = os.path.join(model_dir, file_name)
        dst_path = os.path.join(model_dir, MODEL_RATINGS_FILE)

        os.rename(src_path, dst_path)

        return dst_path

    # previously fell through returning None, violating the '-> str' contract
    # and crashing later in pd.read_csv; fail loudly instead
    raise FileNotFoundError('no .tsv ratings result file found in ' + model_dir)
Rename the computed ratings file to be consistent with other pipelines.
Args: model_dir: the directory where the computed ratings are stored.
Returns: the file path of the result after renaming.