src.fairreckitlib.data.pipeline.data_pipeline
This module contains functionality of the complete data pipeline.
Classes:
DataPipeline: class that performs dataset operations in preparation for the model pipeline.
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)
1"""This module contains functionality of the complete data pipeline. 2 3Classes: 4 5 DataPipeline: class that performs dataset operations in preparation for the model pipeline. 6 7This program has been developed by students from the bachelor Computer Science at 8Utrecht University within the Software Project course. 9© Copyright Utrecht University (Department of Information and Computing Sciences) 10""" 11 12import os 13import time 14from typing import Callable, Optional, Tuple 15 16import pandas as pd 17 18from ...core.config.config_factories import GroupFactory 19from ...core.events.event_dispatcher import EventDispatcher 20from ...core.events.event_error import ON_FAILURE_ERROR, ErrorEventArgs 21from ...core.io.io_create import create_dir 22from ...core.pipeline.core_pipeline import CorePipeline 23from ..data_transition import DataTransition 24from ..filter.filter_config import DataSubsetConfig 25from ..filter.filter_constants import KEY_DATA_SUBSET 26from ..filter.filter_event import FilterDataframeEventArgs 27from ..filter.filter_passes import filter_from_filter_passes 28from ..ratings.convert_config import ConvertConfig 29from ..ratings.convert_event import ConvertRatingsEventArgs 30from ..ratings.rating_converter_factory import KEY_RATING_CONVERTER 31from ..set.dataset import Dataset 32# from ..filter.filter_constants import KEY_DATA_FILTERS, deduce_filter_type 33from ..split.split_config import SplitConfig 34from ..split.split_constants import KEY_SPLITTING, KEY_SPLIT_TEST_RATIO 35from ..split.split_event import SplitDataframeEventArgs 36from .data_config import DataMatrixConfig 37from .data_event import ON_BEGIN_DATA_PIPELINE, ON_END_DATA_PIPELINE, DatasetEventArgs 38from .data_event import ON_BEGIN_LOAD_DATASET, ON_END_LOAD_DATASET, DatasetMatrixEventArgs 39from .data_event import ON_BEGIN_FILTER_DATASET, ON_END_FILTER_DATASET 40from .data_event import ON_BEGIN_CONVERT_RATINGS, ON_END_CONVERT_RATINGS 41from .data_event import ON_BEGIN_SPLIT_DATASET, 
ON_END_SPLIT_DATASET 42from .data_event import ON_BEGIN_SAVE_SETS, ON_END_SAVE_SETS, SaveSetsEventArgs 43 44 45class DataPipeline(CorePipeline): 46 """Data Pipeline to prepare a dataset for a transition to the ModelPipeline(s). 47 48 The pipeline is intended to be reused multiple times depending on the specified 49 datasets. This is not limited to using a dataset only once as they are numbered 50 internally to distinguish them later. 51 For each dataset the following steps are performed in order: 52 53 1) create output directory. 54 2) load the dataset into a dataframe. 55 3) filter rows based on 'user'/'item' columns. (optional) 56 4) convert 'rating' column. (optional) 57 5) split the dataframe into a train and test set. 58 6) save the train and test set in the output directory. 59 60 Public methods: 61 62 run 63 """ 64 65 def __init__(self, data_factory: GroupFactory, event_dispatcher: EventDispatcher): 66 """Construct the DataPipeline. 67 68 Args: 69 data_factory: the factory with available data modifier factories. 70 event_dispatcher: used to dispatch data/IO events when running the pipeline. 71 """ 72 CorePipeline.__init__(self, event_dispatcher) 73 self.split_datasets = {} 74 self.data_factory = data_factory 75 76 def run(self, 77 output_dir: str, 78 dataset: Dataset, 79 data_config: DataMatrixConfig, 80 is_running: Callable[[], bool]) -> Optional[DataTransition]: 81 """Run the entire data pipeline from beginning to end. 82 83 Args: 84 output_dir: the path of the directory to store the output. 85 dataset: the dataset to run the pipeline on. 86 data_config: the dataset matrix configurations. 87 is_running: function that returns whether the pipeline 88 is still running. Stops early when False is returned. 89 90 Raises: 91 FileNotFoundError: when the dataset matrix file does not exist. 92 IOError: when the specified output directory does not exist. 93 RuntimeError: when any data modifiers are not found in their respective factories. 
94 95 Returns: 96 the data transition output of the pipeline. 97 """ 98 if not os.path.isdir(output_dir): 99 raise IOError('Unknown data output directory') 100 101 self.event_dispatcher.dispatch(DatasetEventArgs( 102 ON_BEGIN_DATA_PIPELINE, 103 dataset.get_name() 104 )) 105 106 start = time.time() 107 108 # step 1 109 data_dir = self.create_data_output_dir(output_dir, data_config) 110 111 # step 2 112 dataframe = self.load_from_dataset(dataset, data_config.matrix) 113 if not is_running(): 114 return None 115 116 # step 3 117 dataframe = self.filter_rows(output_dir, dataframe, data_config) 118 if not is_running(): 119 return None 120 121 # step 4 122 dataframe = self.convert_ratings(dataset, 123 data_config.matrix, 124 dataframe, 125 data_config.converter) 126 if not is_running(): 127 return None 128 129 # step 5 130 train_set, test_set = self.split(dataframe, data_config.splitting) 131 if not is_running(): 132 return None 133 134 # step 6 135 train_set_path, test_set_path = self.save_sets(data_dir, train_set, test_set) 136 137 # update data matrix counter 138 self.split_datasets[data_config.get_data_matrix_name()] += 1 139 140 end = time.time() 141 142 self.event_dispatcher.dispatch(DatasetEventArgs( 143 ON_END_DATA_PIPELINE, 144 dataset.get_name() 145 ), elapsed_time=end - start) 146 147 data_output = DataTransition( 148 dataset, 149 data_config.matrix, 150 data_dir, 151 train_set_path, 152 test_set_path, 153 (dataframe['rating'].min(), dataframe['rating'].max()) 154 ) 155 156 return data_output 157 158 def create_data_output_dir(self, output_dir: str, data_config: DataMatrixConfig) -> str: 159 """Create the data output directory for a dataset. 160 161 Args: 162 output_dir: the path of the directory to store the output. 163 data_config: the dataset matrix configuration to create a directory for. 164 165 Returns: 166 the path of the directory where the output data can be stored. 
167 """ 168 dataset_matrix_name = data_config.get_data_matrix_name() 169 if not self.split_datasets.get(dataset_matrix_name): 170 self.split_datasets[dataset_matrix_name] = 0 171 172 index = self.split_datasets[dataset_matrix_name] 173 174 data_dir = os.path.join(output_dir, dataset_matrix_name + '_' + str(index)) 175 return create_dir(data_dir, self.event_dispatcher) 176 177 def load_from_dataset(self, dataset: Dataset, matrix_name: str) -> pd.DataFrame: 178 """Load in the desired dataset matrix into a dataframe. 179 180 The loaded dataframe contains at least three columns 'user', 'item', 'rating'. 181 In addition, the 'timestamp' column can be present when available in the specified dataset. 182 183 Args: 184 dataset: the dataset to load a matrix dataframe from. 185 matrix_name: the name of the matrix to load from the dataset. 186 187 Raises: 188 FileNotFoundError: when the dataset matrix file does not exist. 189 190 Returns: 191 the dataframe belonging to the specified dataset. 192 """ 193 self.event_dispatcher.dispatch(DatasetMatrixEventArgs( 194 ON_BEGIN_LOAD_DATASET, 195 dataset.get_name(), 196 matrix_name, 197 dataset.get_matrix_file_path(matrix_name) 198 )) 199 200 start = time.time() 201 202 try: 203 dataframe = dataset.load_matrix(matrix_name) 204 except FileNotFoundError as err: 205 self.event_dispatcher.dispatch(ErrorEventArgs( 206 ON_FAILURE_ERROR, 207 'Failure: to load dataset matrix ' + dataset.get_name() + '_' + matrix_name 208 )) 209 # raise again so the data run aborts 210 raise err 211 212 end = time.time() 213 214 self.event_dispatcher.dispatch(DatasetMatrixEventArgs( 215 ON_END_LOAD_DATASET, 216 dataset.get_name(), 217 matrix_name, 218 dataset.get_matrix_file_path(matrix_name) 219 ), elapsed_time=end - start) 220 221 return dataframe 222 223 def filter_rows(self, 224 output_dir: str, 225 dataframe: pd.DataFrame, 226 subset: DataSubsetConfig) -> pd.DataFrame: 227 """Apply the specified subset filters to the dataframe. 
228 229 The subset is created by applying multiple filter passes to the dataframe individually. 230 These filter passes are then combined to form the resulting dataframe. 231 232 Args: 233 dataframe: the dataframe to filter with at least two columns: 'user', 'item'. 234 subset: the subset to create of to the dataframe. 235 236 Returns: 237 the dataframe with the specified subgroup filters applied to it. 238 """ 239 # early exit, because no filtering is needed 240 if len(subset.filter_passes) == 0: 241 return dataframe 242 243 self.event_dispatcher.dispatch(FilterDataframeEventArgs( 244 ON_BEGIN_FILTER_DATASET, 245 subset 246 )) 247 248 start = time.time() 249 filter_factory = self.data_factory.get_factory(KEY_DATA_SUBSET) 250 dataframe = filter_from_filter_passes(self, output_dir, dataframe, subset, filter_factory) 251 end = time.time() 252 253 self.event_dispatcher.dispatch(FilterDataframeEventArgs( 254 ON_END_FILTER_DATASET, 255 subset 256 ), elapsed_time=end - start) 257 258 return dataframe 259 260 def convert_ratings(self, 261 dataset: Dataset, 262 matrix_name: str, 263 dataframe: pd.DataFrame, 264 convert_config: ConvertConfig) -> pd.DataFrame: 265 """Convert the ratings in the dataframe with the specified rating modifier. 266 267 Args: 268 dataset: the dataset to load the matrix and rating_type from. 269 matrix_name: the name of the dataset matrix. 270 dataframe: the dataframe to convert the ratings of. 271 At the least a 'rating' column is expected to be present. 272 convert_config: the configuration of the converter to apply to the 'rating' column. 273 274 Raises: 275 RuntimeError: when the converter specified by the configuration is not available. 276 277 Returns: 278 the converted dataframe or the input dataframe when no converter is specified. 
279 """ 280 if convert_config is None: 281 return dataframe 282 283 self.event_dispatcher.dispatch(ConvertRatingsEventArgs( 284 ON_BEGIN_CONVERT_RATINGS, 285 convert_config 286 )) 287 288 start = time.time() 289 290 converter_factory = self.data_factory.get_factory(KEY_RATING_CONVERTER) 291 dataset_converter_factory = converter_factory.get_factory(dataset.get_name()) 292 matrix_converter_factory = dataset_converter_factory.get_factory(matrix_name) 293 294 converter = matrix_converter_factory.create(convert_config.name, convert_config.params) 295 if converter is None: 296 self.event_dispatcher.dispatch(ErrorEventArgs( 297 ON_FAILURE_ERROR, 298 'Failure: to get converter from factory: ' + convert_config.name 299 )) 300 # raise error so the data run aborts 301 raise RuntimeError() 302 303 dataframe = converter.run(dataframe) 304 305 end = time.time() 306 307 self.event_dispatcher.dispatch(ConvertRatingsEventArgs( 308 ON_END_CONVERT_RATINGS, 309 convert_config 310 ), elapsed_time=end - start) 311 312 return dataframe 313 314 def split(self, 315 dataframe: pd.DataFrame, 316 split_config: SplitConfig) -> Tuple[pd.DataFrame, pd.DataFrame]: 317 """Split the dataframe into a train and test set. 318 319 This will be split 80/20 (or a similar ratio), and be done either random, or timestamp-wise. 320 The dataframe is expected to have at least three columns: 'user', 'item', 'rating'. 321 In addition, the 'timestamp' column is required for temporal splits. 322 323 Args: 324 dataframe: the dataframe to split into a train and test set. 325 split_config: the dataset splitting configuration. 326 327 Raises: 328 RuntimeError: when the splitter specified by the configuration is not available. 329 330 Returns: 331 the train and test set split of the specified dataframe. 
332 """ 333 self.event_dispatcher.dispatch(SplitDataframeEventArgs( 334 ON_BEGIN_SPLIT_DATASET, 335 split_config 336 )) 337 338 start = time.time() 339 split_kwargs = {KEY_SPLIT_TEST_RATIO: split_config.test_ratio} 340 split_factory = self.data_factory.get_factory(KEY_SPLITTING) 341 splitter = split_factory.create(split_config.name, split_config.params, **split_kwargs) 342 if splitter is None: 343 self.event_dispatcher.dispatch(ErrorEventArgs( 344 ON_FAILURE_ERROR, 345 'Failure: to get splitter from factory: ' + split_config.name 346 )) 347 # raise error so the data run aborts 348 raise RuntimeError() 349 350 train_set, test_set = splitter.run(dataframe) 351 end = time.time() 352 353 self.event_dispatcher.dispatch(SplitDataframeEventArgs( 354 ON_END_SPLIT_DATASET, 355 split_config 356 ), elapsed_time=end - start) 357 358 return train_set, test_set 359 360 def save_sets(self, 361 output_dir: str, 362 train_set: pd.DataFrame, 363 test_set: pd.DataFrame) -> Tuple[str, str]: 364 """Save the train and test sets to the desired output directory. 365 366 Args: 367 output_dir: the path of the directory to store both sets. 368 train_set: the train set to save with at least three columns: 'user', 'item', 'rating'. 369 test_set: the test set to save with at least three columns: 'user', 'item', 'rating'. 370 371 Returns: 372 the paths where the train and test set are stored. 
373 """ 374 headers_to_save = ['user', 'item', 'rating'] 375 376 train_set = train_set[headers_to_save] 377 test_set = test_set[headers_to_save] 378 379 train_set_path = os.path.join(output_dir, 'train_set.tsv') 380 test_set_path = os.path.join(output_dir, 'test_set.tsv') 381 382 self.event_dispatcher.dispatch(SaveSetsEventArgs( 383 ON_BEGIN_SAVE_SETS, 384 train_set_path, 385 test_set_path 386 )) 387 388 start = time.time() 389 train_set.to_csv(train_set_path, sep='\t', header=False, index=False) 390 test_set.to_csv(test_set_path, sep='\t', header=False, index=False) 391 end = time.time() 392 393 self.event_dispatcher.dispatch(SaveSetsEventArgs( 394 ON_END_SAVE_SETS, 395 train_set_path, 396 test_set_path 397 ), elapsed_time=end - start) 398 399 return train_set_path, test_set_path
class DataPipeline(CorePipeline):
    """Data Pipeline to prepare a dataset for a transition to the ModelPipeline(s).

    The pipeline is intended to be reused multiple times depending on the specified
    datasets. This is not limited to using a dataset only once as they are numbered
    internally to distinguish them later.
    For each dataset the following steps are performed in order:

    1) create output directory.
    2) load the dataset into a dataframe.
    3) filter rows based on 'user'/'item' columns. (optional)
    4) convert 'rating' column. (optional)
    5) split the dataframe into a train and test set.
    6) save the train and test set in the output directory.

    Public methods:

    run
    """

    def __init__(self, data_factory: GroupFactory, event_dispatcher: EventDispatcher):
        """Construct the DataPipeline.

        Args:
            data_factory: the factory with available data modifier factories.
            event_dispatcher: used to dispatch data/IO events when running the pipeline.
        """
        CorePipeline.__init__(self, event_dispatcher)
        # per data-matrix run counters, used to number output directories
        self.split_datasets = {}
        self.data_factory = data_factory

    def run(self,
            output_dir: str,
            dataset: Dataset,
            data_config: DataMatrixConfig,
            is_running: Callable[[], bool]) -> Optional[DataTransition]:
        """Run the entire data pipeline from beginning to end.

        Args:
            output_dir: the path of the directory to store the output.
            dataset: the dataset to run the pipeline on.
            data_config: the dataset matrix configurations.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Raises:
            FileNotFoundError: when the dataset matrix file does not exist.
            IOError: when the specified output directory does not exist.
            RuntimeError: when any data modifiers are not found in their respective factories.

        Returns:
            the data transition output of the pipeline.
        """
        if not os.path.isdir(output_dir):
            raise IOError('Unknown data output directory')

        self.event_dispatcher.dispatch(
            DatasetEventArgs(ON_BEGIN_DATA_PIPELINE, dataset.get_name()))

        started_at = time.time()

        # step 1: reserve a numbered output directory for this matrix
        data_dir = self.create_data_output_dir(output_dir, data_config)

        # step 2: load the matrix into memory
        dataframe = self.load_from_dataset(dataset, data_config.matrix)
        if not is_running():
            return None

        # step 3: apply the (optional) subset filters
        dataframe = self.filter_rows(output_dir, dataframe, data_config)
        if not is_running():
            return None

        # step 4: apply the (optional) rating conversion
        dataframe = self.convert_ratings(
            dataset, data_config.matrix, dataframe, data_config.converter)
        if not is_running():
            return None

        # step 5: produce the train/test split
        train_set, test_set = self.split(dataframe, data_config.splitting)
        if not is_running():
            return None

        # step 6: persist both sets
        train_set_path, test_set_path = self.save_sets(data_dir, train_set, test_set)

        # bump the counter so a rerun on this matrix gets a new directory
        self.split_datasets[data_config.get_data_matrix_name()] += 1

        finished_at = time.time()

        self.event_dispatcher.dispatch(
            DatasetEventArgs(ON_END_DATA_PIPELINE, dataset.get_name()),
            elapsed_time=finished_at - started_at)

        rating_range = (dataframe['rating'].min(), dataframe['rating'].max())
        return DataTransition(
            dataset,
            data_config.matrix,
            data_dir,
            train_set_path,
            test_set_path,
            rating_range
        )

    def create_data_output_dir(self, output_dir: str, data_config: DataMatrixConfig) -> str:
        """Create the data output directory for a dataset.

        Args:
            output_dir: the path of the directory to store the output.
            data_config: the dataset matrix configuration to create a directory for.

        Returns:
            the path of the directory where the output data can be stored.
        """
        matrix_name = data_config.get_data_matrix_name()
        # first encounter of this matrix starts its counter at zero
        if not self.split_datasets.get(matrix_name):
            self.split_datasets[matrix_name] = 0

        run_index = self.split_datasets[matrix_name]
        target_dir = os.path.join(output_dir, matrix_name + '_' + str(run_index))
        return create_dir(target_dir, self.event_dispatcher)

    def load_from_dataset(self, dataset: Dataset, matrix_name: str) -> pd.DataFrame:
        """Load in the desired dataset matrix into a dataframe.

        The loaded dataframe contains at least three columns 'user', 'item', 'rating'.
        In addition, the 'timestamp' column can be present when available in the specified dataset.

        Args:
            dataset: the dataset to load a matrix dataframe from.
            matrix_name: the name of the matrix to load from the dataset.

        Raises:
            FileNotFoundError: when the dataset matrix file does not exist.

        Returns:
            the dataframe belonging to the specified dataset.
        """
        self.event_dispatcher.dispatch(DatasetMatrixEventArgs(
            ON_BEGIN_LOAD_DATASET,
            dataset.get_name(),
            matrix_name,
            dataset.get_matrix_file_path(matrix_name)))

        load_start = time.time()
        try:
            dataframe = dataset.load_matrix(matrix_name)
        except FileNotFoundError as err:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: to load dataset matrix ' + dataset.get_name() + '_' + matrix_name))
            # propagate so the data run aborts
            raise err
        load_end = time.time()

        self.event_dispatcher.dispatch(DatasetMatrixEventArgs(
            ON_END_LOAD_DATASET,
            dataset.get_name(),
            matrix_name,
            dataset.get_matrix_file_path(matrix_name)),
            elapsed_time=load_end - load_start)

        return dataframe

    def filter_rows(self,
                    output_dir: str,
                    dataframe: pd.DataFrame,
                    subset: DataSubsetConfig) -> pd.DataFrame:
        """Apply the specified subset filters to the dataframe.

        The subset is created by applying multiple filter passes to the dataframe
        individually; the passes are then combined into the resulting dataframe.

        Args:
            output_dir: the path of the directory to store (intermediate) output.
            dataframe: the dataframe to filter with at least two columns: 'user', 'item'.
            subset: the subset of the dataframe to create.

        Returns:
            the dataframe with the specified subgroup filters applied to it.
        """
        if len(subset.filter_passes) == 0:
            # nothing configured, so no filtering is needed
            return dataframe

        self.event_dispatcher.dispatch(
            FilterDataframeEventArgs(ON_BEGIN_FILTER_DATASET, subset))

        filter_start = time.time()
        subset_factory = self.data_factory.get_factory(KEY_DATA_SUBSET)
        dataframe = filter_from_filter_passes(
            self, output_dir, dataframe, subset, subset_factory)
        filter_end = time.time()

        self.event_dispatcher.dispatch(
            FilterDataframeEventArgs(ON_END_FILTER_DATASET, subset),
            elapsed_time=filter_end - filter_start)

        return dataframe

    def convert_ratings(self,
                        dataset: Dataset,
                        matrix_name: str,
                        dataframe: pd.DataFrame,
                        convert_config: ConvertConfig) -> pd.DataFrame:
        """Convert the ratings in the dataframe with the specified rating modifier.

        Args:
            dataset: the dataset to load the matrix and rating_type from.
            matrix_name: the name of the dataset matrix.
            dataframe: the dataframe to convert the ratings of.
                At the least a 'rating' column is expected to be present.
            convert_config: the configuration of the converter to apply to the 'rating' column.

        Raises:
            RuntimeError: when the converter specified by the configuration is not available.

        Returns:
            the converted dataframe or the input dataframe when no converter is specified.
        """
        if convert_config is None:
            return dataframe

        self.event_dispatcher.dispatch(
            ConvertRatingsEventArgs(ON_BEGIN_CONVERT_RATINGS, convert_config))

        convert_start = time.time()

        # drill down: converter group -> dataset -> matrix
        matrix_factory = (self.data_factory
                          .get_factory(KEY_RATING_CONVERTER)
                          .get_factory(dataset.get_name())
                          .get_factory(matrix_name))

        converter = matrix_factory.create(convert_config.name, convert_config.params)
        if converter is None:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: to get converter from factory: ' + convert_config.name))
            # abort the data run
            raise RuntimeError()

        dataframe = converter.run(dataframe)

        convert_end = time.time()

        self.event_dispatcher.dispatch(
            ConvertRatingsEventArgs(ON_END_CONVERT_RATINGS, convert_config),
            elapsed_time=convert_end - convert_start)

        return dataframe

    def split(self,
              dataframe: pd.DataFrame,
              split_config: SplitConfig) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Split the dataframe into a train and test set.

        This will be split 80/20 (or a similar ratio), and be done either random, or timestamp-wise.
        The dataframe is expected to have at least three columns: 'user', 'item', 'rating'.
        In addition, the 'timestamp' column is required for temporal splits.

        Args:
            dataframe: the dataframe to split into a train and test set.
            split_config: the dataset splitting configuration.

        Raises:
            RuntimeError: when the splitter specified by the configuration is not available.

        Returns:
            the train and test set split of the specified dataframe.
        """
        self.event_dispatcher.dispatch(
            SplitDataframeEventArgs(ON_BEGIN_SPLIT_DATASET, split_config))

        split_start = time.time()
        splitter = self.data_factory.get_factory(KEY_SPLITTING).create(
            split_config.name,
            split_config.params,
            **{KEY_SPLIT_TEST_RATIO: split_config.test_ratio})
        if splitter is None:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_FAILURE_ERROR,
                'Failure: to get splitter from factory: ' + split_config.name))
            # abort the data run
            raise RuntimeError()

        train_set, test_set = splitter.run(dataframe)
        split_end = time.time()

        self.event_dispatcher.dispatch(
            SplitDataframeEventArgs(ON_END_SPLIT_DATASET, split_config),
            elapsed_time=split_end - split_start)

        return train_set, test_set

    def save_sets(self,
                  output_dir: str,
                  train_set: pd.DataFrame,
                  test_set: pd.DataFrame) -> Tuple[str, str]:
        """Save the train and test sets to the desired output directory.

        Args:
            output_dir: the path of the directory to store both sets.
            train_set: the train set to save with at least three columns: 'user', 'item', 'rating'.
            test_set: the test set to save with at least three columns: 'user', 'item', 'rating'.

        Returns:
            the paths where the train and test set are stored.
        """
        wanted_columns = ['user', 'item', 'rating']
        train_set = train_set[wanted_columns]
        test_set = test_set[wanted_columns]

        train_set_path = os.path.join(output_dir, 'train_set.tsv')
        test_set_path = os.path.join(output_dir, 'test_set.tsv')

        self.event_dispatcher.dispatch(SaveSetsEventArgs(
            ON_BEGIN_SAVE_SETS,
            train_set_path,
            test_set_path))

        save_start = time.time()
        for frame, path in ((train_set, train_set_path), (test_set, test_set_path)):
            frame.to_csv(path, sep='\t', header=False, index=False)
        save_end = time.time()

        self.event_dispatcher.dispatch(SaveSetsEventArgs(
            ON_END_SAVE_SETS,
            train_set_path,
            test_set_path),
            elapsed_time=save_end - save_start)

        return train_set_path, test_set_path
