src.fairreckitlib.data.set.dataset_matrix

This module contains functionality to create matrices from dataset event tables.

Classes:

MatrixProcessorConfig: the matrix processor configuration.
DatasetMatrixProcessor: the dataset matrix processor that generates/adds user-item matrices.

Functions:

create_matrix_chunk: create a user-item matrix chunk by counting user-item occurrences.

This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)

  1"""This module contains functionality to create matrices from dataset event tables.
  2
  3Classes:
  4
  5    MatrixProcessorConfig: the matrix processor configuration.
  6    DatasetMatrixProcessor: the dataset matrix processor that generates/adds user-item matrices.
  7
  8Functions:
  9
 10    create_matrix_chunk: create a user-item matrix chunk by counting user-item occurrences.
 11
 12This program has been developed by students from the bachelor Computer Science at
 13Utrecht University within the Software Project course.
 14© Copyright Utrecht University (Department of Information and Computing Sciences)
 15"""
 16
 17from dataclasses import dataclass
 18import os
 19from typing import List, Tuple
 20
 21import numpy as np
 22import pandas as pd
 23
 24from ...core.events.event_dispatcher import EventDispatcher
 25from ...core.io.event_io import get_io_events, get_io_event_print_switch
 26from ...core.io.io_create import create_dir
 27from ...core.io.io_delete import delete_dir
 28from ...core.io.io_utility import save_yml
 29from ..ratings.convert_constants import RATING_TYPE_THRESHOLD
 30from .dataset import Dataset
 31from .dataset_constants import TABLE_FILE_PREFIX, DATASET_CONFIG_FILE
 32from .dataset_config import DATASET_RATINGS_EXPLICIT, DATASET_RATINGS_IMPLICIT, \
 33    DatasetTableConfig, DatasetMatrixConfig, DatasetIndexConfig, RatingMatrixConfig, \
 34    create_dataset_table_config
 35
 36DEFAULT_MATRIX_CHUNK_SIZE = 10E6
 37
 38
 39@dataclass
 40class MatrixProcessorConfig:
 41    """Matrix Processor Configuration.
 42
 43    event_table_name: the name of the event table to use.
 44    item_key: the item key name to create a user-item matrix for.
 45    rating_column: the name of the rating column to use in the user-item matrix.
 46    """
 47
 48    event_table_name: str
 49    item_key: str
 50    rating_column: str
 51
 52
 53class DatasetMatrixProcessor:
 54    """Dataset Matrix Processor.
 55
 56    The intended use of this class is to utilize an existing dataset that has event tables present,
 57    in order to generate a new user-item matrix. The processor does the following steps in order:
 58
 59    1) create a temporary directory to store user-item chunks.
 60    2) process the event table by creating and saving user-item chunks.
 61    3) process the matrix by merging chunks.
 62    4) save the matrix in the dataset directory.
 63    5) update the dataset configuration file with the new user-item matrix.
 64    6) remove the temporary directory with.
 65
 66    Public methods:
 67
 68    run
 69    """
 70
 71    def __init__(self, dataset: Dataset, verbose=True):
 72        """Construct the dataset matrix processor.
 73
 74        Args:
 75            dataset: the dataset to create a new user-item matrix for.
 76            verbose: whether the processor should give verbose output.
 77        """
 78        self.dataset = dataset
 79        self.verbose = verbose
 80        print_event = lambda _, args: get_io_event_print_switch()[args.event_id](args)
 81
 82        self.event_dispatcher = EventDispatcher()
 83        for event_id in get_io_events():
 84            self.event_dispatcher.add_listener(event_id, None, (print_event, None))
 85
 86    def run(self,
 87            processor_config: MatrixProcessorConfig,
 88            *,
 89            chunk_size: int=DEFAULT_MATRIX_CHUNK_SIZE) -> bool:
 90        """Run the processor with the specified matrix configuration.
 91
 92        The processor fails when the user-item matrix is already present in the
 93        existing dataset configuration.
 94
 95        Args:
 96            processor_config: the configuration to use for creating a user-item matrix.
 97            chunk_size: the size of the chunks to use during processing.
 98
 99        Raises:
100            KeyError: when the event table does not exist in the dataset.
101            IndexError: when the item key name is not present in the event table.
102
103        Returns:
104            whether the processing of the user-item matrix succeeded.
105        """
106        if not processor_config.event_table_name in self.dataset.config.events:
107            raise KeyError('Event table does not exist: ' + processor_config.event_table_name)
108
109        event_table = self.dataset.config.events[processor_config.event_table_name]
110        item_key = processor_config.item_key
111        if item_key not in event_table.primary_key:
112            raise IndexError('Event table does not have the requested item key')
113
114        matrix_name = 'user-' + item_key.split('_')[0] + '-' + processor_config.rating_column
115        if self.dataset.get_matrix_config(matrix_name) is not None:
116            return False
117
118        # step 1
119        temp_dir = create_dir(
120            os.path.join(self.dataset.data_dir, TABLE_FILE_PREFIX + 'tmp'),
121            self.event_dispatcher
122        )
123
124        if self.verbose:
125            print('Started processing matrix')
126
127        # step 2
128        num_chunks = self.process_event_table(
129            temp_dir,
130            event_table,
131            item_key,
132            processor_config.rating_column,
133            chunk_size=chunk_size
134        )
135
136        # step 3
137        matrix_tuple = self.process_matrix_from_chunks(
138            temp_dir,
139            num_chunks,
140            item_key,
141            processor_config.rating_column
142        )
143
144        # step 4
145        dataset_matrix_config = self.save_matrix(
146            matrix_name,
147            matrix_tuple,
148            item_key,
149            processor_config.rating_column
150        )
151
152        # step 5
153        self.dataset.config.matrices[matrix_name] = dataset_matrix_config
154        save_yml(
155            os.path.join(self.dataset.data_dir, DATASET_CONFIG_FILE),
156            self.dataset.config.to_yml_format()
157        )
158
159        if self.verbose:
160            print('Finished processing matrix')
161
162        # step 6
163        delete_dir(temp_dir, self.event_dispatcher)
164        return True
165
166    def process_event_table(
167            self,
168            output_dir: str,
169            event_table: DatasetTableConfig,
170            item_key: str,
171            rating_column: str,
172            *,
173            chunk_size: int=DEFAULT_MATRIX_CHUNK_SIZE) -> int:
174        """Process the event table in chunks.
175
176        Args:
177            output_dir: the output directory to store the chunks.
178            event_table: the event table to process into chunks.
179            item_key: the item key name to create a user-item chunk for.
180            rating_column: the name of the rating column to use in the user-item chunk.
181            chunk_size: the size of the chunks to use during processing.
182
183        Returns:
184            the number of chunks that are generated.
185        """
186        if self.verbose:
187            print('Started processing event table')
188
189        num_chunks = 0
190        start_row = 0
191        event_table_it = event_table.read_table(self.dataset.data_dir, chunk_size=chunk_size)
192        for i, chunk in enumerate(event_table_it):
193            end_row = int(min(start_row + chunk_size, event_table.num_records))
194            percent = float(start_row) / float(event_table.num_records) * 100.0
195            if self.verbose:
196                print('Processing rows', start_row, 'to', end_row,
197                      'of', event_table.num_records, '=>', str(percent) + '%')
198
199            chunk = create_matrix_chunk(chunk, item_key, rating_column)
200            # TODO filter chunk based on user/item columns
201            chunk.to_csv(os.path.join(output_dir, 'chunk_' + str(i) + '.tsv'),
202                         sep='\t', header=True, index=False)
203
204            num_chunks += 1
205            start_row += chunk_size
206
207        if self.verbose:
208            print('Finished processing event table')
209
210        return num_chunks
211
212    def process_matrix_from_chunks(
213            self,
214            output_dir: str,
215            num_chunks: int,
216            item_key: str,
217            rating_column: str) -> Tuple[pd.DataFrame, List[int], List[int]]:
218        """Process the user-item matrix from the stored chunks.
219
220        Args:
221            output_dir: the output directory where the chunks are stored.
222            num_chunks: the number of chunks in the output directory.
223            item_key: the item key name that was used to create user-item chunks.
224            rating_column: the name of the rating column that was used to create user-item chunks.
225
226        Returns:
227            the user-item matrix and the lists of unique user/item ids.
228        """
229        if self.verbose:
230            print('Started processing matrix chunks')
231
232        matrix = pd.DataFrame({
233            'user_id': pd.Series(dtype='int'),
234            item_key: pd.Series(dtype='int'),
235            rating_column: pd.Series(dtype='float')
236        })
237
238        for i in range(num_chunks):
239            if self.verbose:
240                print('Processing matrix chunk', i + 1, 'of', num_chunks)
241
242            chunk = pd.read_table(os.path.join(output_dir, 'chunk_' + str(i) + '.tsv'))
243            matrix = pd.concat([matrix, chunk], ignore_index=True)
244            matrix = matrix.groupby(['user_id', item_key], as_index=False).sum()
245
246        unique_users = matrix['user_id'].unique()
247        # users from 0...num_users
248        matrix = pd.merge(
249            matrix,
250            pd.DataFrame(list(enumerate(unique_users)), columns=['user', 'user_id']),
251            how='left', on='user_id'
252        )
253        matrix.drop('user_id', axis=1, inplace=True)
254
255        unique_items = matrix[item_key].unique()
256        # items from 0...num_items
257        matrix = pd.merge(
258            matrix,
259            pd.DataFrame(list(enumerate(unique_items)), columns=['item', item_key]),
260            how='left', on=item_key
261        )
262        matrix.drop(item_key, axis=1, inplace=True)
263
264        if self.verbose:
265            print('Finished processing matrix chunks')
266
267        return matrix[['user', 'item', rating_column]], list(unique_users), list(unique_items)
268
269    def save_matrix(
270            self,
271            matrix_name: str,
272            matrix_tuple: Tuple[pd.DataFrame, List[int], List[int]],
273            item_key: str,
274            rating_column: str) -> DatasetMatrixConfig:
275        """Save the matrix in the dataset directory.
276
277        Args:
278            matrix_name: the name that is used to save the matrix, user and item lists.
279            matrix_tuple: the user-item matrix and the lists of unique user/item ids.
280            item_key: the item key name that was used to create the user-item matrix.
281            rating_column: name of the rating column that was used to create the user-item matrix.
282
283        Returns:
284            the dataset matrix configuration.
285        """
286        if self.verbose:
287            print('Started saving matrix')
288
289        dataset_matrix_name = TABLE_FILE_PREFIX + self.dataset.get_name() + '_' + matrix_name
290        matrix, users, items = matrix_tuple
291
292        # create the user indices config and save the array
293        user_index_config = DatasetIndexConfig(
294            dataset_matrix_name + '_user_indices.hdf5',
295            'user_id',
296            len(users)
297        )
298        user_index_config.save_indices(self.dataset.data_dir, users)
299
300        # create the item indices config and save the array
301        item_index_config = DatasetIndexConfig(
302            dataset_matrix_name + '_item_indices.hdf5',
303            item_key,
304            len(items)
305        )
306        item_index_config.save_indices(self.dataset.data_dir, items)
307
308        # create the sample matrix table config and save the table
309        matrix_table_config = create_dataset_table_config(
310            dataset_matrix_name + '_matrix.tsv.bz2',
311            ['user_id', item_key],
312            ['matrix_' + rating_column],
313            compression='bz2',
314            foreign_keys=['user_id', item_key],
315            num_records=len(matrix)
316        )
317        matrix_table_config.save_table(matrix, self.dataset.data_dir)
318
319        rating_min = float(matrix[rating_column].min())
320        rating_max = float(matrix[rating_column].max())
321        rating_type = DATASET_RATINGS_IMPLICIT \
322            if rating_max > RATING_TYPE_THRESHOLD else DATASET_RATINGS_EXPLICIT
323
324        if self.verbose:
325            print('Finished saving matrix')
326
327        return DatasetMatrixConfig(
328            matrix_table_config,
329            RatingMatrixConfig(
330                rating_min,
331                rating_max,
332                rating_type
333            ),
334            user_index_config,
335            item_index_config
336        )
337
338
339
def create_matrix_chunk(chunk: pd.DataFrame, item_key: str, rating_column: str) -> pd.DataFrame:
    """Create a user-item matrix chunk by counting occurrences of user-item combinations.

    Args:
        chunk: a dataframe chunk containing the user-item events.
        item_key: the key of the item to use for counting occurrences.
        rating_column: the name of the rating column to store the user-item counter in.

    Returns:
        a dataframe with the columns ['user_id', item_key, rating_column].
    """
    # make a copy to prevent pandas slicing errors and drop any irrelevant columns
    chunk_header = ['user_id', item_key]
    chunk = pd.DataFrame(chunk[chunk_header])

    # counting the all-ones helper column per group yields the number of
    # occurrences of each user-item combination
    chunk[rating_column] = np.ones(len(chunk))
    chunk = chunk.groupby(chunk_header, as_index=False).count()
    return chunk
