src.fairreckitlib.data.set.processor.dataset_processor_lfm2b

This module contains the class to process the LastFM-2B dataset.

Classes:

DatasetProcessorLFM2B: data processor implementation for the LFM-2B dataset.

This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)

  1"""This modules contains the class to process the LastFM-2B dataset.
  2
  3Classes:
  4
  5    DatasetProcessorLFM2B: data processor implementation for the LFM-2B dataset.
  6
  7This program has been developed by students from the bachelor Computer Science at
  8Utrecht University within the Software Project course.
  9© Copyright Utrecht University (Department of Information and Computing Sciences)
 10"""
 11
 12import os
 13from typing import Callable, List, Optional, Tuple
 14
 15import pandas as pd
 16
 17from ..dataset_config import DatasetMatrixConfig, DatasetTableConfig, create_dataset_table_config
 18from ..dataset_constants import TABLE_FILE_PREFIX
 19from .dataset_processor_lfm import DatasetProcessorLFM
 20
 21
 22class DatasetProcessorLFM2B(DatasetProcessorLFM):
 23    """DatasetProcessor for the LastFM-2B dataset.
 24
 25    The dataset can be downloaded from the website below.
 26    http://www.cp.jku.at/datasets/LFM-2b/
 27
 28    Note that the compressed bz2 files can be used directly.
 29    The processor handles the following files:
 30
 31    albums.tsv.bz2 (optional)
 32    artists.tsv.bz2 (optional)
 33    listening-counts.tsv.bz2 (required)
 34    listening-events.tsv.bz2 (required)
 35    spotify-uris.tsv.bz2 (optional)
 36    tracks.tsv.bz2 (optional)
 37    users.tsv.bz2 (optional)
 38    user_artist_playcount.tsv (required)
 39    """
 40
 41    def create_listening_events_config(self) -> Optional[DatasetTableConfig]:
 42        """Create the listening event table configuration.
 43
 44        Returns:
 45            the configuration of the listening event table.
 46        """
 47        return create_dataset_table_config(
 48            'listening-events.tsv.bz2',
 49            ['user_id', 'track_id', 'album_id'],
 50            ['timestamp'],
 51            compression='bz2',
 52            header=True
 53        )
 54
 55    def create_user_table_config(self) -> DatasetTableConfig:
 56        """Create the user table configuration.
 57
 58        Returns:
 59            the configuration of the user table.
 60        """
 61        return create_dataset_table_config(
 62            'users.tsv.bz2',
 63            ['user_id'],
 64            ['user_country', 'user_age', 'user_gender', 'user_creation time'],
 65            header=True,
 66            compression='bz2'
 67        )
 68
 69    def get_matrix_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetMatrixConfig]]]]:
 70        """Get matrix configuration processors.
 71
 72        Returns:
 73            a list containing the user-artist-count and user-track-count matrix processors.
 74        """
 75        return [
 76            ('user-artist-count', self.process_user_artist_matrix),
 77            ('user-track-count', self.process_user_track_matrix)
 78        ]
 79
 80    def get_table_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
 81        """Get table configuration processors.
 82
 83        Returns:
 84            a list containing the album, artist, spotify, track and user table processors.
 85        """
 86        return DatasetProcessorLFM.get_table_configs(self) + [
 87            ('album', self.process_album_table),
 88            ('artist', self.process_artist_table),
 89            ('spotify', self.process_spotify_table),
 90            ('track', self.process_track_table)
 91        ]
 92
 93    def process_album_table(self) -> Optional[DatasetTableConfig]:
 94        r"""Process the album table.
 95
 96        The original file does not load correctly with pandas when splitting on
 97        newlines \n and \t tabs.
 98
 99        Returns:
100            the album table configuration or None on failure.
101        """
102        try:
103            file_name, num_records = self.process_corrupt_table('albums')
104        except FileNotFoundError:
105            return None
106
107        return create_dataset_table_config(
108            file_name,
109            ['album_id'],
110            ['album_name', 'artist_name'],
111            compression='bz2',
112            num_records=num_records
113        )
114
115    def process_artist_table(self) -> Optional[DatasetTableConfig]:
116        """Process the artist table.
117
118        Returns:
119            the artist table configuration or None on failure.
120        """
121        artist_table_config =  create_dataset_table_config(
122            'artists.tsv.bz2',
123            ['artist_id'],
124            ['artist_name'],
125            header=True,
126            compression='bz2'
127        )
128
129        try:
130            artist_table = artist_table_config.read_table(self.dataset_dir)
131            artist_table_config.num_records = len(artist_table)
132            return artist_table_config
133        except FileNotFoundError:
134            return None
135
136    def process_corrupt_table(self, table_name: str) -> Tuple[str, int]:
137        """Process a corrupt table that does not load correctly with pandas.
138
139        Loading with the 'python-fwf' engine does not have issues, however the
140        row values need to be manually split.
141        """
142        table_iterator = pd.read_table(
143            os.path.join(self.dataset_dir, table_name + '.tsv.bz2'),
144            header=0,
145            encoding='utf-8',
146            engine='python-fwf',
147            names=['fwf'],
148            iterator=True,
149            chunksize=1000000
150        )
151
152        file_name = TABLE_FILE_PREFIX + self.dataset_name + '_' + table_name + '.tsv.bz2'
153        file_path = os.path.join(self.dataset_dir, file_name)
154        # remove existing file when present
155        if os.path.isfile(file_path):
156            os.remove(file_path)
157
158        num_records = 0
159        # process in chunks as splitting manually uses a lot of memory
160        for _, dataframe in enumerate(table_iterator):
161            dataframe = dataframe['fwf'].str.split('\t', expand=True)
162            dataframe.to_csv(
163                file_path,
164                mode='a',
165                sep='\t',
166                index=False,
167                header=False,
168                compression='bz2'
169            )
170            num_records += len(dataframe)
171
172        return file_name, num_records
173
174    def process_spotify_table(self) -> Optional[DatasetTableConfig]:
175        """Process the spotify table.
176
177        Returns:
178            the spotify table configuration or None on failure.
179        """
180        spotify_table_config =  create_dataset_table_config(
181            'spotify-uris.tsv.bz2',
182            ['track_id'],
183            ['track_spotify-uri'],
184            header=True,
185            compression='bz2'
186        )
187
188        try:
189            spotify_table = spotify_table_config.read_table(self.dataset_dir)
190            spotify_table_config.num_records = len(spotify_table)
191            return spotify_table_config
192        except FileNotFoundError:
193            return None
194
195    def process_track_table(self) -> Optional[DatasetTableConfig]:
196        r"""Process the track table.
197
198        The original file does not load correctly with pandas when splitting on
199        newlines \n and \t tabs.
200
201        Returns:
202            the track table configuration or None on failure.
203        """
204        try:
205            file_name, num_records = self.process_corrupt_table('tracks')
206        except FileNotFoundError:
207            return None
208
209        return create_dataset_table_config(
210            file_name,
211            ['track_id'],
212            ['artist_name', 'track_name'],
213            compression='bz2',
214            num_records=num_records
215        )
216
217    def process_user_artist_matrix(self) -> Optional[DatasetMatrixConfig]:
218        """Process the user-artist-count matrix.
219
220        Returns:
221            the matrix configuration or None on failure.
222        """
223        return self.process_matrix(create_dataset_table_config(
224            'user_artist_playcount.tsv',
225            ['user_id', 'artist_id'],
226            ['matrix_count'],
227            foreign_keys=['user_id', 'artist_id']
228        ))
229
230    def process_user_track_matrix(self) -> Optional[DatasetMatrixConfig]:
231        """Process the user-track-count matrix.
232
233        Returns:
234            the matrix configuration or None on failure.
235        """
236        return self.process_matrix(create_dataset_table_config(
237            'listening-counts.tsv.bz2',
238            ['user_id', 'track_id'],
239            ['matrix_count'],
240            foreign_keys=['user_id', 'track_id'],
241            compression='bz2',
242            header=True
243        ))
 23class DatasetProcessorLFM2B(DatasetProcessorLFM):
 24    """DatasetProcessor for the LastFM-2B dataset.
 25
 26    The dataset can be downloaded from the website below.
 27    http://www.cp.jku.at/datasets/LFM-2b/
 28
 29    Note that the compressed bz2 files can be used directly.
 30    The processor handles the following files:
 31
 32    albums.tsv.bz2 (optional)
 33    artists.tsv.bz2 (optional)
 34    listening-counts.tsv.bz2 (required)
 35    listening-events.tsv.bz2 (required)
 36    spotify-uris.tsv.bz2 (optional)
 37    tracks.tsv.bz2 (optional)
 38    users.tsv.bz2 (optional)
 39    user_artist_playcount.tsv (required)
 40    """
 41
 42    def create_listening_events_config(self) -> Optional[DatasetTableConfig]:
 43        """Create the listening event table configuration.
 44
 45        Returns:
 46            the configuration of the listening event table.
 47        """
 48        return create_dataset_table_config(
 49            'listening-events.tsv.bz2',
 50            ['user_id', 'track_id', 'album_id'],
 51            ['timestamp'],
 52            compression='bz2',
 53            header=True
 54        )
 55
 56    def create_user_table_config(self) -> DatasetTableConfig:
 57        """Create the user table configuration.
 58
 59        Returns:
 60            the configuration of the user table.
 61        """
 62        return create_dataset_table_config(
 63            'users.tsv.bz2',
 64            ['user_id'],
 65            ['user_country', 'user_age', 'user_gender', 'user_creation time'],
 66            header=True,
 67            compression='bz2'
 68        )
 69
 70    def get_matrix_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetMatrixConfig]]]]:
 71        """Get matrix configuration processors.
 72
 73        Returns:
 74            a list containing the user-artist-count and user-track-count matrix processors.
 75        """
 76        return [
 77            ('user-artist-count', self.process_user_artist_matrix),
 78            ('user-track-count', self.process_user_track_matrix)
 79        ]
 80
 81    def get_table_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
 82        """Get table configuration processors.
 83
 84        Returns:
 85            a list containing the album, artist, spotify, track and user table processors.
 86        """
 87        return DatasetProcessorLFM.get_table_configs(self) + [
 88            ('album', self.process_album_table),
 89            ('artist', self.process_artist_table),
 90            ('spotify', self.process_spotify_table),
 91            ('track', self.process_track_table)
 92        ]
 93
 94    def process_album_table(self) -> Optional[DatasetTableConfig]:
 95        r"""Process the album table.
 96
 97        The original file does not load correctly with pandas when splitting on
 98        newlines \n and \t tabs.
 99
100        Returns:
101            the album table configuration or None on failure.
102        """
103        try:
104            file_name, num_records = self.process_corrupt_table('albums')
105        except FileNotFoundError:
106            return None
107
108        return create_dataset_table_config(
109            file_name,
110            ['album_id'],
111            ['album_name', 'artist_name'],
112            compression='bz2',
113            num_records=num_records
114        )
115
116    def process_artist_table(self) -> Optional[DatasetTableConfig]:
117        """Process the artist table.
118
119        Returns:
120            the artist table configuration or None on failure.
121        """
122        artist_table_config =  create_dataset_table_config(
123            'artists.tsv.bz2',
124            ['artist_id'],
125            ['artist_name'],
126            header=True,
127            compression='bz2'
128        )
129
130        try:
131            artist_table = artist_table_config.read_table(self.dataset_dir)
132            artist_table_config.num_records = len(artist_table)
133            return artist_table_config
134        except FileNotFoundError:
135            return None
136
137    def process_corrupt_table(self, table_name: str) -> Tuple[str, int]:
138        """Process a corrupt table that does not load correctly with pandas.
139
140        Loading with the 'python-fwf' engine does not have issues, however the
141        row values need to be manually split.
142        """
143        table_iterator = pd.read_table(
144            os.path.join(self.dataset_dir, table_name + '.tsv.bz2'),
145            header=0,
146            encoding='utf-8',
147            engine='python-fwf',
148            names=['fwf'],
149            iterator=True,
150            chunksize=1000000
151        )
152
153        file_name = TABLE_FILE_PREFIX + self.dataset_name + '_' + table_name + '.tsv.bz2'
154        file_path = os.path.join(self.dataset_dir, file_name)
155        # remove existing file when present
156        if os.path.isfile(file_path):
157            os.remove(file_path)
158
159        num_records = 0
160        # process in chunks as splitting manually uses a lot of memory
161        for _, dataframe in enumerate(table_iterator):
162            dataframe = dataframe['fwf'].str.split('\t', expand=True)
163            dataframe.to_csv(
164                file_path,
165                mode='a',
166                sep='\t',
167                index=False,
168                header=False,
169                compression='bz2'
170            )
171            num_records += len(dataframe)
172
173        return file_name, num_records
174
175    def process_spotify_table(self) -> Optional[DatasetTableConfig]:
176        """Process the spotify table.
177
178        Returns:
179            the spotify table configuration or None on failure.
180        """
181        spotify_table_config =  create_dataset_table_config(
182            'spotify-uris.tsv.bz2',
183            ['track_id'],
184            ['track_spotify-uri'],
185            header=True,
186            compression='bz2'
187        )
188
189        try:
190            spotify_table = spotify_table_config.read_table(self.dataset_dir)
191            spotify_table_config.num_records = len(spotify_table)
192            return spotify_table_config
193        except FileNotFoundError:
194            return None
195
196    def process_track_table(self) -> Optional[DatasetTableConfig]:
197        r"""Process the track table.
198
199        The original file does not load correctly with pandas when splitting on
200        newlines \n and \t tabs.
201
202        Returns:
203            the track table configuration or None on failure.
204        """
205        try:
206            file_name, num_records = self.process_corrupt_table('tracks')
207        except FileNotFoundError:
208            return None
209
210        return create_dataset_table_config(
211            file_name,
212            ['track_id'],
213            ['artist_name', 'track_name'],
214            compression='bz2',
215            num_records=num_records
216        )
217
218    def process_user_artist_matrix(self) -> Optional[DatasetMatrixConfig]:
219        """Process the user-artist-count matrix.
220
221        Returns:
222            the matrix configuration or None on failure.
223        """
224        return self.process_matrix(create_dataset_table_config(
225            'user_artist_playcount.tsv',
226            ['user_id', 'artist_id'],
227            ['matrix_count'],
228            foreign_keys=['user_id', 'artist_id']
229        ))
230
231    def process_user_track_matrix(self) -> Optional[DatasetMatrixConfig]:
232        """Process the user-track-count matrix.
233
234        Returns:
235            the matrix configuration or None on failure.
236        """
237        return self.process_matrix(create_dataset_table_config(
238            'listening-counts.tsv.bz2',
239            ['user_id', 'track_id'],
240            ['matrix_count'],
241            foreign_keys=['user_id', 'track_id'],
242            compression='bz2',
243            header=True
244        ))

