src.fairreckitlib.data.set.processor.dataset_processor_lfm

This module contains the base processor for LastFM datasets.

Classes:

DatasetProcessorLFM: the base class for LastFM dataset processors.

This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)

  1"""This module contains the base processor for LastFM datasets.
  2
  3Classes:
  4
  5    DatasetProcessorLFM: the base class for LastFM dataset processors.
  6
  7This program has been developed by students from the bachelor Computer Science at
  8Utrecht University within the Software Project course.
  9© Copyright Utrecht University (Department of Information and Computing Sciences)
 10"""
 11
 12from abc import ABCMeta, abstractmethod
 13from typing import Callable, List, Optional, Tuple
 14
 15import pandas as pd
 16
 17from ..dataset_config import DATASET_RATINGS_IMPLICIT, RatingMatrixConfig
 18from ..dataset_config import DatasetIndexConfig, DatasetMatrixConfig, DatasetTableConfig
 19from ..dataset_constants import TABLE_FILE_PREFIX
 20from .dataset_processor_base import DatasetProcessorBase
 21
 22
 23class DatasetProcessorLFM(DatasetProcessorBase, metaclass=ABCMeta):
 24    """DataProcessor base class for LastFM datasets.
 25
 26    Provides an abstraction for processing the listening event table,
 27    and also for generalizing the user table data. An iterative matrix
 28    processor function is exposed for derived subclasses as the LastFM
 29    dataset matrices tend to be very big.
 30
 31    Abstract methods:
 32
 33    create_listening_events_config
 34    create_user_table_config
 35    """
 36
 37    @abstractmethod
 38    def create_listening_events_config(self) -> Optional[DatasetTableConfig]:
 39        """Create the listening event table configuration.
 40
 41        Returns:
 42            the configuration of the listening event table or None when not available.
 43        """
 44        raise NotImplementedError()
 45
 46    @abstractmethod
 47    def create_user_table_config(self) -> DatasetTableConfig:
 48        """Create the user table configuration.
 49
 50        Returns:
 51            the configuration of the user table.
 52        """
 53        raise NotImplementedError()
 54
 55    def get_event_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
 56        """Get event table configuration processors.
 57
 58        Returns:
 59            a list containing the listening event table processor.
 60        """
 61        return [('listening event', self.process_listening_events)]
 62
 63    def get_table_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
 64        """Get table configuration processors.
 65
 66        Derived implementations are expected to call the super implementation in
 67        order to include the user table in the configuration.
 68
 69        Returns:
 70            a list containing the user table processor.
 71        """
 72        return [('user', self.process_user_table)]
 73
 74    def process_listening_events(self) -> Optional[DatasetTableConfig]:
 75        """Process the listening event table.
 76
 77        Returns:
 78            the listening event table configuration or None on failure.
 79        """
 80        les_table_config = self.create_listening_events_config()
 81        # skip without table configuration
 82        if les_table_config is None:
 83            return None
 84
 85        try:
 86            # count records in chunks as these files are huge
 87            table_iterator = les_table_config.read_table(self.dataset_dir, chunk_size=50000000)
 88            for _, table in enumerate(table_iterator):
 89                les_table_config.num_records += len(table)
 90
 91            return les_table_config
 92        except FileNotFoundError:
 93            return None
 94
 95    def process_matrix(
 96            self,
 97            matrix_table_config: DatasetTableConfig,
 98            user_idx_file: str=None,
 99            item_idx_file: str=None) -> Optional[DatasetMatrixConfig]:
100        """Process the matrix with the specified configuration.
101
102        Args:
103            matrix_table_config: the configuration of the matrix to process.
104            user_idx_file: the file name of the user indices or None when not present.
105            item_idx_file: the file name of the item indices or None when not present.
106
107        Returns:
108            the matrix configuration or None on failure.
109        """
110        user_id = matrix_table_config.primary_key[0]
111        item_id = matrix_table_config.primary_key[1]
112        count_column = matrix_table_config.columns[0]
113
114        unique_users = []
115        unique_items = []
116        rating_min = 1000000000.0
117        rating_max = 0.0
118
119        try:
120            matrix_it = matrix_table_config.read_table(self.dataset_dir, chunk_size=50000000)
121            # process matrix in chunks
122            for _, matrix in enumerate(matrix_it):
123                unique_users = pd.Series(unique_users, dtype='int').append(matrix[user_id]).unique()
124                unique_items = pd.Series(unique_items, dtype='int').append(matrix[item_id]).unique()
125                matrix_table_config.num_records += len(matrix)
126                rating_min = min(rating_min, matrix[count_column].min())
127                rating_max = max(rating_max, matrix[count_column].max())
128        except FileNotFoundError:
129            return None
130
131        return DatasetMatrixConfig(
132            matrix_table_config,
133            RatingMatrixConfig(
134                float(rating_min),
135                float(rating_max),
136                DATASET_RATINGS_IMPLICIT
137            ),
138            DatasetIndexConfig(
139                user_idx_file,
140                user_id,
141                len(unique_users)
142            ),
143            DatasetIndexConfig(
144                item_idx_file,
145                item_id,
146                len(unique_items)
147            )
148        )
149
150    def process_user_table(self) -> Optional[DatasetTableConfig]:
151        """Process the user table.
152
153        Changes the contents of the gender column to be more user-friendly,
154        and the contents of the age column to -1 when above 100.
155
156        Returns:
157            the user table configuration or None on failure.
158        """
159        user_table_config = self.create_user_table_config()
160
161        try:
162            user_table = user_table_config.read_table(self.dataset_dir)
163            user_table_config.num_records = len(user_table)
164        except FileNotFoundError:
165            return None
166
167        # convert gender to more user-friendly names
168        user_table['user_gender'].replace({
169            'm': 'Male',
170            'f': 'Female',
171            'n': 'Neutral'
172        }, inplace=True)
173        # convert age above 100 to -1
174        user_table['user_age'].mask(user_table['user_age'].gt(100), inplace=True)
175        user_table['user_age'].fillna(-1.0, inplace=True)
176        user_table['user_age'] = user_table['user_age'].astype(int)
177
178        # update table configuration
179        user_table_config.file.name = TABLE_FILE_PREFIX + self.dataset_name + '_users.tsv.bz2'
180        user_table_config.file.options.compression = 'bz2'
181        user_table_config.file.options.header = False
182        user_table_config.file.options.sep = None
183
184        # store the resulting user table
185        user_table_config.save_table(user_table, self.dataset_dir)
186
187        return user_table_config
 24class DatasetProcessorLFM(DatasetProcessorBase, metaclass=ABCMeta):
 25    """DataProcessor base class for LastFM datasets.
 26
 27    Provides an abstraction for processing the listening event table,
 28    and also for generalizing the user table data. An iterative matrix
 29    processor function is exposed for derived subclasses as the LastFM
 30    dataset matrices tend to be very big.
 31
 32    Abstract methods:
 33
 34    create_listening_events_config
 35    create_user_table_config
 36    """
 37
 38    @abstractmethod
 39    def create_listening_events_config(self) -> Optional[DatasetTableConfig]:
 40        """Create the listening event table configuration.
 41
 42        Returns:
 43            the configuration of the listening event table or None when not available.
 44        """
 45        raise NotImplementedError()
 46
 47    @abstractmethod
 48    def create_user_table_config(self) -> DatasetTableConfig:
 49        """Create the user table configuration.
 50
 51        Returns:
 52            the configuration of the user table.
 53        """
 54        raise NotImplementedError()
 55
 56    def get_event_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
 57        """Get event table configuration processors.
 58
 59        Returns:
 60            a list containing the listening event table processor.
 61        """
 62        return [('listening event', self.process_listening_events)]
 63
 64    def get_table_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
 65        """Get table configuration processors.
 66
 67        Derived implementations are expected to call the super implementation in
 68        order to include the user table in the configuration.
 69
 70        Returns:
 71            a list containing the user table processor.
 72        """
 73        return [('user', self.process_user_table)]
 74
 75    def process_listening_events(self) -> Optional[DatasetTableConfig]:
 76        """Process the listening event table.
 77
 78        Returns:
 79            the listening event table configuration or None on failure.
 80        """
 81        les_table_config = self.create_listening_events_config()
 82        # skip without table configuration
 83        if les_table_config is None:
 84            return None
 85
 86        try:
 87            # count records in chunks as these files are huge
 88            table_iterator = les_table_config.read_table(self.dataset_dir, chunk_size=50000000)
 89            for _, table in enumerate(table_iterator):
 90                les_table_config.num_records += len(table)
 91
 92            return les_table_config
 93        except FileNotFoundError:
 94            return None
 95
 96    def process_matrix(
 97            self,
 98            matrix_table_config: DatasetTableConfig,
 99            user_idx_file: str=None,
100            item_idx_file: str=None) -> Optional[DatasetMatrixConfig]:
101        """Process the matrix with the specified configuration.
102
103        Args:
104            matrix_table_config: the configuration of the matrix to process.
105            user_idx_file: the file name of the user indices or None when not present.
106            item_idx_file: the file name of the item indices or None when not present.
107
108        Returns:
109            the matrix configuration or None on failure.
110        """
111        user_id = matrix_table_config.primary_key[0]
112        item_id = matrix_table_config.primary_key[1]
113        count_column = matrix_table_config.columns[0]
114
115        unique_users = []
116        unique_items = []
117        rating_min = 1000000000.0
118        rating_max = 0.0
119
120        try:
121            matrix_it = matrix_table_config.read_table(self.dataset_dir, chunk_size=50000000)
122            # process matrix in chunks
123            for _, matrix in enumerate(matrix_it):
124                unique_users = pd.Series(unique_users, dtype='int').append(matrix[user_id]).unique()
125                unique_items = pd.Series(unique_items, dtype='int').append(matrix[item_id]).unique()
126                matrix_table_config.num_records += len(matrix)
127                rating_min = min(rating_min, matrix[count_column].min())
128                rating_max = max(rating_max, matrix[count_column].max())
129        except FileNotFoundError:
130            return None
131
132        return DatasetMatrixConfig(
133            matrix_table_config,
134            RatingMatrixConfig(
135                float(rating_min),
136                float(rating_max),
137                DATASET_RATINGS_IMPLICIT
138            ),
139            DatasetIndexConfig(
140                user_idx_file,
141                user_id,
142                len(unique_users)
143            ),
144            DatasetIndexConfig(
145                item_idx_file,
146                item_id,
147                len(unique_items)
148            )
149        )
150
151    def process_user_table(self) -> Optional[DatasetTableConfig]:
152        """Process the user table.
153
154        Changes the contents of the gender column to be more user-friendly,
155        and the contents of the age column to -1 when above 100.
156
157        Returns:
158            the user table configuration or None on failure.
159        """
160        user_table_config = self.create_user_table_config()
161
162        try:
163            user_table = user_table_config.read_table(self.dataset_dir)
164            user_table_config.num_records = len(user_table)
165        except FileNotFoundError:
166            return None
167
168        # convert gender to more user-friendly names
169        user_table['user_gender'].replace({
170            'm': 'Male',
171            'f': 'Female',
172            'n': 'Neutral'
173        }, inplace=True)
174        # convert age above 100 to -1
175        user_table['user_age'].mask(user_table['user_age'].gt(100), inplace=True)
176        user_table['user_age'].fillna(-1.0, inplace=True)
177        user_table['user_age'] = user_table['user_age'].astype(int)
178
179        # update table configuration
180        user_table_config.file.name = TABLE_FILE_PREFIX + self.dataset_name + '_users.tsv.bz2'
181        user_table_config.file.options.compression = 'bz2'
182        user_table_config.file.options.header = False
183        user_table_config.file.options.sep = None
184
185        # store the resulting user table
186        user_table_config.save_table(user_table, self.dataset_dir)
187
188        return user_table_config