Data Pipeline to prepare a dataset for a transition to the ModelPipeline(s).
The pipeline is intended to be reused multiple times depending on the specified datasets. This is not limited to using a dataset only once as they are numbered internally to distinguish them later. For each dataset the following steps are performed in order:
1) create output directory. 2) load the dataset into a dataframe. 3) filter rows based on 'user'/'item' columns. (optional) 4) convert 'rating' column. (optional) 5) split the dataframe into a train and test set. 6) save the train and test set in the output directory.
Public methods:
run
def __init__(self, data_factory: GroupFactory, event_dispatcher: EventDispatcher):
    """Construct the DataPipeline.

    Args:
        data_factory: the factory with available data modifier factories.
        event_dispatcher: used to dispatch data/IO events when running the pipeline.
    """
    CorePipeline.__init__(self, event_dispatcher)
    # counters keyed by data matrix name, used to number output directories
    self.split_datasets = {}
    self.data_factory = data_factory
Construct the DataPipeline.
Args: data_factory: the factory with available data modifier factories. event_dispatcher: used to dispatch data/IO events when running the pipeline.
def run(self,
        output_dir: str,
        dataset: Dataset,
        data_config: DataMatrixConfig,
        is_running: Callable[[], bool]) -> Optional[DataTransition]:
    """Run the entire data pipeline from beginning to end.

    Args:
        output_dir: the path of the directory to store the output.
        dataset: the dataset to run the pipeline on.
        data_config: the dataset matrix configurations.
        is_running: function that returns whether the pipeline
            is still running. Stops early when False is returned.

    Raises:
        FileNotFoundError: when the dataset matrix file does not exist.
        IOError: when the specified output directory does not exist.
        RuntimeError: when any data modifiers are not found in their respective factories.

    Returns:
        the data transition output of the pipeline.
    """
    if not os.path.isdir(output_dir):
        raise IOError('Unknown data output directory')

    self.event_dispatcher.dispatch(
        DatasetEventArgs(ON_BEGIN_DATA_PIPELINE, dataset.get_name()))

    t_begin = time.time()

    # step 1
    data_dir = self.create_data_output_dir(output_dir, data_config)

    # step 2
    dataframe = self.load_from_dataset(dataset, data_config.matrix)
    if not is_running():
        return None

    # step 3
    dataframe = self.filter_rows(output_dir, dataframe, data_config)
    if not is_running():
        return None

    # step 4
    dataframe = self.convert_ratings(
        dataset, data_config.matrix, dataframe, data_config.converter)
    if not is_running():
        return None

    # step 5
    train_set, test_set = self.split(dataframe, data_config.splitting)
    if not is_running():
        return None

    # step 6
    train_set_path, test_set_path = self.save_sets(data_dir, train_set, test_set)

    # increment the counter for this data matrix
    self.split_datasets[data_config.get_data_matrix_name()] += 1

    t_end = time.time()

    self.event_dispatcher.dispatch(
        DatasetEventArgs(ON_END_DATA_PIPELINE, dataset.get_name()),
        elapsed_time=t_end - t_begin)

    return DataTransition(
        dataset,
        data_config.matrix,
        data_dir,
        train_set_path,
        test_set_path,
        (dataframe['rating'].min(), dataframe['rating'].max())
    )