DatasetProcessor for the LastFM-2B dataset.

The dataset can be downloaded from the website below. http://www.cp.jku.at/datasets/LFM-2b/

Note that the compressed bz2 files can be used directly. The processor handles the following files:

albums.tsv.bz2 (optional) artists.tsv.bz2 (optional) listening-counts.tsv.bz2 (required) listening-events.tsv.bz2 (required) spotify-uris.tsv.bz2 (optional) tracks.tsv.bz2 (optional) users.tsv.bz2 (optional) user_artist_playcount.tsv (required)

def create_listening_events_config(self) -> Optional[src.fairreckitlib.data.set.dataset_config.DatasetTableConfig]:
42    def create_listening_events_config(self) -> Optional[DatasetTableConfig]:
43        """Create the listening event table configuration.
44
45        Returns:
46            the configuration of the listening event table.
47        """
48        return create_dataset_table_config(
49            'listening-events.tsv.bz2',
50            ['user_id', 'track_id', 'album_id'],
51            ['timestamp'],
52            compression='bz2',
53            header=True
54        )

Create the listening event table configuration.

Returns: the configuration of the listening event table.

def create_user_table_config(self) -> src.fairreckitlib.data.set.dataset_config.DatasetTableConfig:
56    def create_user_table_config(self) -> DatasetTableConfig:
57        """Create the user table configuration.
58
59        Returns:
60            the configuration of the user table.
61        """
62        return create_dataset_table_config(
63            'users.tsv.bz2',
64            ['user_id'],
65            ['user_country', 'user_age', 'user_gender', 'user_creation time'],
66            header=True,
67            compression='bz2'
68        )