DataProcessor base class for LastFM datasets.

Provides an abstraction for processing the listening event table, and also for generalizing the user table data. An iterative matrix processor function is exposed for derived subclasses as the LastFM dataset matrices tend to be very big.

Abstract methods:

create_listening_events_config, create_user_table_config

@abstractmethod
def create_listening_events_config( self) -> Optional[src.fairreckitlib.data.set.dataset_config.DatasetTableConfig]:
@abstractmethod
def create_listening_events_config(self) -> Optional[DatasetTableConfig]:
    """Create the listening event table configuration.

    Returns:
        the configuration of the listening event table, or None when the
        dataset variant does not provide one.
    """
    raise NotImplementedError()

Create the listening event table configuration.

Returns: the configuration of the listening event table or None when not available.

@abstractmethod
def create_user_table_config(self) -> src.fairreckitlib.data.set.dataset_config.DatasetTableConfig:
@abstractmethod
def create_user_table_config(self) -> DatasetTableConfig:
    """Create the user table configuration.

    Returns:
        the configuration of the user table for this dataset variant.
    """
    raise NotImplementedError()

Create the user table configuration.

Returns: the configuration of the user table.

def get_event_configs( self) -> List[Tuple[str, Callable[[], Optional[src.fairreckitlib.data.set.dataset_config.DatasetTableConfig]]]]:
def get_event_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
    """Get event table configuration processors.

    Returns:
        a list with a single (name, processor) pair for the listening event table.
    """
    event_processors = [('listening event', self.process_listening_events)]
    return event_processors

Get event table configuration processors.

Returns: a list containing the listening event table processor.

def get_table_configs( self) -> List[Tuple[str, Callable[[], Optional[src.fairreckitlib.data.set.dataset_config.DatasetTableConfig]]]]:
def get_table_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
    """Get table configuration processors.

    Subclasses that override this method should call the super implementation
    so the user table stays part of the resulting configuration.

    Returns:
        a list with a single (name, processor) pair for the user table.
    """
    table_processors = [('user', self.process_user_table)]
    return table_processors

Get table configuration processors.

Derived implementations are expected to call the super implementation in order to include the user table in the configuration.

Returns: a list containing the user table processor.