@dataclass
class MatrixProcessorConfig:
40@dataclass
41class MatrixProcessorConfig:
42    """Matrix Processor Configuration.
43
44    event_table_name: the name of the event table to use.
45    item_key: the item key name to create a user-item matrix for.
46    rating_column: the name of the rating column to use in the user-item matrix.
47    """
48
49    event_table_name: str
50    item_key: str
51    rating_column: str

Matrix Processor Configuration.

event_table_name: the name of the event table to use. item_key: the item key name to create a user-item matrix for. rating_column: the name of the rating column to use in the user-item matrix.

MatrixProcessorConfig(event_table_name: str, item_key: str, rating_column: str)
class DatasetMatrixProcessor:
 54class DatasetMatrixProcessor:
 55    """Dataset Matrix Processor.
 56
 57    The intended use of this class is to utilize an existing dataset that has event tables present,
 58    in order to generate a new user-item matrix. The processor does the following steps in order:
 59
 60    1) create a temporary directory to store user-item chunks.
 61    2) process the event table by creating and saving user-item chunks.
 62    3) process the matrix by merging chunks.
 63    4) save the matrix in the dataset directory.
 64    5) update the dataset configuration file with the new user-item matrix.
 65    6) remove the temporary directory with its chunks.
 66
 67    Public methods:
 68
 69    run
 70    """
 71
 72    def __init__(self, dataset: Dataset, verbose=True):
 73        """Construct the dataset matrix processor.
 74
 75        Args:
 76            dataset: the dataset to create a new user-item matrix for.
 77            verbose: whether the processor should give verbose output.
 78        """
 79        self.dataset = dataset
 80        self.verbose = verbose
 81        print_event = lambda _, args: get_io_event_print_switch()[args.event_id](args)
 82
 83        self.event_dispatcher = EventDispatcher()
 84        for event_id in get_io_events():
 85            self.event_dispatcher.add_listener(event_id, None, (print_event, None))
 86
 87    def run(self,
 88            processor_config: MatrixProcessorConfig,
 89            *,
 90            chunk_size: int=DEFAULT_MATRIX_CHUNK_SIZE) -> bool:
 91        """Run the processor with the specified matrix configuration.
 92
 93        The processor fails when the user-item matrix is already present in the
 94        existing dataset configuration.
 95
 96        Args:
 97            processor_config: the configuration to use for creating a user-item matrix.
 98            chunk_size: the size of the chunks to use during processing.
 99
100        Raises:
101            KeyError: when the event table does not exist in the dataset.
102            IndexError: when the item key name is not present in the event table.
103
104        Returns:
105            whether the processing of the user-item matrix succeeded.
106        """
107        if not processor_config.event_table_name in self.dataset.config.events:
108            raise KeyError('Event table does not exist: ' + processor_config.event_table_name)
109
110        event_table = self.dataset.config.events[processor_config.event_table_name]
111        item_key = processor_config.item_key
112        if item_key not in event_table.primary_key:
113            raise IndexError('Event table does not have the requested item key')
114
115        matrix_name = 'user-' + item_key.split('_')[0] + '-' + processor_config.rating_column
116        if self.dataset.get_matrix_config(matrix_name) is not None:
117            return False
118
119        # step 1
120        temp_dir = create_dir(
121            os.path.join(self.dataset.data_dir, TABLE_FILE_PREFIX + 'tmp'),
122            self.event_dispatcher
123        )
124
125        if self.verbose:
126            print('Started processing matrix')
127
128        # step 2
129        num_chunks = self.process_event_table(
130            temp_dir,
131            event_table,
132            item_key,
133            processor_config.rating_column,
134            chunk_size=chunk_size
135        )
136
137        # step 3
138        matrix_tuple = self.process_matrix_from_chunks(
139            temp_dir,
140            num_chunks,
141            item_key,
142            processor_config.rating_column
143        )
144
145        # step 4
146        dataset_matrix_config = self.save_matrix(
147            matrix_name,
148            matrix_tuple,
149            item_key,
150            processor_config.rating_column
151        )
152
153        # step 5
154        self.dataset.config.matrices[matrix_name] = dataset_matrix_config
155        save_yml(
156            os.path.join(self.dataset.data_dir, DATASET_CONFIG_FILE),
157            self.dataset.config.to_yml_format()
158        )
159
160        if self.verbose:
161            print('Finished processing matrix')
162
163        # step 6
164        delete_dir(temp_dir, self.event_dispatcher)
165        return True
166
167    def process_event_table(
168            self,
169            output_dir: str,
170            event_table: DatasetTableConfig,
171            item_key: str,
172            rating_column: str,
173            *,
174            chunk_size: int=DEFAULT_MATRIX_CHUNK_SIZE) -> int:
175        """Process the event table in chunks.
176
177        Args:
178            output_dir: the output directory to store the chunks.
179            event_table: the event table to process into chunks.
180            item_key: the item key name to create a user-item chunk for.
181            rating_column: the name of the rating column to use in the user-item chunk.
182            chunk_size: the size of the chunks to use during processing.
183
184        Returns:
185            the number of chunks that are generated.
186        """
187        if self.verbose:
188            print('Started processing event table')
189
190        num_chunks = 0
191        start_row = 0
192        event_table_it = event_table.read_table(self.dataset.data_dir, chunk_size=chunk_size)
193        for i, chunk in enumerate(event_table_it):
194            end_row = int(min(start_row + chunk_size, event_table.num_records))
195            percent = float(start_row) / float(event_table.num_records) * 100.0
196            if self.verbose:
197                print('Processing rows', start_row, 'to', end_row,
198                      'of', event_table.num_records, '=>', str(percent) + '%')
199
200            chunk = create_matrix_chunk(chunk, item_key, rating_column)
201            # TODO filter chunk based on user/item columns
202            chunk.to_csv(os.path.join(output_dir, 'chunk_' + str(i) + '.tsv'),
203                         sep='\t', header=True, index=False)
204
205            num_chunks += 1
206            start_row += chunk_size
207
208        if self.verbose:
209            print('Finished processing event table')
210
211        return num_chunks
212
213    def process_matrix_from_chunks(
214            self,
215            output_dir: str,
216            num_chunks: int,
217            item_key: str,
218            rating_column: str) -> Tuple[pd.DataFrame, List[int], List[int]]:
219        """Process the user-item matrix from the stored chunks.
220
221        Args:
222            output_dir: the output directory where the chunks are stored.
223            num_chunks: the number of chunks in the output directory.
224            item_key: the item key name that was used to create user-item chunks.
225            rating_column: the name of the rating column that was used to create user-item chunks.
226
227        Returns:
228            the user-item matrix and the lists of unique user/item ids.
229        """
230        if self.verbose:
231            print('Started processing matrix chunks')
232
233        matrix = pd.DataFrame({
234            'user_id': pd.Series(dtype='int'),
235            item_key: pd.Series(dtype='int'),
236            rating_column: pd.Series(dtype='float')
237        })
238
239        for i in range(num_chunks):
240            if self.verbose:
241                print('Processing matrix chunk', i + 1, 'of', num_chunks)
242
243            chunk = pd.read_table(os.path.join(output_dir, 'chunk_' + str(i) + '.tsv'))
244            matrix = pd.concat([matrix, chunk], ignore_index=True)
245            matrix = matrix.groupby(['user_id', item_key], as_index=False).sum()
246
247        unique_users = matrix['user_id'].unique()
248        # users from 0...num_users
249        matrix = pd.merge(
250            matrix,
251            pd.DataFrame(list(enumerate(unique_users)), columns=['user', 'user_id']),
252            how='left', on='user_id'
253        )
254        matrix.drop('user_id', axis=1, inplace=True)
255
256        unique_items = matrix[item_key].unique()
257        # items from 0...num_items
258        matrix = pd.merge(
259            matrix,
260            pd.DataFrame(list(enumerate(unique_items)), columns=['item', item_key]),
261            how='left', on=item_key
262        )
263        matrix.drop(item_key, axis=1, inplace=True)
264
265        if self.verbose:
266            print('Finished processing matrix chunks')
267
268        return matrix[['user', 'item', rating_column]], list(unique_users), list(unique_items)
269
270    def save_matrix(
271            self,
272            matrix_name: str,
273            matrix_tuple: Tuple[pd.DataFrame, List[int], List[int]],
274            item_key: str,
275            rating_column: str) -> DatasetMatrixConfig:
276        """Save the matrix in the dataset directory.
277
278        Args:
279            matrix_name: the name that is used to save the matrix, user and item lists.
280            matrix_tuple: the user-item matrix and the lists of unique user/item ids.
281            item_key: the item key name that was used to create the user-item matrix.
282            rating_column: name of the rating column that was used to create the user-item matrix.
283
284        Returns:
285            the dataset matrix configuration.
286        """
287        if self.verbose:
288            print('Started saving matrix')
289
290        dataset_matrix_name = TABLE_FILE_PREFIX + self.dataset.get_name() + '_' + matrix_name
291        matrix, users, items = matrix_tuple
292
293        # create the user indices config and save the array
294        user_index_config = DatasetIndexConfig(
295            dataset_matrix_name + '_user_indices.hdf5',
296            'user_id',
297            len(users)
298        )
299        user_index_config.save_indices(self.dataset.data_dir, users)
300
301        # create the item indices config and save the array
302        item_index_config = DatasetIndexConfig(
303            dataset_matrix_name + '_item_indices.hdf5',
304            item_key,
305            len(items)
306        )
307        item_index_config.save_indices(self.dataset.data_dir, items)
308
309        # create the sample matrix table config and save the table
310        matrix_table_config = create_dataset_table_config(
311            dataset_matrix_name + '_matrix.tsv.bz2',
312            ['user_id', item_key],
313            ['matrix_' + rating_column],
314            compression='bz2',
315            foreign_keys=['user_id', item_key],
316            num_records=len(matrix)
317        )
318        matrix_table_config.save_table(matrix, self.dataset.data_dir)
319
320        rating_min = float(matrix[rating_column].min())
321        rating_max = float(matrix[rating_column].max())
322        rating_type = DATASET_RATINGS_IMPLICIT \
323            if rating_max > RATING_TYPE_THRESHOLD else DATASET_RATINGS_EXPLICIT
324
325        if self.verbose:
326            print('Finished saving matrix')
327
328        return DatasetMatrixConfig(
329            matrix_table_config,
330            RatingMatrixConfig(
331                rating_min,
332                rating_max,
333                rating_type
334            ),
335            user_index_config,
336            item_index_config
337        )

