src.fairreckitlib.data.set.processor.dataset_processor_lfm
This module contains the base processor for LastFM datasets.
Classes:
DatasetProcessorLFM: the base class for LastFM dataset processors.
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)
"""This module contains the base processor for LastFM datasets.

Classes:

    DatasetProcessorLFM: the base class for LastFM dataset processors.

This program has been developed by students from the bachelor Computer Science at
Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

from abc import ABCMeta, abstractmethod
from typing import Callable, List, Optional, Tuple

import pandas as pd

from ..dataset_config import DATASET_RATINGS_IMPLICIT, RatingMatrixConfig
from ..dataset_config import DatasetIndexConfig, DatasetMatrixConfig, DatasetTableConfig
from ..dataset_constants import TABLE_FILE_PREFIX
from .dataset_processor_base import DatasetProcessorBase


class DatasetProcessorLFM(DatasetProcessorBase, metaclass=ABCMeta):
    """DataProcessor base class for LastFM datasets.

    Provides an abstraction for processing the listening event table,
    and also for generalizing the user table data. An iterative matrix
    processor function is exposed for derived subclasses as the LastFM
    dataset matrices tend to be very big.

    Abstract methods:

    create_listening_events_config
    create_user_table_config
    """

    # number of rows per chunk when reading the (very large) LastFM tables
    _CHUNK_SIZE = 50000000

    @abstractmethod
    def create_listening_events_config(self) -> Optional[DatasetTableConfig]:
        """Create the listening event table configuration.

        Returns:
            the configuration of the listening event table or None when not available.
        """
        raise NotImplementedError()

    @abstractmethod
    def create_user_table_config(self) -> DatasetTableConfig:
        """Create the user table configuration.

        Returns:
            the configuration of the user table.
        """
        raise NotImplementedError()

    def get_event_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
        """Get event table configuration processors.

        Returns:
            a list containing the listening event table processor.
        """
        return [('listening event', self.process_listening_events)]

    def get_table_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
        """Get table configuration processors.

        Derived implementations are expected to call the super implementation in
        order to include the user table in the configuration.

        Returns:
            a list containing the user table processor.
        """
        return [('user', self.process_user_table)]

    def process_listening_events(self) -> Optional[DatasetTableConfig]:
        """Process the listening event table.

        Returns:
            the listening event table configuration or None on failure.
        """
        les_table_config = self.create_listening_events_config()
        # skip without table configuration
        if les_table_config is None:
            return None

        try:
            # count records in chunks as these files are huge
            table_iterator = les_table_config.read_table(
                self.dataset_dir, chunk_size=self._CHUNK_SIZE)
            for table in table_iterator:
                les_table_config.num_records += len(table)

            return les_table_config
        except FileNotFoundError:
            return None

    def process_matrix(
            self,
            matrix_table_config: DatasetTableConfig,
            user_idx_file: Optional[str]=None,
            item_idx_file: Optional[str]=None) -> Optional[DatasetMatrixConfig]:
        """Process the matrix with the specified configuration.

        Args:
            matrix_table_config: the configuration of the matrix to process.
            user_idx_file: the file name of the user indices or None when not present.
            item_idx_file: the file name of the item indices or None when not present.

        Returns:
            the matrix configuration or None on failure.
        """
        user_id = matrix_table_config.primary_key[0]
        item_id = matrix_table_config.primary_key[1]
        count_column = matrix_table_config.columns[0]

        # plain sets replace the removed pd.Series.append API and avoid
        # re-uniquing the whole accumulated id list on every chunk
        unique_users = set()
        unique_items = set()
        # +/- infinity seeds never clamp real ratings (1e9 previously could)
        rating_min = float('inf')
        rating_max = float('-inf')

        try:
            matrix_it = matrix_table_config.read_table(
                self.dataset_dir, chunk_size=self._CHUNK_SIZE)
            # process matrix in chunks as these files are huge
            for matrix in matrix_it:
                unique_users.update(matrix[user_id].unique())
                unique_items.update(matrix[item_id].unique())
                matrix_table_config.num_records += len(matrix)
                rating_min = min(rating_min, matrix[count_column].min())
                rating_max = max(rating_max, matrix[count_column].max())
        except FileNotFoundError:
            return None

        return DatasetMatrixConfig(
            matrix_table_config,
            RatingMatrixConfig(
                float(rating_min),
                float(rating_max),
                DATASET_RATINGS_IMPLICIT
            ),
            DatasetIndexConfig(
                user_idx_file,
                user_id,
                len(unique_users)
            ),
            DatasetIndexConfig(
                item_idx_file,
                item_id,
                len(unique_items)
            )
        )

    def process_user_table(self) -> Optional[DatasetTableConfig]:
        """Process the user table.

        Changes the contents of the gender column to be more user-friendly,
        and the contents of the age column to -1 when above 100.

        Returns:
            the user table configuration or None on failure.
        """
        user_table_config = self.create_user_table_config()

        try:
            user_table = user_table_config.read_table(self.dataset_dir)
            user_table_config.num_records = len(user_table)
        except FileNotFoundError:
            return None

        # convert gender to more user-friendly names
        # (assignment instead of deprecated chained inplace=True ops)
        user_table['user_gender'] = user_table['user_gender'].replace({
            'm': 'Male',
            'f': 'Female',
            'n': 'Neutral'
        })
        # convert age above 100 (or missing) to -1
        user_table['user_age'] = (
            user_table['user_age']
            .mask(user_table['user_age'].gt(100))
            .fillna(-1.0)
            .astype(int)
        )

        # update table configuration
        user_table_config.file.name = TABLE_FILE_PREFIX + self.dataset_name + '_users.tsv.bz2'
        user_table_config.file.options.compression = 'bz2'
        user_table_config.file.options.header = False
        user_table_config.file.options.sep = None

        # store the resulting user table
        user_table_config.save_table(user_table, self.dataset_dir)

        return user_table_config
class DatasetProcessorLFM(DatasetProcessorBase, metaclass=ABCMeta):
    """DataProcessor base class for LastFM datasets.

    Provides an abstraction for processing the listening event table,
    and also for generalizing the user table data. An iterative matrix
    processor function is exposed for derived subclasses as the LastFM
    dataset matrices tend to be very big.

    Abstract methods:

    create_listening_events_config
    create_user_table_config
    """

    # number of rows per chunk when reading the (very large) LastFM tables
    _CHUNK_SIZE = 50000000

    @abstractmethod
    def create_listening_events_config(self) -> Optional[DatasetTableConfig]:
        """Create the listening event table configuration.

        Returns:
            the configuration of the listening event table or None when not available.
        """
        raise NotImplementedError()

    @abstractmethod
    def create_user_table_config(self) -> DatasetTableConfig:
        """Create the user table configuration.

        Returns:
            the configuration of the user table.
        """
        raise NotImplementedError()

    def get_event_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
        """Get event table configuration processors.

        Returns:
            a list containing the listening event table processor.
        """
        return [('listening event', self.process_listening_events)]

    def get_table_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
        """Get table configuration processors.

        Derived implementations are expected to call the super implementation in
        order to include the user table in the configuration.

        Returns:
            a list containing the user table processor.
        """
        return [('user', self.process_user_table)]

    def process_listening_events(self) -> Optional[DatasetTableConfig]:
        """Process the listening event table.

        Returns:
            the listening event table configuration or None on failure.
        """
        les_table_config = self.create_listening_events_config()
        # skip without table configuration
        if les_table_config is None:
            return None

        try:
            # count records in chunks as these files are huge
            table_iterator = les_table_config.read_table(
                self.dataset_dir, chunk_size=self._CHUNK_SIZE)
            for table in table_iterator:
                les_table_config.num_records += len(table)

            return les_table_config
        except FileNotFoundError:
            return None

    def process_matrix(
            self,
            matrix_table_config: DatasetTableConfig,
            user_idx_file: Optional[str]=None,
            item_idx_file: Optional[str]=None) -> Optional[DatasetMatrixConfig]:
        """Process the matrix with the specified configuration.

        Args:
            matrix_table_config: the configuration of the matrix to process.
            user_idx_file: the file name of the user indices or None when not present.
            item_idx_file: the file name of the item indices or None when not present.

        Returns:
            the matrix configuration or None on failure.
        """
        user_id = matrix_table_config.primary_key[0]
        item_id = matrix_table_config.primary_key[1]
        count_column = matrix_table_config.columns[0]

        # plain sets replace the removed pd.Series.append API and avoid
        # re-uniquing the whole accumulated id list on every chunk
        unique_users = set()
        unique_items = set()
        # +/- infinity seeds never clamp real ratings (1e9 previously could)
        rating_min = float('inf')
        rating_max = float('-inf')

        try:
            matrix_it = matrix_table_config.read_table(
                self.dataset_dir, chunk_size=self._CHUNK_SIZE)
            # process matrix in chunks as these files are huge
            for matrix in matrix_it:
                unique_users.update(matrix[user_id].unique())
                unique_items.update(matrix[item_id].unique())
                matrix_table_config.num_records += len(matrix)
                rating_min = min(rating_min, matrix[count_column].min())
                rating_max = max(rating_max, matrix[count_column].max())
        except FileNotFoundError:
            return None

        return DatasetMatrixConfig(
            matrix_table_config,
            RatingMatrixConfig(
                float(rating_min),
                float(rating_max),
                DATASET_RATINGS_IMPLICIT
            ),
            DatasetIndexConfig(
                user_idx_file,
                user_id,
                len(unique_users)
            ),
            DatasetIndexConfig(
                item_idx_file,
                item_id,
                len(unique_items)
            )
        )

    def process_user_table(self) -> Optional[DatasetTableConfig]:
        """Process the user table.

        Changes the contents of the gender column to be more user-friendly,
        and the contents of the age column to -1 when above 100.

        Returns:
            the user table configuration or None on failure.
        """
        user_table_config = self.create_user_table_config()

        try:
            user_table = user_table_config.read_table(self.dataset_dir)
            user_table_config.num_records = len(user_table)
        except FileNotFoundError:
            return None

        # convert gender to more user-friendly names
        # (assignment instead of deprecated chained inplace=True ops)
        user_table['user_gender'] = user_table['user_gender'].replace({
            'm': 'Male',
            'f': 'Female',
            'n': 'Neutral'
        })
        # convert age above 100 (or missing) to -1
        user_table['user_age'] = (
            user_table['user_age']
            .mask(user_table['user_age'].gt(100))
            .fillna(-1.0)
            .astype(int)
        )

        # update table configuration
        user_table_config.file.name = TABLE_FILE_PREFIX + self.dataset_name + '_users.tsv.bz2'
        user_table_config.file.options.compression = 'bz2'
        user_table_config.file.options.header = False
        user_table_config.file.options.sep = None

        # store the resulting user table
        user_table_config.save_table(user_table, self.dataset_dir)

        return user_table_config
DataProcessor base class for LastFM datasets.
Provides an abstraction for processing the listening event table, and also for generalizing the user table data. An iterative matrix processor function is exposed for derived subclasses as the LastFM dataset matrices tend to be very big.
Abstract methods:
create_listening_events_config create_user_table_config
@abstractmethod
def create_listening_events_config(self) -> Optional[DatasetTableConfig]:
    """Create the table configuration for the listening events.

    Returns:
        the listening event table configuration, or None when it is not available.
    """
    raise NotImplementedError()
Create the listening event table configuration.
Returns: the configuration of the listening event table or None when not available.
@abstractmethod
def create_user_table_config(self) -> DatasetTableConfig:
    """Create the table configuration for the users.

    Returns:
        the user table configuration.
    """
    raise NotImplementedError()
Create the user table configuration.
Returns: the configuration of the user table.
def get_event_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
    """Get the event table configuration processors.

    Returns:
        a list with a single (name, processor) pair for the listening event table.
    """
    event_processors = [('listening event', self.process_listening_events)]
    return event_processors
Get event table configuration processors.
Returns: a list containing the listening event table processor.
def get_table_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
    """Get the table configuration processors.

    Derived implementations are expected to call the super implementation in
    order to include the user table in the configuration.

    Returns:
        a list with a single (name, processor) pair for the user table.
    """
    table_processors = [('user', self.process_user_table)]
    return table_processors
Get table configuration processors.
Derived implementations are expected to call the super implementation in order to include the user table in the configuration.
Returns: a list containing the user table processor.
def process_listening_events(self) -> Optional[DatasetTableConfig]:
    """Process the listening event table.

    Counts the total number of records by reading the table in chunks,
    as the LastFM listening event files are very large.

    Returns:
        the listening event table configuration or None on failure.
    """
    les_table_config = self.create_listening_events_config()
    # skip without table configuration
    if les_table_config is None:
        return None

    try:
        # count records in chunks as these files are huge
        table_iterator = les_table_config.read_table(self.dataset_dir, chunk_size=50000000)
        # direct iteration; the previous enumerate() index was never used
        for table in table_iterator:
            les_table_config.num_records += len(table)

        return les_table_config
    except FileNotFoundError:
        return None
Process the listening event table.
Returns: the listening event table configuration or None on failure.
def process_matrix(
        self,
        matrix_table_config: DatasetTableConfig,
        user_idx_file: Optional[str]=None,
        item_idx_file: Optional[str]=None) -> Optional[DatasetMatrixConfig]:
    """Process the matrix with the specified configuration.

    Args:
        matrix_table_config: the configuration of the matrix to process.
        user_idx_file: the file name of the user indices or None when not present.
        item_idx_file: the file name of the item indices or None when not present.

    Returns:
        the matrix configuration or None on failure.
    """
    user_id = matrix_table_config.primary_key[0]
    item_id = matrix_table_config.primary_key[1]
    count_column = matrix_table_config.columns[0]

    # plain sets replace pd.Series.append (deprecated in pandas 1.4 and
    # removed in 2.0) and avoid re-uniquing the accumulated ids per chunk
    unique_users = set()
    unique_items = set()
    # +/- infinity seeds never clamp real ratings (the old 1e9 seed could)
    rating_min = float('inf')
    rating_max = float('-inf')

    try:
        matrix_it = matrix_table_config.read_table(self.dataset_dir, chunk_size=50000000)
        # process matrix in chunks as these files are huge
        for matrix in matrix_it:
            unique_users.update(matrix[user_id].unique())
            unique_items.update(matrix[item_id].unique())
            matrix_table_config.num_records += len(matrix)
            rating_min = min(rating_min, matrix[count_column].min())
            rating_max = max(rating_max, matrix[count_column].max())
    except FileNotFoundError:
        return None

    return DatasetMatrixConfig(
        matrix_table_config,
        RatingMatrixConfig(
            float(rating_min),
            float(rating_max),
            DATASET_RATINGS_IMPLICIT
        ),
        DatasetIndexConfig(
            user_idx_file,
            user_id,
            len(unique_users)
        ),
        DatasetIndexConfig(
            item_idx_file,
            item_id,
            len(unique_items)
        )
    )
Process the matrix with the specified configuration.
Args: matrix_table_config: the configuration of the matrix to process. user_idx_file: the file name of the user indices or None when not present. item_idx_file: the file name of the item indices or None when not present.
Returns: the matrix configuration or None on failure.
def process_user_table(self) -> Optional[DatasetTableConfig]:
    """Process the user table.

    Changes the contents of the gender column to be more user-friendly,
    and the contents of the age column to -1 when above 100 (or missing).

    Returns:
        the user table configuration or None on failure.
    """
    user_table_config = self.create_user_table_config()

    try:
        user_table = user_table_config.read_table(self.dataset_dir)
        user_table_config.num_records = len(user_table)
    except FileNotFoundError:
        return None

    # convert gender to more user-friendly names; plain assignment instead
    # of the deprecated chained inplace=True operations on a column
    user_table['user_gender'] = user_table['user_gender'].replace({
        'm': 'Male',
        'f': 'Female',
        'n': 'Neutral'
    })
    # convert age above 100 (or missing) to -1
    user_table['user_age'] = (
        user_table['user_age']
        .mask(user_table['user_age'].gt(100))
        .fillna(-1.0)
        .astype(int)
    )

    # update table configuration
    user_table_config.file.name = TABLE_FILE_PREFIX + self.dataset_name + '_users.tsv.bz2'
    user_table_config.file.options.compression = 'bz2'
    user_table_config.file.options.header = False
    user_table_config.file.options.sep = None

    # store the resulting user table
    user_table_config.save_table(user_table, self.dataset_dir)

    return user_table_config
Process the user table.
Changes the contents of the gender column to be more user-friendly, and the contents of the age column to -1 when above 100.
Returns: the user table configuration or None on failure.