Create the user table configuration.

Returns: the configuration of the user table.

def get_matrix_configs( self) -> List[Tuple[str, Callable[[], Optional[src.fairreckitlib.data.set.dataset_config.DatasetMatrixConfig]]]]:
70    def get_matrix_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetMatrixConfig]]]]:
71        """Get matrix configuration processors.
72
73        Returns:
74            a list containing the user-artist-count and user-track-count matrix processors.
75        """
76        return [
77            ('user-artist-count', self.process_user_artist_matrix),
78            ('user-track-count', self.process_user_track_matrix)
79        ]

Get matrix configuration processors.

Returns: a list containing the user-artist-count and user-track-count matrix processors.

def get_table_configs( self) -> List[Tuple[str, Callable[[], Optional[src.fairreckitlib.data.set.dataset_config.DatasetTableConfig]]]]:
81    def get_table_configs(self) -> List[Tuple[str, Callable[[], Optional[DatasetTableConfig]]]]:
82        """Get table configuration processors.
83
84        Returns:
85            a list containing the album, artist, spotify, track and user table processors.
86        """
87        return DatasetProcessorLFM.get_table_configs(self) + [
88            ('album', self.process_album_table),
89            ('artist', self.process_artist_table),
90            ('spotify', self.process_spotify_table),
91            ('track', self.process_track_table)
92        ]