Dataset Matrix Processor.

The intended use of this class is to utilize an existing dataset that has event tables present, in order to generate a new user-item matrix. The processor does the following steps in order:

1) create a temporary directory to store user-item chunks. 2) process the event table by creating and saving user-item chunks. 3) process the matrix by merging chunks. 4) save the matrix in the dataset directory. 5) update the dataset configuration file with the new user-item matrix. 6) remove the temporary directory with its chunks.

Public methods:

run

DatasetMatrixProcessor(dataset: src.fairreckitlib.data.set.dataset.Dataset, verbose=True)
72    def __init__(self, dataset: Dataset, verbose=True):
73        """Construct the dataset matrix processor.
74
75        Args:
76            dataset: the dataset to create a new user-item matrix for.
77            verbose: whether the processor should give verbose output.
78        """
79        self.dataset = dataset
80        self.verbose = verbose
81        print_event = lambda _, args: get_io_event_print_switch()[args.event_id](args)
82
83        self.event_dispatcher = EventDispatcher()
84        for event_id in get_io_events():
85            self.event_dispatcher.add_listener(event_id, None, (print_event, None))

Construct the dataset matrix processor.

Args: dataset: the dataset to create a new user-item matrix for. verbose: whether the processor should give verbose output.

def run( self, processor_config: src.fairreckitlib.data.set.dataset_matrix.MatrixProcessorConfig, *, chunk_size: int = 10000000.0) -> bool:
 87    def run(self,
 88            processor_config: MatrixProcessorConfig,
 89            *,
 90            chunk_size: int=DEFAULT_MATRIX_CHUNK_SIZE) -> bool:
 91        """Run the processor with the specified matrix configuration.
 92
 93        The processor fails when the user-item matrix is already present in the
 94        existing dataset configuration.
 95
 96        Args:
 97            processor_config: the configuration to use for creating a user-item matrix.
 98            chunk_size: the size of the chunks to use during processing.
 99
100        Raises:
101            KeyError: when the event table does not exist in the dataset.
102            IndexError: when the item key name is not present in the event table.
103
104        Returns:
105            whether the processing of the user-item matrix succeeded.
106        """
107        if not processor_config.event_table_name in self.dataset.config.events:
108            raise KeyError('Event table does not exist: ' + processor_config.event_table_name)
109
110        event_table = self.dataset.config.events[processor_config.event_table_name]
111        item_key = processor_config.item_key
112        if item_key not in event_table.primary_key:
113            raise IndexError('Event table does not have the requested item key')
114
115        matrix_name = 'user-' + item_key.split('_')[0] + '-' + processor_config.rating_column
116        if self.dataset.get_matrix_config(matrix_name) is not None:
117            return False
118
119        # step 1
120        temp_dir = create_dir(
121            os.path.join(self.dataset.data_dir, TABLE_FILE_PREFIX + 'tmp'),
122            self.event_dispatcher
123        )
124
125        if self.verbose:
126            print('Started processing matrix')
127
128        # step 2
129        num_chunks = self.process_event_table(
130            temp_dir,
131            event_table,
132            item_key,
133            processor_config.rating_column,
134            chunk_size=chunk_size
135        )
136
137        # step 3
138        matrix_tuple = self.process_matrix_from_chunks(
139            temp_dir,
140            num_chunks,
141            item_key,
142            processor_config.rating_column
143        )
144
145        # step 4
146        dataset_matrix_config = self.save_matrix(
147            matrix_name,
148            matrix_tuple,
149            item_key,
150            processor_config.rating_column
151        )
152
153        # step 5
154        self.dataset.config.matrices[matrix_name] = dataset_matrix_config
155        save_yml(
156            os.path.join(self.dataset.data_dir, DATASET_CONFIG_FILE),
157            self.dataset.config.to_yml_format()
158        )
159
160        if self.verbose:
161            print('Finished processing matrix')
162
163        # step 6
164        delete_dir(temp_dir, self.event_dispatcher)
165        return True