Run the entire data pipeline from beginning to end.
Args: output_dir: the path of the directory to store the output. dataset: the dataset to run the pipeline on. data_config: the dataset matrix configurations. is_running: function that returns whether the pipeline is still running. Stops early when False is returned.
Raises: FileNotFoundError: when the dataset matrix file does not exist. IOError: when the specified output directory does not exist. RuntimeError: when any data modifiers are not found in their respective factories.
Returns: the data transition output of the pipeline.
def create_data_output_dir(self, output_dir: str, data_config: DataMatrixConfig) -> str:
    """Create the data output directory for a dataset.

    Args:
        output_dir: the path of the directory to store the output.
        data_config: the dataset matrix configuration to create a directory for.

    Returns:
        the path of the directory where the output data can be stored.
    """
    matrix_name = data_config.get_data_matrix_name()
    # start the counter on the first encounter of this matrix
    if not self.split_datasets.get(matrix_name):
        self.split_datasets[matrix_name] = 0

    run_index = self.split_datasets[matrix_name]
    return create_dir(
        os.path.join(output_dir, matrix_name + '_' + str(run_index)),
        self.event_dispatcher)
Create the data output directory for a dataset.
Args: output_dir: the path of the directory to store the output. data_config: the dataset matrix configuration to create a directory for.
Returns: the path of the directory where the output data can be stored.
def load_from_dataset(self, dataset: Dataset, matrix_name: str) -> pd.DataFrame:
    """Load in the desired dataset matrix into a dataframe.

    The loaded dataframe contains at least three columns 'user', 'item', 'rating'.
    In addition, the 'timestamp' column can be present when available in the specified dataset.

    Args:
        dataset: the dataset to load a matrix dataframe from.
        matrix_name: the name of the matrix to load from the dataset.

    Raises:
        FileNotFoundError: when the dataset matrix file does not exist.

    Returns:
        the dataframe belonging to the specified dataset.
    """
    self.event_dispatcher.dispatch(DatasetMatrixEventArgs(
        ON_BEGIN_LOAD_DATASET,
        dataset.get_name(),
        matrix_name,
        dataset.get_matrix_file_path(matrix_name)))

    t_begin = time.time()
    try:
        dataframe = dataset.load_matrix(matrix_name)
    except FileNotFoundError as err:
        self.event_dispatcher.dispatch(ErrorEventArgs(
            ON_FAILURE_ERROR,
            'Failure: to load dataset matrix ' + dataset.get_name() + '_' + matrix_name))
        # propagate so the data run aborts
        raise err
    t_end = time.time()

    self.event_dispatcher.dispatch(DatasetMatrixEventArgs(
        ON_END_LOAD_DATASET,
        dataset.get_name(),
        matrix_name,
        dataset.get_matrix_file_path(matrix_name)),
        elapsed_time=t_end - t_begin)

    return dataframe