def process_listening_events( self) -> Optional[src.fairreckitlib.data.set.dataset_config.DatasetTableConfig]:
def process_listening_events(self) -> Optional[DatasetTableConfig]:
    """Process the listening event table.

    Returns:
        the listening event table configuration or None on failure.
    """
    config = self.create_listening_events_config()
    if config is None:
        # nothing to do when the dataset variant has no event table
        return None

    try:
        # the event files are huge, so tally the record count chunk by chunk
        for chunk in config.read_table(self.dataset_dir, chunk_size=50000000):
            config.num_records += len(chunk)
    except FileNotFoundError:
        return None

    return config

Process the listening event table.

Returns: the listening event table configuration or None on failure.

def process_matrix( self, matrix_table_config: src.fairreckitlib.data.set.dataset_config.DatasetTableConfig, user_idx_file: str = None, item_idx_file: str = None) -> Optional[src.fairreckitlib.data.set.dataset_config.DatasetMatrixConfig]:
 96    def process_matrix(
 97            self,
 98            matrix_table_config: DatasetTableConfig,
 99            user_idx_file: str=None,
100            item_idx_file: str=None) -> Optional[DatasetMatrixConfig]:
101        """Process the matrix with the specified configuration.
102
103        Args:
104            matrix_table_config: the configuration of the matrix to process.
105            user_idx_file: the file name of the user indices or None when not present.
106            item_idx_file: the file name of the item indices or None when not present.
107
108        Returns:
109            the matrix configuration or None on failure.
110        """
111        user_id = matrix_table_config.primary_key[0]
112        item_id = matrix_table_config.primary_key[1]
113        count_column = matrix_table_config.columns[0]
114
115        unique_users = []
116        unique_items = []
117        rating_min = 1000000000.0
118        rating_max = 0.0
119
120        try:
121            matrix_it = matrix_table_config.read_table(self.dataset_dir, chunk_size=50000000)
122            # process matrix in chunks
123            for _, matrix in enumerate(matrix_it):
124                unique_users = pd.Series(unique_users, dtype='int').append(matrix[user_id]).unique()
125                unique_items = pd.Series(unique_items, dtype='int').append(matrix[item_id]).unique()
126                matrix_table_config.num_records += len(matrix)
127                rating_min = min(rating_min, matrix[count_column].min())
128                rating_max = max(rating_max, matrix[count_column].max())
129        except FileNotFoundError:
130            return None
131
132        return DatasetMatrixConfig(
133            matrix_table_config,
134            RatingMatrixConfig(
135                float(rating_min),
136                float(rating_max),
137                DATASET_RATINGS_IMPLICIT
138            ),
139            DatasetIndexConfig(
140                user_idx_file,
141                user_id,
142                len(unique_users)
143            ),
144            DatasetIndexConfig(
145                item_idx_file,
146                item_id,
147                len(unique_items)
148            )
149        )

Process the matrix with the specified configuration.

Args:
- matrix_table_config: the configuration of the matrix to process.
- user_idx_file: the file name of the user indices, or None when not present.
- item_idx_file: the file name of the item indices, or None when not present.

Returns: the matrix configuration or None on failure.

def process_user_table( self) -> Optional[src.fairreckitlib.data.set.dataset_config.DatasetTableConfig]:
def process_user_table(self) -> Optional[DatasetTableConfig]:
    """Process the user table.

    Changes the contents of the gender column to be more user-friendly,
    and the contents of the age column to -1 when above 100.

    Returns:
        the user table configuration or None on failure.
    """
    user_table_config = self.create_user_table_config()

    try:
        user_table = user_table_config.read_table(self.dataset_dir)
        user_table_config.num_records = len(user_table)
    except FileNotFoundError:
        return None

    # convert gender to more user-friendly names; assign back instead of
    # chained-indexing inplace replace, which is deprecated and does not
    # propagate under pandas Copy-on-Write.
    user_table['user_gender'] = user_table['user_gender'].replace({
        'm': 'Male',
        'f': 'Female',
        'n': 'Neutral'
    })
    # convert age above 100 (and missing ages) to -1
    age = user_table['user_age']
    user_table['user_age'] = age.mask(age.gt(100)).fillna(-1.0).astype(int)

    # update table configuration to point at the processed output file
    user_table_config.file.name = TABLE_FILE_PREFIX + self.dataset_name + '_users.tsv.bz2'
    user_table_config.file.options.compression = 'bz2'
    user_table_config.file.options.header = False
    user_table_config.file.options.sep = None

    # store the resulting user table
    user_table_config.save_table(user_table, self.dataset_dir)

    return user_table_config

Process the user table.

Changes the contents of the gender column to be more user-friendly, and the contents of the age column to -1 when above 100.

Returns: the user table configuration or None on failure.