Run the processor with the specified matrix configuration.

The processor fails when the user-item matrix is already present in the existing dataset configuration.

Args: processor_config: the configuration to use for creating a user-item matrix. chunk_size: the size of the chunks to use during processing.

Raises: KeyError: when the event table does not exist in the dataset. IndexError: when the item key name is not present in the event table.

Returns: whether the processing of the user-item matrix succeeded.

def process_event_table( self, output_dir: str, event_table: src.fairreckitlib.data.set.dataset_config.DatasetTableConfig, item_key: str, rating_column: str, *, chunk_size: int = 10000000.0) -> int:
167    def process_event_table(
168            self,
169            output_dir: str,
170            event_table: DatasetTableConfig,
171            item_key: str,
172            rating_column: str,
173            *,
174            chunk_size: int=DEFAULT_MATRIX_CHUNK_SIZE) -> int:
175        """Process the event table in chunks.
176
177        Args:
178            output_dir: the output directory to store the chunks.
179            event_table: the event table to process into chunks.
180            item_key: the item key name to create a user-item chunk for.
181            rating_column: the name of the rating column to use in the user-item chunk.
182            chunk_size: the size of the chunks to use during processing.
183
184        Returns:
185            the number of chunks that are generated.
186        """
187        if self.verbose:
188            print('Started processing event table')
189
190        num_chunks = 0
191        start_row = 0
192        event_table_it = event_table.read_table(self.dataset.data_dir, chunk_size=chunk_size)
193        for i, chunk in enumerate(event_table_it):
194            end_row = int(min(start_row + chunk_size, event_table.num_records))
195            percent = float(start_row) / float(event_table.num_records) * 100.0
196            if self.verbose:
197                print('Processing rows', start_row, 'to', end_row,
198                      'of', event_table.num_records, '=>', str(percent) + '%')
199
200            chunk = create_matrix_chunk(chunk, item_key, rating_column)
201            # TODO filter chunk based on user/item columns
202            chunk.to_csv(os.path.join(output_dir, 'chunk_' + str(i) + '.tsv'),
203                         sep='\t', header=True, index=False)
204
205            num_chunks += 1
206            start_row += chunk_size
207
208        if self.verbose:
209            print('Finished processing event table')
210
211        return num_chunks