Load in the desired dataset matrix into a dataframe.
The loaded dataframe contains at least three columns 'user', 'item', 'rating'. In addition, the 'timestamp' column can be present when available in the specified dataset.
Args: dataset: the dataset to load a matrix dataframe from. matrix_name: the name of the matrix to load from the dataset.
Raises: FileNotFoundError: when the dataset matrix file does not exist.
Returns: the dataframe belonging to the specified dataset.
224 def filter_rows(self, 225 output_dir: str, 226 dataframe: pd.DataFrame, 227 subset: DataSubsetConfig) -> pd.DataFrame: 228 """Apply the specified subset filters to the dataframe. 229 230 The subset is created by applying multiple filter passes to the dataframe individually. 231 These filter passes are then combined to form the resulting dataframe. 232 233 Args: 234 dataframe: the dataframe to filter with at least two columns: 'user', 'item'. 235 subset: the subset to create of to the dataframe. 236 237 Returns: 238 the dataframe with the specified subgroup filters applied to it. 239 """ 240 # early exit, because no filtering is needed 241 if len(subset.filter_passes) == 0: 242 return dataframe 243 244 self.event_dispatcher.dispatch(FilterDataframeEventArgs( 245 ON_BEGIN_FILTER_DATASET, 246 subset 247 )) 248 249 start = time.time() 250 filter_factory = self.data_factory.get_factory(KEY_DATA_SUBSET) 251 dataframe = filter_from_filter_passes(self, output_dir, dataframe, subset, filter_factory) 252 end = time.time() 253 254 self.event_dispatcher.dispatch(FilterDataframeEventArgs( 255 ON_END_FILTER_DATASET, 256 subset 257 ), elapsed_time=end - start) 258 259 return dataframe
Apply the specified subset filters to the dataframe.
The subset is created by applying multiple filter passes to the dataframe individually. These filter passes are then combined to form the resulting dataframe.
Args: dataframe: the dataframe to filter with at least two columns: 'user', 'item'. subset: the subset to create of to the dataframe.
Returns: the dataframe with the specified subgroup filters applied to it.
261 def convert_ratings(self, 262 dataset: Dataset, 263 matrix_name: str, 264 dataframe: pd.DataFrame, 265 convert_config: ConvertConfig) -> pd.DataFrame: 266 """Convert the ratings in the dataframe with the specified rating modifier. 267 268 Args: 269 dataset: the dataset to load the matrix and rating_type from. 270 matrix_name: the name of the dataset matrix. 271 dataframe: the dataframe to convert the ratings of. 272 At the least a 'rating' column is expected to be present. 273 convert_config: the configuration of the converter to apply to the 'rating' column. 274 275 Raises: 276 RuntimeError: when the converter specified by the configuration is not available. 277 278 Returns: 279 the converted dataframe or the input dataframe when no converter is specified. 280 """ 281 if convert_config is None: 282 return dataframe 283 284 self.event_dispatcher.dispatch(ConvertRatingsEventArgs( 285 ON_BEGIN_CONVERT_RATINGS, 286 convert_config 287 )) 288 289 start = time.time() 290 291 converter_factory = self.data_factory.get_factory(KEY_RATING_CONVERTER) 292 dataset_converter_factory = converter_factory.get_factory(dataset.get_name()) 293 matrix_converter_factory = dataset_converter_factory.get_factory(matrix_name) 294 295 converter = matrix_converter_factory.create(convert_config.name, convert_config.params) 296 if converter is None: 297 self.event_dispatcher.dispatch(ErrorEventArgs( 298 ON_FAILURE_ERROR, 299 'Failure: to get converter from factory: ' + convert_config.name 300 )) 301 # raise error so the data run aborts 302 raise RuntimeError() 303 304 dataframe = converter.run(dataframe) 305 306 end = time.time() 307 308 self.event_dispatcher.dispatch(ConvertRatingsEventArgs( 309 ON_END_CONVERT_RATINGS, 310 convert_config 311 ), elapsed_time=end - start) 312 313 return dataframe
Convert the ratings in the dataframe with the specified rating modifier.
Args: dataset: the dataset to load the matrix and rating_type from. matrix_name: the name of the dataset matrix. dataframe: the dataframe to convert the ratings of. At the least a 'rating' column is expected to be present. convert_config: the configuration of the converter to apply to the 'rating' column.
Raises: RuntimeError: when the converter specified by the configuration is not available.
Returns: the converted dataframe or the input dataframe when no converter is specified.
315 def split(self, 316 dataframe: pd.DataFrame, 317 split_config: SplitConfig) -> Tuple[pd.DataFrame, pd.DataFrame]: 318 """Split the dataframe into a train and test set. 319 320 This will be split 80/20 (or a similar ratio), and be done either random, or timestamp-wise. 321 The dataframe is expected to have at least three columns: 'user', 'item', 'rating'. 322 In addition, the 'timestamp' column is required for temporal splits. 323 324 Args: 325 dataframe: the dataframe to split into a train and test set. 326 split_config: the dataset splitting configuration. 327 328 Raises: 329 RuntimeError: when the splitter specified by the configuration is not available. 330 331 Returns: 332 the train and test set split of the specified dataframe. 333 """ 334 self.event_dispatcher.dispatch(SplitDataframeEventArgs( 335 ON_BEGIN_SPLIT_DATASET, 336 split_config 337 )) 338 339 start = time.time() 340 split_kwargs = {KEY_SPLIT_TEST_RATIO: split_config.test_ratio} 341 split_factory = self.data_factory.get_factory(KEY_SPLITTING) 342 splitter = split_factory.create(split_config.name, split_config.params, **split_kwargs) 343 if splitter is None: 344 self.event_dispatcher.dispatch(ErrorEventArgs( 345 ON_FAILURE_ERROR, 346 'Failure: to get splitter from factory: ' + split_config.name 347 )) 348 # raise error so the data run aborts 349 raise RuntimeError() 350 351 train_set, test_set = splitter.run(dataframe) 352 end = time.time() 353 354 self.event_dispatcher.dispatch(SplitDataframeEventArgs( 355 ON_END_SPLIT_DATASET, 356 split_config 357 ), elapsed_time=end - start) 358 359 return train_set, test_set
Split the dataframe into a train and test set.
This will be split 80/20 (or a similar ratio), and be done either random, or timestamp-wise. The dataframe is expected to have at least three columns: 'user', 'item', 'rating'. In addition, the 'timestamp' column is required for temporal splits.
Args: dataframe: the dataframe to split into a train and test set. split_config: the dataset splitting configuration.
Raises: RuntimeError: when the splitter specified by the configuration is not available.
Returns: the train and test set split of the specified dataframe.
361 def save_sets(self, 362 output_dir: str, 363 train_set: pd.DataFrame, 364 test_set: pd.DataFrame) -> Tuple[str, str]: 365 """Save the train and test sets to the desired output directory. 366 367 Args: 368 output_dir: the path of the directory to store both sets. 369 train_set: the train set to save with at least three columns: 'user', 'item', 'rating'. 370 test_set: the test set to save with at least three columns: 'user', 'item', 'rating'. 371 372 Returns: 373 the paths where the train and test set are stored. 374 """ 375 headers_to_save = ['user', 'item', 'rating'] 376 377 train_set = train_set[headers_to_save] 378 test_set = test_set[headers_to_save] 379 380 train_set_path = os.path.join(output_dir, 'train_set.tsv') 381 test_set_path = os.path.join(output_dir, 'test_set.tsv') 382 383 self.event_dispatcher.dispatch(SaveSetsEventArgs( 384 ON_BEGIN_SAVE_SETS, 385 train_set_path, 386 test_set_path 387 )) 388 389 start = time.time() 390 train_set.to_csv(train_set_path, sep='\t', header=False, index=False) 391 test_set.to_csv(test_set_path, sep='\t', header=False, index=False) 392 end = time.time() 393 394 self.event_dispatcher.dispatch(SaveSetsEventArgs( 395 ON_END_SAVE_SETS, 396 train_set_path, 397 test_set_path 398 ), elapsed_time=end - start) 399 400 return train_set_path, test_set_path
Save the train and test sets to the desired output directory.
Args: output_dir: the path of the directory to store both sets. train_set: the train set to save with at least three columns: 'user', 'item', 'rating'. test_set: the test set to save with at least three columns: 'user', 'item', 'rating'.
Returns: the paths where the train and test set are stored.