Get table configuration processors.

Returns: a list containing the album, artist, spotify, track and user table processors.

def process_album_table( self) -> Optional[src.fairreckitlib.data.set.dataset_config.DatasetTableConfig]:
 94    def process_album_table(self) -> Optional[DatasetTableConfig]:
 95        r"""Process the album table.
 96
 97        The original file does not load correctly with pandas when splitting on
 98        newlines \n and \t tabs.
 99
100        Returns:
101            the album table configuration or None on failure.
102        """
103        try:
104            file_name, num_records = self.process_corrupt_table('albums')
105        except FileNotFoundError:
106            return None
107
108        return create_dataset_table_config(
109            file_name,
110            ['album_id'],
111            ['album_name', 'artist_name'],
112            compression='bz2',
113            num_records=num_records
114        )

Process the album table.

The original file does not load correctly with pandas when splitting on newlines (\n) and tabs (\t).

Returns: the album table configuration or None on failure.

def process_artist_table( self) -> Optional[src.fairreckitlib.data.set.dataset_config.DatasetTableConfig]:
116    def process_artist_table(self) -> Optional[DatasetTableConfig]:
117        """Process the artist table.
118
119        Returns:
120            the artist table configuration or None on failure.
121        """
122        artist_table_config =  create_dataset_table_config(
123            'artists.tsv.bz2',
124            ['artist_id'],
125            ['artist_name'],
126            header=True,
127            compression='bz2'
128        )
129
130        try:
131            artist_table = artist_table_config.read_table(self.dataset_dir)
132            artist_table_config.num_records = len(artist_table)
133            return artist_table_config
134        except FileNotFoundError:
135            return None