Process the event table in chunks.

Args: output_dir: the output directory to store the chunks. event_table: the event table to process into chunks. item_key: the item key name to create a user-item chunk for. rating_column: the name of the rating column to use in the user-item chunk. chunk_size: the size of the chunks to use during processing.

Returns: the number of chunks that are generated.

def process_matrix_from_chunks( self, output_dir: str, num_chunks: int, item_key: str, rating_column: str) -> Tuple[pandas.core.frame.DataFrame, List[int], List[int]]:
213    def process_matrix_from_chunks(
214            self,
215            output_dir: str,
216            num_chunks: int,
217            item_key: str,
218            rating_column: str) -> Tuple[pd.DataFrame, List[int], List[int]]:
219        """Process the user-item matrix from the stored chunks.
220
221        Args:
222            output_dir: the output directory where the chunks are stored.
223            num_chunks: the number of chunks in the output directory.
224            item_key: the item key name that was used to create user-item chunks.
225            rating_column: the name of the rating column that was used to create user-item chunks.
226
227        Returns:
228            the user-item matrix and the lists of unique user/item ids.
229        """
230        if self.verbose:
231            print('Started processing matrix chunks')
232
233        matrix = pd.DataFrame({
234            'user_id': pd.Series(dtype='int'),
235            item_key: pd.Series(dtype='int'),
236            rating_column: pd.Series(dtype='float')
237        })
238
239        for i in range(num_chunks):
240            if self.verbose:
241                print('Processing matrix chunk', i + 1, 'of', num_chunks)
242
243            chunk = pd.read_table(os.path.join(output_dir, 'chunk_' + str(i) + '.tsv'))
244            matrix = pd.concat([matrix, chunk], ignore_index=True)
245            matrix = matrix.groupby(['user_id', item_key], as_index=False).sum()
246
247        unique_users = matrix['user_id'].unique()
248        # users from 0...num_users
249        matrix = pd.merge(
250            matrix,
251            pd.DataFrame(list(enumerate(unique_users)), columns=['user', 'user_id']),
252            how='left', on='user_id'
253        )
254        matrix.drop('user_id', axis=1, inplace=True)
255
256        unique_items = matrix[item_key].unique()
257        # items from 0...num_items
258        matrix = pd.merge(
259            matrix,
260            pd.DataFrame(list(enumerate(unique_items)), columns=['item', item_key]),
261            how='left', on=item_key
262        )
263        matrix.drop(item_key, axis=1, inplace=True)
264
265        if self.verbose:
266            print('Finished processing matrix chunks')
267
268        return matrix[['user', 'item', rating_column]], list(unique_users), list(unique_items)

