src.fairreckitlib.data.set.dataset_matrix
This module contains functionality to create matrices from dataset event tables.
Classes:
MatrixProcessorConfig: the matrix processor configuration.
DatasetMatrixProcessor: the dataset matrix processor that generates/adds user-item matrices.
Functions:
create_matrix_chunk: create a user-item matrix chunk by counting user-item occurrences.
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)
"""This module contains functionality to create matrices from dataset event tables.

Classes:

    MatrixProcessorConfig: the matrix processor configuration.
    DatasetMatrixProcessor: the dataset matrix processor that generates/adds user-item matrices.

Functions:

    create_matrix_chunk: create a user-item matrix chunk by counting user-item occurrences.

This program has been developed by students from the bachelor Computer Science at
Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

from dataclasses import dataclass
import os
from typing import List, Tuple

import numpy as np
import pandas as pd

from ...core.events.event_dispatcher import EventDispatcher
from ...core.io.event_io import get_io_events, get_io_event_print_switch
from ...core.io.io_create import create_dir
from ...core.io.io_delete import delete_dir
from ...core.io.io_utility import save_yml
from ..ratings.convert_constants import RATING_TYPE_THRESHOLD
from .dataset import Dataset
from .dataset_constants import TABLE_FILE_PREFIX, DATASET_CONFIG_FILE
from .dataset_config import DATASET_RATINGS_EXPLICIT, DATASET_RATINGS_IMPLICIT, \
    DatasetTableConfig, DatasetMatrixConfig, DatasetIndexConfig, RatingMatrixConfig, \
    create_dataset_table_config

DEFAULT_MATRIX_CHUNK_SIZE = 10_000_000


@dataclass
class MatrixProcessorConfig:
    """Matrix Processor Configuration.

    event_table_name: the name of the event table to use.
    item_key: the item key name to create a user-item matrix for.
    rating_column: the name of the rating column to use in the user-item matrix.
    """

    event_table_name: str
    item_key: str
    rating_column: str


class DatasetMatrixProcessor:
    """Dataset Matrix Processor.

    The intended use of this class is to utilize an existing dataset that has event tables
    present, in order to generate a new user-item matrix.
    The processor does the following steps in order:

    1) create a temporary directory to store user-item chunks.
    2) process the event table by creating and saving user-item chunks.
    3) process the matrix by merging chunks.
    4) save the matrix in the dataset directory.
    5) update the dataset configuration file with the new user-item matrix.
    6) remove the temporary directory.

    Public methods:

    run
    """

    def __init__(self, dataset: Dataset, verbose=True):
        """Construct the dataset matrix processor.

        Args:
            dataset: the dataset to create a new user-item matrix for.
            verbose: whether the processor should give verbose output.
        """
        self.dataset = dataset
        self.verbose = verbose
        print_event = lambda _, args: get_io_event_print_switch()[args.event_id](args)

        self.event_dispatcher = EventDispatcher()
        for event_id in get_io_events():
            self.event_dispatcher.add_listener(event_id, None, (print_event, None))

    def run(self,
            processor_config: MatrixProcessorConfig,
            *,
            chunk_size: int=DEFAULT_MATRIX_CHUNK_SIZE) -> bool:
        """Run the processor with the specified matrix configuration.

        The processor fails when the user-item matrix is already present in the
        existing dataset configuration.

        Args:
            processor_config: the configuration to use for creating a user-item matrix.
            chunk_size: the size of the chunks to use during processing.

        Raises:
            KeyError: when the event table does not exist in the dataset.
            IndexError: when the item key name is not present in the event table.

        Returns:
            whether the processing of the user-item matrix succeeded.
        """
        if processor_config.event_table_name not in self.dataset.config.events:
            raise KeyError('Event table does not exist: ' + processor_config.event_table_name)

        event_table = self.dataset.config.events[processor_config.event_table_name]
        item_key = processor_config.item_key
        if item_key not in event_table.primary_key:
            raise IndexError('Event table does not have the requested item key')

        matrix_name = 'user-' + item_key.split('_')[0] + '-' + processor_config.rating_column
        if self.dataset.get_matrix_config(matrix_name) is not None:
            return False

        # step 1: create a temporary directory to store user-item chunks
        temp_dir = create_dir(
            os.path.join(self.dataset.data_dir, TABLE_FILE_PREFIX + 'tmp'),
            self.event_dispatcher
        )

        if self.verbose:
            print('Started processing matrix')

        # step 2: process the event table by creating and saving user-item chunks
        num_chunks = self.process_event_table(
            temp_dir,
            event_table,
            item_key,
            processor_config.rating_column,
            chunk_size=chunk_size
        )

        # step 3: process the matrix by merging chunks
        matrix_tuple = self.process_matrix_from_chunks(
            temp_dir,
            num_chunks,
            item_key,
            processor_config.rating_column
        )

        # step 4: save the matrix in the dataset directory
        dataset_matrix_config = self.save_matrix(
            matrix_name,
            matrix_tuple,
            item_key,
            processor_config.rating_column
        )

        # step 5: update the dataset configuration file with the new user-item matrix
        self.dataset.config.matrices[matrix_name] = dataset_matrix_config
        save_yml(
            os.path.join(self.dataset.data_dir, DATASET_CONFIG_FILE),
            self.dataset.config.to_yml_format()
        )

        if self.verbose:
            print('Finished processing matrix')

        # step 6: remove the temporary directory
        delete_dir(temp_dir, self.event_dispatcher)
        return True

    def process_event_table(
            self,
            output_dir: str,
            event_table: DatasetTableConfig,
            item_key: str,
            rating_column: str,
            *,
            chunk_size: int=DEFAULT_MATRIX_CHUNK_SIZE) -> int:
        """Process the event table in chunks.

        Args:
            output_dir: the output directory to store the chunks.
            event_table: the event table to process into chunks.
            item_key: the item key name to create a user-item chunk for.
            rating_column: the name of the rating column to use in the user-item chunk.
            chunk_size: the size of the chunks to use during processing.

        Returns:
            the number of chunks that are generated.
        """
        if self.verbose:
            print('Started processing event table')

        num_chunks = 0
        start_row = 0
        event_table_it = event_table.read_table(self.dataset.data_dir, chunk_size=chunk_size)
        for i, chunk in enumerate(event_table_it):
            end_row = int(min(start_row + chunk_size, event_table.num_records))
            percent = float(start_row) / float(event_table.num_records) * 100.0
            if self.verbose:
                print('Processing rows', start_row, 'to', end_row,
                      'of', event_table.num_records, '=>', str(percent) + '%')

            chunk = create_matrix_chunk(chunk, item_key, rating_column)
            # TODO filter chunk based on user/item columns
            chunk.to_csv(os.path.join(output_dir, 'chunk_' + str(i) + '.tsv'),
                         sep='\t', header=True, index=False)

            num_chunks += 1
            start_row += chunk_size

        if self.verbose:
            print('Finished processing event table')

        return num_chunks

    def process_matrix_from_chunks(
            self,
            output_dir: str,
            num_chunks: int,
            item_key: str,
            rating_column: str) -> Tuple[pd.DataFrame, List[int], List[int]]:
        """Process the user-item matrix from the stored chunks.

        Args:
            output_dir: the output directory where the chunks are stored.
            num_chunks: the number of chunks in the output directory.
            item_key: the item key name that was used to create user-item chunks.
            rating_column: the name of the rating column that was used to create user-item chunks.

        Returns:
            the user-item matrix and the lists of unique user/item ids.
        """
        if self.verbose:
            print('Started processing matrix chunks')

        matrix = pd.DataFrame({
            'user_id': pd.Series(dtype='int'),
            item_key: pd.Series(dtype='int'),
            rating_column: pd.Series(dtype='float')
        })

        for i in range(num_chunks):
            if self.verbose:
                print('Processing matrix chunk', i + 1, 'of', num_chunks)

            chunk = pd.read_table(os.path.join(output_dir, 'chunk_' + str(i) + '.tsv'))
            matrix = pd.concat([matrix, chunk], ignore_index=True)
            matrix = matrix.groupby(['user_id', item_key], as_index=False).sum()

        unique_users = matrix['user_id'].unique()
        # remap users to 0...num_users
        matrix = pd.merge(
            matrix,
            pd.DataFrame(list(enumerate(unique_users)), columns=['user', 'user_id']),
            how='left', on='user_id'
        )
        matrix.drop('user_id', axis=1, inplace=True)

        unique_items = matrix[item_key].unique()
        # remap items to 0...num_items
        matrix = pd.merge(
            matrix,
            pd.DataFrame(list(enumerate(unique_items)), columns=['item', item_key]),
            how='left', on=item_key
        )
        matrix.drop(item_key, axis=1, inplace=True)

        if self.verbose:
            print('Finished processing matrix chunks')

        return matrix[['user', 'item', rating_column]], list(unique_users), list(unique_items)

    def save_matrix(
            self,
            matrix_name: str,
            matrix_tuple: Tuple[pd.DataFrame, List[int], List[int]],
            item_key: str,
            rating_column: str) -> DatasetMatrixConfig:
        """Save the matrix in the dataset directory.

        Args:
            matrix_name: the name that is used to save the matrix, user and item lists.
            matrix_tuple: the user-item matrix and the lists of unique user/item ids.
            item_key: the item key name that was used to create the user-item matrix.
            rating_column: name of the rating column that was used to create the user-item matrix.

        Returns:
            the dataset matrix configuration.
        """
        if self.verbose:
            print('Started saving matrix')

        dataset_matrix_name = TABLE_FILE_PREFIX + self.dataset.get_name() + '_' + matrix_name
        matrix, users, items = matrix_tuple

        # create the user indices config and save the array
        user_index_config = DatasetIndexConfig(
            dataset_matrix_name + '_user_indices.hdf5',
            'user_id',
            len(users)
        )
        user_index_config.save_indices(self.dataset.data_dir, users)

        # create the item indices config and save the array
        item_index_config = DatasetIndexConfig(
            dataset_matrix_name + '_item_indices.hdf5',
            item_key,
            len(items)
        )
        item_index_config.save_indices(self.dataset.data_dir, items)

        # create the sample matrix table config and save the table
        matrix_table_config = create_dataset_table_config(
            dataset_matrix_name + '_matrix.tsv.bz2',
            ['user_id', item_key],
            ['matrix_' + rating_column],
            compression='bz2',
            foreign_keys=['user_id', item_key],
            num_records=len(matrix)
        )
        matrix_table_config.save_table(matrix, self.dataset.data_dir)

        rating_min = float(matrix[rating_column].min())
        rating_max = float(matrix[rating_column].max())
        rating_type = DATASET_RATINGS_IMPLICIT \
            if rating_max > RATING_TYPE_THRESHOLD else DATASET_RATINGS_EXPLICIT

        if self.verbose:
            print('Finished saving matrix')

        return DatasetMatrixConfig(
            matrix_table_config,
            RatingMatrixConfig(
                rating_min,
                rating_max,
                rating_type
            ),
            user_index_config,
            item_index_config
        )


def create_matrix_chunk(chunk: pd.DataFrame, item_key: str, rating_column: str) -> pd.DataFrame:
    """Create a user-item matrix chunk by counting occurrences of user-item combinations.

    Args:
        chunk: a dataframe chunk containing the user-item events.
        item_key: the key of the item to use for counting occurrences.
        rating_column: the name of the rating column to store the user-item counter in.

    Returns:
        a dataframe with the columns ['user_id', item_key, rating_column].
    """
    # make a copy to prevent pandas slicing errors and drop any irrelevant columns
    chunk_header = ['user_id', item_key]
    chunk = pd.DataFrame(chunk[chunk_header])

    chunk[rating_column] = np.ones(len(chunk))
    chunk = chunk.groupby(chunk_header, as_index=False).count()
    return chunk
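Since the event tables are read as plain dataframes, the effect of `create_matrix_chunk` can be illustrated with a small standalone pandas sketch. The column names `track_id`, `timestamp`, and `count` below are assumptions for illustration only; the real item key and rating column come from the `MatrixProcessorConfig`.

```python
import numpy as np
import pandas as pd

# a tiny event chunk: each row is one user-item interaction event
events = pd.DataFrame({
    'user_id': [1, 1, 1, 2, 2],
    'track_id': [10, 10, 20, 10, 30],
    'timestamp': [100, 101, 102, 103, 104],  # irrelevant column, dropped below
})

# mirror create_matrix_chunk: keep only the key columns, add a counter of
# ones, then count the occurrences of each (user_id, item) combination
chunk_header = ['user_id', 'track_id']
chunk = pd.DataFrame(events[chunk_header])
chunk['count'] = np.ones(len(chunk))
chunk = chunk.groupby(chunk_header, as_index=False).count()

print(chunk)
#    user_id  track_id  count
# 0        1        10      2
# 1        1        20      1
# 2        2        10      1
# 3        2        30      1
```

User 1 played track 10 twice, so that pair gets a count of 2; every other pair occurred once.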
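The chunk merging and 0-based re-indexing performed by `process_matrix_from_chunks` can likewise be sketched in isolation with plain pandas. The names `track_id` and `count` are again assumptions for illustration; the counts are summed across chunks and the original user/item ids are replaced by contiguous indices via `enumerate` and a left merge.

```python
import pandas as pd

item_key, rating_column = 'track_id', 'count'

# two already-counted chunks, as produced by create_matrix_chunk;
# the (5, 10) pair appears in both and must be summed
chunk_a = pd.DataFrame({'user_id': [5, 5, 9],
                        item_key: [10, 20, 10],
                        rating_column: [2.0, 1.0, 1.0]})
chunk_b = pd.DataFrame({'user_id': [5, 9],
                        item_key: [10, 30],
                        rating_column: [3.0, 1.0]})

# merge the chunks: concatenate, then sum counts per (user, item) pair
matrix = pd.concat([chunk_a, chunk_b], ignore_index=True)
matrix = matrix.groupby(['user_id', item_key], as_index=False).sum()

# remap the original user ids to contiguous indices 0...num_users
unique_users = matrix['user_id'].unique()
matrix = pd.merge(
    matrix,
    pd.DataFrame(list(enumerate(unique_users)), columns=['user', 'user_id']),
    how='left', on='user_id'
).drop('user_id', axis=1)

# remap the original item ids to contiguous indices 0...num_items
unique_items = matrix[item_key].unique()
matrix = pd.merge(
    matrix,
    pd.DataFrame(list(enumerate(unique_items)), columns=['item', item_key]),
    how='left', on=item_key
).drop(item_key, axis=1)

matrix = matrix[['user', 'item', rating_column]]
print(matrix)
#    user  item  count
# 0     0     0    5.0
# 1     0     1    1.0
# 2     1     0    1.0
# 3     1     2    1.0
```

User 5 becomes index 0 and user 9 index 1, tracks 10/20/30 become items 0/1/2, and the duplicated (5, 10) pair is summed to 5.0. The `unique_users` and `unique_items` arrays are what `save_matrix` later persists as the index files, so the original ids remain recoverable.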