Process the artist table.

Returns: the artist table configuration or None on failure.

def process_corrupt_table(self, table_name: str) -> Tuple[str, int]:
137    def process_corrupt_table(self, table_name: str) -> Tuple[str, int]:
138        """Process a corrupt table that does not load correctly with pandas.
139
140        Loading with the 'python-fwf' engine does not have issues, however the
141        row values need to be manually split.
142        """
143        table_iterator = pd.read_table(
144            os.path.join(self.dataset_dir, table_name + '.tsv.bz2'),
145            header=0,
146            encoding='utf-8',
147            engine='python-fwf',
148            names=['fwf'],
149            iterator=True,
150            chunksize=1000000
151        )
152
153        file_name = TABLE_FILE_PREFIX + self.dataset_name + '_' + table_name + '.tsv.bz2'
154        file_path = os.path.join(self.dataset_dir, file_name)
155        # remove existing file when present
156        if os.path.isfile(file_path):
157            os.remove(file_path)
158
159        num_records = 0
160        # process in chunks as splitting manually uses a lot of memory
161        for _, dataframe in enumerate(table_iterator):
162            dataframe = dataframe['fwf'].str.split('\t', expand=True)
163            dataframe.to_csv(
164                file_path,
165                mode='a',
166                sep='\t',
167                index=False,
168                header=False,
169                compression='bz2'
170            )
171            num_records += len(dataframe)
172
173        return file_name, num_records