Process the user-item matrix from the stored chunks.

Args: output_dir: the output directory where the chunks are stored. num_chunks: the number of chunks in the output directory. item_key: the item key name that was used to create user-item chunks. rating_column: the name of the rating column that was used to create user-item chunks.

Returns: the user-item matrix and the lists of unique user/item ids.

def save_matrix( self, matrix_name: str, matrix_tuple: Tuple[pandas.core.frame.DataFrame, List[int], List[int]], item_key: str, rating_column: str) -> src.fairreckitlib.data.set.dataset_config.DatasetMatrixConfig:
270    def save_matrix(
271            self,
272            matrix_name: str,
273            matrix_tuple: Tuple[pd.DataFrame, List[int], List[int]],
274            item_key: str,
275            rating_column: str) -> DatasetMatrixConfig:
276        """Save the matrix in the dataset directory.
277
278        Args:
279            matrix_name: the name that is used to save the matrix, user and item lists.
280            matrix_tuple: the user-item matrix and the lists of unique user/item ids.
281            item_key: the item key name that was used to create the user-item matrix.
282            rating_column: name of the rating column that was used to create the user-item matrix.
283
284        Returns:
285            the dataset matrix configuration.
286        """
287        if self.verbose:
288            print('Started saving matrix')
289
290        dataset_matrix_name = TABLE_FILE_PREFIX + self.dataset.get_name() + '_' + matrix_name
291        matrix, users, items = matrix_tuple
292
293        # create the user indices config and save the array
294        user_index_config = DatasetIndexConfig(
295            dataset_matrix_name + '_user_indices.hdf5',
296            'user_id',
297            len(users)
298        )
299        user_index_config.save_indices(self.dataset.data_dir, users)
300
301        # create the item indices config and save the array
302        item_index_config = DatasetIndexConfig(
303            dataset_matrix_name + '_item_indices.hdf5',
304            item_key,
305            len(items)
306        )
307        item_index_config.save_indices(self.dataset.data_dir, items)
308
309        # create the sample matrix table config and save the table
310        matrix_table_config = create_dataset_table_config(
311            dataset_matrix_name + '_matrix.tsv.bz2',
312            ['user_id', item_key],
313            ['matrix_' + rating_column],
314            compression='bz2',
315            foreign_keys=['user_id', item_key],
316            num_records=len(matrix)
317        )
318        matrix_table_config.save_table(matrix, self.dataset.data_dir)
319
320        rating_min = float(matrix[rating_column].min())
321        rating_max = float(matrix[rating_column].max())
322        rating_type = DATASET_RATINGS_IMPLICIT \
323            if rating_max > RATING_TYPE_THRESHOLD else DATASET_RATINGS_EXPLICIT
324
325        if self.verbose:
326            print('Finished saving matrix')
327
328        return DatasetMatrixConfig(
329            matrix_table_config,
330            RatingMatrixConfig(
331                rating_min,
332                rating_max,
333                rating_type
334            ),
335            user_index_config,
336            item_index_config
337        )

Save the matrix in the dataset directory.

Args: matrix_name: the name that is used to save the matrix, user and item lists. matrix_tuple: the user-item matrix and the lists of unique user/item ids. item_key: the item key name that was used to create the user-item matrix. rating_column: name of the rating column that was used to create the user-item matrix.

Returns: the dataset matrix configuration.

def create_matrix_chunk( chunk: pandas.core.frame.DataFrame, item_key: str, rating_column: str) -> pandas.core.frame.DataFrame:
def create_matrix_chunk(chunk: pd.DataFrame, item_key: str, rating_column: str) -> pd.DataFrame:
    """Create a user-item matrix chunk by counting occurrences of user-item combinations.

    Args:
        chunk: a dataframe chunk containing the user-item events.
        item_key: the key of the item to use for counting occurrences.
        rating_column: the name of the rating column to store the user-item counter in.

    Returns:
        a dataframe with the columns ['user_id', item_key, rating_column].
    """
    # make a copy to prevent pandas slicing errors and drop any irrelevant columns
    chunk_header = ['user_id', item_key]
    chunk = pd.DataFrame(chunk[chunk_header])

    # add a constant column and count it per group: because the column is never
    # NaN, count() yields the number of events per unique (user, item) pair
    chunk[rating_column] = np.ones(len(chunk))
    chunk = chunk.groupby(chunk_header, as_index=False).count()
    return chunk

Create a user-item matrix chunk by counting occurrences of user-item combinations.

Args: chunk: a dataframe chunk containing the user-item events. item_key: the key of the item to use for counting occurrences. rating_column: the name of the rating column to store the user-item counter in.

Returns: a dataframe with the columns ['user_id', item_key, rating_column].