Process a corrupt table that does not load correctly with pandas.

Loading with the 'python-fwf' engine does not have issues, however the row values need to be manually split.

def process_spotify_table( self) -> Optional[src.fairreckitlib.data.set.dataset_config.DatasetTableConfig]:
175    def process_spotify_table(self) -> Optional[DatasetTableConfig]:
176        """Process the spotify table.
177
178        Returns:
179            the spotify table configuration or None on failure.
180        """
181        spotify_table_config =  create_dataset_table_config(
182            'spotify-uris.tsv.bz2',
183            ['track_id'],
184            ['track_spotify-uri'],
185            header=True,
186            compression='bz2'
187        )
188
189        try:
190            spotify_table = spotify_table_config.read_table(self.dataset_dir)
191            spotify_table_config.num_records = len(spotify_table)
192            return spotify_table_config
193        except FileNotFoundError:
194            return None

Process the spotify table.

Returns: the spotify table configuration or None on failure.

def process_track_table( self) -> Optional[src.fairreckitlib.data.set.dataset_config.DatasetTableConfig]:
196    def process_track_table(self) -> Optional[DatasetTableConfig]:
197        r"""Process the track table.
198
199        The original file does not load correctly with pandas when splitting on
200        newlines \n and \t tabs.
201
202        Returns:
203            the track table configuration or None on failure.
204        """
205        try:
206            file_name, num_records = self.process_corrupt_table('tracks')
207        except FileNotFoundError:
208            return None
209
210        return create_dataset_table_config(
211            file_name,
212            ['track_id'],
213            ['artist_name', 'track_name'],
214            compression='bz2',
215            num_records=num_records
216        )

Process the track table.

The original file does not load correctly with pandas when splitting on newlines \n and \t tabs.

Returns: the track table configuration or None on failure.

def process_user_artist_matrix( self) -> Optional[src.fairreckitlib.data.set.dataset_config.DatasetMatrixConfig]:
218    def process_user_artist_matrix(self) -> Optional[DatasetMatrixConfig]:
219        """Process the user-artist-count matrix.
220
221        Returns:
222            the matrix configuration or None on failure.
223        """
224        return self.process_matrix(create_dataset_table_config(
225            'user_artist_playcount.tsv',
226            ['user_id', 'artist_id'],
227            ['matrix_count'],
228            foreign_keys=['user_id', 'artist_id']
229        ))

Process the user-artist-count matrix.

Returns: the matrix configuration or None on failure.

def process_user_track_matrix( self) -> Optional[src.fairreckitlib.data.set.dataset_config.DatasetMatrixConfig]:
231    def process_user_track_matrix(self) -> Optional[DatasetMatrixConfig]:
232        """Process the user-track-count matrix.
233
234        Returns:
235            the matrix configuration or None on failure.
236        """
237        return self.process_matrix(create_dataset_table_config(
238            'listening-counts.tsv.bz2',
239            ['user_id', 'track_id'],
240            ['matrix_count'],
241            foreign_keys=['user_id', 'track_id'],
242            compression='bz2',
243            header=True
244        ))

Process the user-track-count matrix.

Returns: the matrix configuration or None on failure.