src.fairreckitlib.data.pipeline.data_pipeline

This module contains functionality of the complete data pipeline.

Classes:

DataPipeline: class that performs dataset operations in preparation for the model pipeline.

This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)

"""This module contains functionality of the complete data pipeline.

Classes:

    DataPipeline: class that performs dataset operations in preparation for the model pipeline.

This program has been developed by students from the bachelor Computer Science at
Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

import os
import time
from typing import Callable, Optional, Tuple

import pandas as pd

from ...core.config.config_factories import GroupFactory
from ...core.events.event_dispatcher import EventDispatcher
from ...core.events.event_error import ON_FAILURE_ERROR, ErrorEventArgs
from ...core.io.io_create import create_dir
from ...core.pipeline.core_pipeline import CorePipeline
from ..data_transition import DataTransition
from ..filter.filter_config import DataSubsetConfig
from ..filter.filter_constants import KEY_DATA_SUBSET
from ..filter.filter_event import FilterDataframeEventArgs
from ..filter.filter_passes import filter_from_filter_passes
from ..ratings.convert_config import ConvertConfig
from ..ratings.convert_event import ConvertRatingsEventArgs
from ..ratings.rating_converter_factory import KEY_RATING_CONVERTER
from ..set.dataset import Dataset
# from ..filter.filter_constants import KEY_DATA_FILTERS, deduce_filter_type
from ..split.split_config import SplitConfig
from ..split.split_constants import KEY_SPLITTING, KEY_SPLIT_TEST_RATIO
from ..split.split_event import SplitDataframeEventArgs
from .data_config import DataMatrixConfig
from .data_event import ON_BEGIN_DATA_PIPELINE, ON_END_DATA_PIPELINE, DatasetEventArgs
from .data_event import ON_BEGIN_LOAD_DATASET, ON_END_LOAD_DATASET, DatasetMatrixEventArgs
from .data_event import ON_BEGIN_FILTER_DATASET, ON_END_FILTER_DATASET
from .data_event import ON_BEGIN_CONVERT_RATINGS, ON_END_CONVERT_RATINGS
from .data_event import ON_BEGIN_SPLIT_DATASET, ON_END_SPLIT_DATASET
from .data_event import ON_BEGIN_SAVE_SETS, ON_END_SAVE_SETS, SaveSetsEventArgs
 44
 45class DataPipeline(CorePipeline):
 46    """Data Pipeline to prepare a dataset for a transition to the ModelPipeline(s).
 47
 48    The pipeline is intended to be reused multiple times depending on the specified
 49    datasets. This is not limited to using a dataset only once as they are numbered
 50    internally to distinguish them later.
 51    For each dataset the following steps are performed in order:
 52
 53    1) create output directory.
 54    2) load the dataset into a dataframe.
 55    3) filter rows based on 'user'/'item' columns. (optional)
 56    4) convert 'rating' column. (optional)
 57    5) split the dataframe into a train and test set.
 58    6) save the train and test set in the output directory.
 59
 60    Public methods:
 61
 62    run
 63    """
 64
 65    def __init__(self, data_factory: GroupFactory, event_dispatcher: EventDispatcher):
 66        """Construct the DataPipeline.
 67
 68        Args:
 69            data_factory: the factory with available data modifier factories.
 70            event_dispatcher: used to dispatch data/IO events when running the pipeline.
 71        """
 72        CorePipeline.__init__(self, event_dispatcher)
 73        self.split_datasets = {}
 74        self.data_factory = data_factory
 75
 76    def run(self,
 77            output_dir: str,
 78            dataset: Dataset,
 79            data_config: DataMatrixConfig,
 80            is_running: Callable[[], bool]) -> Optional[DataTransition]:
 81        """Run the entire data pipeline from beginning to end.
 82
 83        Args:
 84            output_dir: the path of the directory to store the output.
 85            dataset: the dataset to run the pipeline on.
 86            data_config: the dataset matrix configurations.
 87            is_running: function that returns whether the pipeline
 88                is still running. Stops early when False is returned.
 89
 90        Raises:
 91            FileNotFoundError: when the dataset matrix file does not exist.
 92            IOError: when the specified output directory does not exist.
 93            RuntimeError: when any data modifiers are not found in their respective factories.
 94
 95        Returns:
 96            the data transition output of the pipeline.
 97        """
 98        if not os.path.isdir(output_dir):
 99            raise IOError('Unknown data output directory')
100
101        self.event_dispatcher.dispatch(DatasetEventArgs(
102            ON_BEGIN_DATA_PIPELINE,
103            dataset.get_name()
104        ))
105
106        start = time.time()
107
108        # step 1
109        data_dir = self.create_data_output_dir(output_dir, data_config)
110
111        # step 2
112        dataframe = self.load_from_dataset(dataset, data_config.matrix)
113        if not is_running():
114            return None
115
116        # step 3
117        dataframe = self.filter_rows(output_dir, dataframe, data_config)
118        if not is_running():
119            return None
120
121        # step 4
122        dataframe = self.convert_ratings(dataset,
123                                         data_config.matrix,
124                                         dataframe,
125                                         data_config.converter)
126        if not is_running():
127            return None
128
129        # step 5
130        train_set, test_set = self.split(dataframe, data_config.splitting)
131        if not is_running():
132            return None
133
134        # step 6
135        train_set_path, test_set_path = self.save_sets(data_dir, train_set, test_set)
136
137        # update data matrix counter
138        self.split_datasets[data_config.get_data_matrix_name()] += 1
139
140        end = time.time()
141
142        self.event_dispatcher.dispatch(DatasetEventArgs(
143            ON_END_DATA_PIPELINE,
144            dataset.get_name()
145        ), elapsed_time=end - start)
146
147        data_output = DataTransition(
148            dataset,
149            data_config.matrix,
150            data_dir,
151            train_set_path,
152            test_set_path,
153            (dataframe['rating'].min(), dataframe['rating'].max())
154        )
155
156        return data_output
157
158    def create_data_output_dir(self, output_dir: str, data_config: DataMatrixConfig) -> str:
159        """Create the data output directory for a dataset.
160
161        Args:
162            output_dir: the path of the directory to store the output.
163            data_config: the dataset matrix configuration to create a directory for.
164
165        Returns:
166            the path of the directory where the output data can be stored.
167        """
168        dataset_matrix_name = data_config.get_data_matrix_name()
169        if not self.split_datasets.get(dataset_matrix_name):
170            self.split_datasets[dataset_matrix_name] = 0
171
172        index = self.split_datasets[dataset_matrix_name]
173
174        data_dir = os.path.join(output_dir, dataset_matrix_name + '_' + str(index))
175        return create_dir(data_dir, self.event_dispatcher)
176
177    def load_from_dataset(self, dataset: Dataset, matrix_name: str) -> pd.DataFrame:
178        """Load in the desired dataset matrix into a dataframe.
179
180        The loaded dataframe contains at least three columns 'user', 'item', 'rating'.
181        In addition, the 'timestamp' column can be present when available in the specified dataset.
182
183        Args:
184            dataset: the dataset to load a matrix dataframe from.
185            matrix_name: the name of the matrix to load from the dataset.
186
187        Raises:
188            FileNotFoundError: when the dataset matrix file does not exist.
189
190        Returns:
191            the dataframe belonging to the specified dataset.
192        """
193        self.event_dispatcher.dispatch(DatasetMatrixEventArgs(
194            ON_BEGIN_LOAD_DATASET,
195            dataset.get_name(),
196            matrix_name,
197            dataset.get_matrix_file_path(matrix_name)
198        ))
199
200        start = time.time()
201
202        try:
203            dataframe = dataset.load_matrix(matrix_name)
204        except FileNotFoundError as err:
205            self.event_dispatcher.dispatch(ErrorEventArgs(
206                ON_FAILURE_ERROR,
207                'Failure: to load dataset matrix ' + dataset.get_name() + '_' + matrix_name
208            ))
209            # raise again so the data run aborts
210            raise err
211
212        end = time.time()
213
214        self.event_dispatcher.dispatch(DatasetMatrixEventArgs(
215            ON_END_LOAD_DATASET,
216            dataset.get_name(),
217            matrix_name,
218            dataset.get_matrix_file_path(matrix_name)
219        ), elapsed_time=end - start)
220
221        return dataframe
222
223    def filter_rows(self,
224                    output_dir: str,
225                    dataframe: pd.DataFrame,
226                    subset: DataSubsetConfig) -> pd.DataFrame:
227        """Apply the specified subset filters to the dataframe.
228
229        The subset is created by applying multiple filter passes to the dataframe individually.
230        These filter passes are then combined to form the resulting dataframe.
231
232        Args:
233            dataframe: the dataframe to filter with at least two columns: 'user', 'item'.
234            subset: the subset to create of to the dataframe.
235
236        Returns:
237            the dataframe with the specified subgroup filters applied to it.
238        """
239        # early exit, because no filtering is needed
240        if len(subset.filter_passes) == 0:
241            return dataframe
242
243        self.event_dispatcher.dispatch(FilterDataframeEventArgs(
244            ON_BEGIN_FILTER_DATASET,
245            subset
246        ))
247
248        start = time.time()
249        filter_factory = self.data_factory.get_factory(KEY_DATA_SUBSET)
250        dataframe = filter_from_filter_passes(self, output_dir, dataframe, subset, filter_factory)
251        end = time.time()
252
253        self.event_dispatcher.dispatch(FilterDataframeEventArgs(
254            ON_END_FILTER_DATASET,
255            subset
256        ), elapsed_time=end - start)
257
258        return dataframe
259
260    def convert_ratings(self,
261                        dataset: Dataset,
262                        matrix_name: str,
263                        dataframe: pd.DataFrame,
264                        convert_config: ConvertConfig) -> pd.DataFrame:
265        """Convert the ratings in the dataframe with the specified rating modifier.
266
267        Args:
268            dataset: the dataset to load the matrix and rating_type from.
269            matrix_name: the name of the dataset matrix.
270            dataframe: the dataframe to convert the ratings of.
271                At the least a 'rating' column is expected to be present.
272            convert_config: the configuration of the converter to apply to the 'rating' column.
273
274        Raises:
275            RuntimeError: when the converter specified by the configuration is not available.
276
277        Returns:
278            the converted dataframe or the input dataframe when no converter is specified.
279        """
280        if convert_config is None:
281            return dataframe
282
283        self.event_dispatcher.dispatch(ConvertRatingsEventArgs(
284            ON_BEGIN_CONVERT_RATINGS,
285            convert_config
286        ))
287
288        start = time.time()
289
290        converter_factory = self.data_factory.get_factory(KEY_RATING_CONVERTER)
291        dataset_converter_factory = converter_factory.get_factory(dataset.get_name())
292        matrix_converter_factory = dataset_converter_factory.get_factory(matrix_name)
293
294        converter = matrix_converter_factory.create(convert_config.name, convert_config.params)
295        if converter is None:
296            self.event_dispatcher.dispatch(ErrorEventArgs(
297                ON_FAILURE_ERROR,
298                'Failure: to get converter from factory: ' + convert_config.name
299            ))
300            # raise error so the data run aborts
301            raise RuntimeError()
302
303        dataframe = converter.run(dataframe)
304
305        end = time.time()
306
307        self.event_dispatcher.dispatch(ConvertRatingsEventArgs(
308            ON_END_CONVERT_RATINGS,
309            convert_config
310        ), elapsed_time=end - start)
311
312        return dataframe
313
314    def split(self,
315              dataframe: pd.DataFrame,
316              split_config: SplitConfig) -> Tuple[pd.DataFrame, pd.DataFrame]:
317        """Split the dataframe into a train and test set.
318
319        This will be split 80/20 (or a similar ratio), and be done either random, or timestamp-wise.
320        The dataframe is expected to have at least three columns: 'user', 'item', 'rating'.
321        In addition, the 'timestamp' column is required for temporal splits.
322
323        Args:
324            dataframe: the dataframe to split into a train and test set.
325            split_config: the dataset splitting configuration.
326
327        Raises:
328            RuntimeError: when the splitter specified by the configuration is not available.
329
330        Returns:
331            the train and test set split of the specified dataframe.
332        """
333        self.event_dispatcher.dispatch(SplitDataframeEventArgs(
334            ON_BEGIN_SPLIT_DATASET,
335            split_config
336        ))
337
338        start = time.time()
339        split_kwargs = {KEY_SPLIT_TEST_RATIO: split_config.test_ratio}
340        split_factory = self.data_factory.get_factory(KEY_SPLITTING)
341        splitter = split_factory.create(split_config.name, split_config.params, **split_kwargs)
342        if splitter is None:
343            self.event_dispatcher.dispatch(ErrorEventArgs(
344                ON_FAILURE_ERROR,
345                'Failure: to get splitter from factory: ' + split_config.name
346            ))
347            # raise error so the data run aborts
348            raise RuntimeError()
349
350        train_set, test_set = splitter.run(dataframe)
351        end = time.time()
352
353        self.event_dispatcher.dispatch(SplitDataframeEventArgs(
354            ON_END_SPLIT_DATASET,
355            split_config
356        ), elapsed_time=end - start)
357
358        return train_set, test_set
359
360    def save_sets(self,
361                  output_dir: str,
362                  train_set: pd.DataFrame,
363                  test_set: pd.DataFrame) -> Tuple[str, str]:
364        """Save the train and test sets to the desired output directory.
365
366        Args:
367            output_dir: the path of the directory to store both sets.
368            train_set: the train set to save with at least three columns: 'user', 'item', 'rating'.
369            test_set: the test set to save with at least three columns: 'user', 'item', 'rating'.
370
371        Returns:
372            the paths where the train and test set are stored.
373        """
374        headers_to_save = ['user', 'item', 'rating']
375
376        train_set = train_set[headers_to_save]
377        test_set = test_set[headers_to_save]
378
379        train_set_path = os.path.join(output_dir, 'train_set.tsv')
380        test_set_path = os.path.join(output_dir, 'test_set.tsv')
381
382        self.event_dispatcher.dispatch(SaveSetsEventArgs(
383            ON_BEGIN_SAVE_SETS,
384            train_set_path,
385            test_set_path
386        ))
387
388        start = time.time()
389        train_set.to_csv(train_set_path, sep='\t', header=False, index=False)
390        test_set.to_csv(test_set_path, sep='\t', header=False, index=False)
391        end = time.time()
392
393        self.event_dispatcher.dispatch(SaveSetsEventArgs(
394            ON_END_SAVE_SETS,
395            train_set_path,
396            test_set_path
397        ), elapsed_time=end - start)
398
399        return train_set_path, test_set_path
 46class DataPipeline(CorePipeline):
 47    """Data Pipeline to prepare a dataset for a transition to the ModelPipeline(s).
 48
 49    The pipeline is intended to be reused multiple times depending on the specified
 50    datasets. This is not limited to using a dataset only once as they are numbered
 51    internally to distinguish them later.
 52    For each dataset the following steps are performed in order:
 53
 54    1) create output directory.
 55    2) load the dataset into a dataframe.
 56    3) filter rows based on 'user'/'item' columns. (optional)
 57    4) convert 'rating' column. (optional)
 58    5) split the dataframe into a train and test set.
 59    6) save the train and test set in the output directory.
 60
 61    Public methods:
 62
 63    run
 64    """
 65
 66    def __init__(self, data_factory: GroupFactory, event_dispatcher: EventDispatcher):
 67        """Construct the DataPipeline.
 68
 69        Args:
 70            data_factory: the factory with available data modifier factories.
 71            event_dispatcher: used to dispatch data/IO events when running the pipeline.
 72        """
 73        CorePipeline.__init__(self, event_dispatcher)
 74        self.split_datasets = {}
 75        self.data_factory = data_factory
 76
 77    def run(self,
 78            output_dir: str,
 79            dataset: Dataset,
 80            data_config: DataMatrixConfig,
 81            is_running: Callable[[], bool]) -> Optional[DataTransition]:
 82        """Run the entire data pipeline from beginning to end.
 83
 84        Args:
 85            output_dir: the path of the directory to store the output.
 86            dataset: the dataset to run the pipeline on.
 87            data_config: the dataset matrix configurations.
 88            is_running: function that returns whether the pipeline
 89                is still running. Stops early when False is returned.
 90
 91        Raises:
 92            FileNotFoundError: when the dataset matrix file does not exist.
 93            IOError: when the specified output directory does not exist.
 94            RuntimeError: when any data modifiers are not found in their respective factories.
 95
 96        Returns:
 97            the data transition output of the pipeline.
 98        """
 99        if not os.path.isdir(output_dir):
100            raise IOError('Unknown data output directory')
101
102        self.event_dispatcher.dispatch(DatasetEventArgs(
103            ON_BEGIN_DATA_PIPELINE,
104            dataset.get_name()
105        ))
106
107        start = time.time()
108
109        # step 1
110        data_dir = self.create_data_output_dir(output_dir, data_config)
111
112        # step 2
113        dataframe = self.load_from_dataset(dataset, data_config.matrix)
114        if not is_running():
115            return None
116
117        # step 3
118        dataframe = self.filter_rows(output_dir, dataframe, data_config)
119        if not is_running():
120            return None
121
122        # step 4
123        dataframe = self.convert_ratings(dataset,
124                                         data_config.matrix,
125                                         dataframe,
126                                         data_config.converter)
127        if not is_running():
128            return None
129
130        # step 5
131        train_set, test_set = self.split(dataframe, data_config.splitting)
132        if not is_running():
133            return None
134
135        # step 6
136        train_set_path, test_set_path = self.save_sets(data_dir, train_set, test_set)
137
138        # update data matrix counter
139        self.split_datasets[data_config.get_data_matrix_name()] += 1
140
141        end = time.time()
142
143        self.event_dispatcher.dispatch(DatasetEventArgs(
144            ON_END_DATA_PIPELINE,
145            dataset.get_name()
146        ), elapsed_time=end - start)
147
148        data_output = DataTransition(
149            dataset,
150            data_config.matrix,
151            data_dir,
152            train_set_path,
153            test_set_path,
154            (dataframe['rating'].min(), dataframe['rating'].max())
155        )
156
157        return data_output
158
159    def create_data_output_dir(self, output_dir: str, data_config: DataMatrixConfig) -> str:
160        """Create the data output directory for a dataset.
161
162        Args:
163            output_dir: the path of the directory to store the output.
164            data_config: the dataset matrix configuration to create a directory for.
165
166        Returns:
167            the path of the directory where the output data can be stored.
168        """
169        dataset_matrix_name = data_config.get_data_matrix_name()
170        if not self.split_datasets.get(dataset_matrix_name):
171            self.split_datasets[dataset_matrix_name] = 0
172
173        index = self.split_datasets[dataset_matrix_name]
174
175        data_dir = os.path.join(output_dir, dataset_matrix_name + '_' + str(index))
176        return create_dir(data_dir, self.event_dispatcher)
177
178    def load_from_dataset(self, dataset: Dataset, matrix_name: str) -> pd.DataFrame:
179        """Load in the desired dataset matrix into a dataframe.
180
181        The loaded dataframe contains at least three columns 'user', 'item', 'rating'.
182        In addition, the 'timestamp' column can be present when available in the specified dataset.
183
184        Args:
185            dataset: the dataset to load a matrix dataframe from.
186            matrix_name: the name of the matrix to load from the dataset.
187
188        Raises:
189            FileNotFoundError: when the dataset matrix file does not exist.
190
191        Returns:
192            the dataframe belonging to the specified dataset.
193        """
194        self.event_dispatcher.dispatch(DatasetMatrixEventArgs(
195            ON_BEGIN_LOAD_DATASET,
196            dataset.get_name(),
197            matrix_name,
198            dataset.get_matrix_file_path(matrix_name)
199        ))
200
201        start = time.time()
202
203        try:
204            dataframe = dataset.load_matrix(matrix_name)
205        except FileNotFoundError as err:
206            self.event_dispatcher.dispatch(ErrorEventArgs(
207                ON_FAILURE_ERROR,
208                'Failure: to load dataset matrix ' + dataset.get_name() + '_' + matrix_name
209            ))
210            # raise again so the data run aborts
211            raise err
212
213        end = time.time()
214
215        self.event_dispatcher.dispatch(DatasetMatrixEventArgs(
216            ON_END_LOAD_DATASET,
217            dataset.get_name(),
218            matrix_name,
219            dataset.get_matrix_file_path(matrix_name)
220        ), elapsed_time=end - start)
221
222        return dataframe
223
224    def filter_rows(self,
225                    output_dir: str,
226                    dataframe: pd.DataFrame,
227                    subset: DataSubsetConfig) -> pd.DataFrame:
228        """Apply the specified subset filters to the dataframe.
229
230        The subset is created by applying multiple filter passes to the dataframe individually.
231        These filter passes are then combined to form the resulting dataframe.
232
233        Args:
234            dataframe: the dataframe to filter with at least two columns: 'user', 'item'.
235            subset: the subset to create of to the dataframe.
236
237        Returns:
238            the dataframe with the specified subgroup filters applied to it.
239        """
240        # early exit, because no filtering is needed
241        if len(subset.filter_passes) == 0:
242            return dataframe
243
244        self.event_dispatcher.dispatch(FilterDataframeEventArgs(
245            ON_BEGIN_FILTER_DATASET,
246            subset
247        ))
248
249        start = time.time()
250        filter_factory = self.data_factory.get_factory(KEY_DATA_SUBSET)
251        dataframe = filter_from_filter_passes(self, output_dir, dataframe, subset, filter_factory)
252        end = time.time()
253
254        self.event_dispatcher.dispatch(FilterDataframeEventArgs(
255            ON_END_FILTER_DATASET,
256            subset
257        ), elapsed_time=end - start)
258
259        return dataframe
260
261    def convert_ratings(self,
262                        dataset: Dataset,
263                        matrix_name: str,
264                        dataframe: pd.DataFrame,
265                        convert_config: ConvertConfig) -> pd.DataFrame:
266        """Convert the ratings in the dataframe with the specified rating modifier.
267
268        Args:
269            dataset: the dataset to load the matrix and rating_type from.
270            matrix_name: the name of the dataset matrix.
271            dataframe: the dataframe to convert the ratings of.
272                At the least a 'rating' column is expected to be present.
273            convert_config: the configuration of the converter to apply to the 'rating' column.
274
275        Raises:
276            RuntimeError: when the converter specified by the configuration is not available.
277
278        Returns:
279            the converted dataframe or the input dataframe when no converter is specified.
280        """
281        if convert_config is None:
282            return dataframe
283
284        self.event_dispatcher.dispatch(ConvertRatingsEventArgs(
285            ON_BEGIN_CONVERT_RATINGS,
286            convert_config
287        ))
288
289        start = time.time()
290
291        converter_factory = self.data_factory.get_factory(KEY_RATING_CONVERTER)
292        dataset_converter_factory = converter_factory.get_factory(dataset.get_name())
293        matrix_converter_factory = dataset_converter_factory.get_factory(matrix_name)
294
295        converter = matrix_converter_factory.create(convert_config.name, convert_config.params)
296        if converter is None:
297            self.event_dispatcher.dispatch(ErrorEventArgs(
298                ON_FAILURE_ERROR,
299                'Failure: to get converter from factory: ' + convert_config.name
300            ))
301            # raise error so the data run aborts
302            raise RuntimeError()
303
304        dataframe = converter.run(dataframe)
305
306        end = time.time()
307
308        self.event_dispatcher.dispatch(ConvertRatingsEventArgs(
309            ON_END_CONVERT_RATINGS,
310            convert_config
311        ), elapsed_time=end - start)
312
313        return dataframe
314
315    def split(self,
316              dataframe: pd.DataFrame,
317              split_config: SplitConfig) -> Tuple[pd.DataFrame, pd.DataFrame]:
318        """Split the dataframe into a train and test set.
319
320        This will be split 80/20 (or a similar ratio), and be done either random, or timestamp-wise.
321        The dataframe is expected to have at least three columns: 'user', 'item', 'rating'.
322        In addition, the 'timestamp' column is required for temporal splits.
323
324        Args:
325            dataframe: the dataframe to split into a train and test set.
326            split_config: the dataset splitting configuration.
327
328        Raises:
329            RuntimeError: when the splitter specified by the configuration is not available.
330
331        Returns:
332            the train and test set split of the specified dataframe.
333        """
334        self.event_dispatcher.dispatch(SplitDataframeEventArgs(
335            ON_BEGIN_SPLIT_DATASET,
336            split_config
337        ))
338
339        start = time.time()
340        split_kwargs = {KEY_SPLIT_TEST_RATIO: split_config.test_ratio}
341        split_factory = self.data_factory.get_factory(KEY_SPLITTING)
342        splitter = split_factory.create(split_config.name, split_config.params, **split_kwargs)
343        if splitter is None:
344            self.event_dispatcher.dispatch(ErrorEventArgs(
345                ON_FAILURE_ERROR,
346                'Failure: to get splitter from factory: ' + split_config.name
347            ))
348            # raise error so the data run aborts
349            raise RuntimeError()
350
351        train_set, test_set = splitter.run(dataframe)
352        end = time.time()
353
354        self.event_dispatcher.dispatch(SplitDataframeEventArgs(
355            ON_END_SPLIT_DATASET,
356            split_config
357        ), elapsed_time=end - start)
358
359        return train_set, test_set
360
361    def save_sets(self,
362                  output_dir: str,
363                  train_set: pd.DataFrame,
364                  test_set: pd.DataFrame) -> Tuple[str, str]:
365        """Save the train and test sets to the desired output directory.
366
367        Args:
368            output_dir: the path of the directory to store both sets.
369            train_set: the train set to save with at least three columns: 'user', 'item', 'rating'.
370            test_set: the test set to save with at least three columns: 'user', 'item', 'rating'.
371
372        Returns:
373            the paths where the train and test set are stored.
374        """
375        headers_to_save = ['user', 'item', 'rating']
376
377        train_set = train_set[headers_to_save]
378        test_set = test_set[headers_to_save]
379
380        train_set_path = os.path.join(output_dir, 'train_set.tsv')
381        test_set_path = os.path.join(output_dir, 'test_set.tsv')
382
383        self.event_dispatcher.dispatch(SaveSetsEventArgs(
384            ON_BEGIN_SAVE_SETS,
385            train_set_path,
386            test_set_path
387        ))
388
389        start = time.time()
390        train_set.to_csv(train_set_path, sep='\t', header=False, index=False)
391        test_set.to_csv(test_set_path, sep='\t', header=False, index=False)
392        end = time.time()
393
394        self.event_dispatcher.dispatch(SaveSetsEventArgs(
395            ON_END_SAVE_SETS,
396            train_set_path,
397            test_set_path
398        ), elapsed_time=end - start)
399
400        return train_set_path, test_set_path

Data Pipeline to prepare a dataset for a transition to the ModelPipeline(s).

The pipeline is intended to be reused multiple times depending on the specified datasets. This is not limited to using a dataset only once as they are numbered internally to distinguish them later. For each dataset the following steps are performed in order:

1) create output directory. 2) load the dataset into a dataframe. 3) filter rows based on 'user'/'item' columns. (optional) 4) convert 'rating' column. (optional) 5) split the dataframe into a train and test set. 6) save the train and test set in the output directory.

Public methods:

run

DataPipeline( data_factory: src.fairreckitlib.core.config.config_factories.GroupFactory, event_dispatcher: src.fairreckitlib.core.events.event_dispatcher.EventDispatcher)
    def __init__(self, data_factory: GroupFactory, event_dispatcher: EventDispatcher):
        """Construct the DataPipeline.

        Args:
            data_factory: the factory with available data modifier factories.
            event_dispatcher: used to dispatch data/IO events when running the pipeline.
        """
        CorePipeline.__init__(self, event_dispatcher)
        self.split_datasets = {}
        self.data_factory = data_factory

Construct the DataPipeline.

Args: data_factory: the factory with available data modifier factories. event_dispatcher: used to dispatch data/IO events when running the pipeline.

def run( self, output_dir: str, dataset: src.fairreckitlib.data.set.dataset.Dataset, data_config: src.fairreckitlib.data.pipeline.data_config.DataMatrixConfig, is_running: Callable[[], bool]) -> Optional[src.fairreckitlib.data.data_transition.DataTransition]:
    def run(self,
            output_dir: str,
            dataset: Dataset,
            data_config: DataMatrixConfig,
            is_running: Callable[[], bool]) -> Optional[DataTransition]:
        """Run the entire data pipeline from beginning to end.

        Args:
            output_dir: the path of the directory to store the output.
            dataset: the dataset to run the pipeline on.
            data_config: the dataset matrix configurations.
            is_running: function that returns whether the pipeline
                is still running. Stops early when False is returned.

        Raises:
            FileNotFoundError: when the dataset matrix file does not exist.
            IOError: when the specified output directory does not exist.
            RuntimeError: when any data modifiers are not found in their respective factories.

        Returns:
            the data transition output of the pipeline, or None when
            is_running() reported False during execution.
        """
        if not os.path.isdir(output_dir):
            raise IOError('Unknown data output directory')

        self.event_dispatcher.dispatch(DatasetEventArgs(
            ON_BEGIN_DATA_PIPELINE,
            dataset.get_name()
        ))

        start = time.time()

        # step 1: create the numbered output directory for this dataset matrix
        data_dir = self.create_data_output_dir(output_dir, data_config)

        # step 2: load the dataset matrix into a dataframe
        dataframe = self.load_from_dataset(dataset, data_config.matrix)
        if not is_running():
            return None

        # step 3: (optional) filter rows on the 'user'/'item' columns
        # NOTE(review): filter output is written relative to output_dir,
        # not the per-run data_dir — confirm this is intended
        dataframe = self.filter_rows(output_dir, dataframe, data_config)
        if not is_running():
            return None

        # step 4: (optional) convert the 'rating' column
        dataframe = self.convert_ratings(dataset,
                                         data_config.matrix,
                                         dataframe,
                                         data_config.converter)
        if not is_running():
            return None

        # step 5: split the dataframe into a train and test set
        train_set, test_set = self.split(dataframe, data_config.splitting)
        if not is_running():
            return None

        # step 6: save the train and test set in the output directory
        train_set_path, test_set_path = self.save_sets(data_dir, train_set, test_set)

        # update the data matrix counter (initialised in create_data_output_dir)
        # so a later run on the same matrix gets a distinct directory
        self.split_datasets[data_config.get_data_matrix_name()] += 1

        end = time.time()

        self.event_dispatcher.dispatch(DatasetEventArgs(
            ON_END_DATA_PIPELINE,
            dataset.get_name()
        ), elapsed_time=end - start)

        data_output = DataTransition(
            dataset,
            data_config.matrix,
            data_dir,
            train_set_path,
            test_set_path,
            # rating range of the (filtered/converted) dataframe before splitting
            (dataframe['rating'].min(), dataframe['rating'].max())
        )

        return data_output

Run the entire data pipeline from beginning to end.

Args: output_dir: the path of the directory to store the output. dataset: the dataset to run the pipeline on. data_config: the dataset matrix configurations. is_running: function that returns whether the pipeline is still running. Stops early when False is returned.

Raises: FileNotFoundError: when the dataset matrix file does not exist. IOError: when the specified output directory does not exist. RuntimeError: when any data modifiers are not found in their respective factories.

Returns: the data transition output of the pipeline.

def create_data_output_dir( self, output_dir: str, data_config: src.fairreckitlib.data.pipeline.data_config.DataMatrixConfig) -> str:
159    def create_data_output_dir(self, output_dir: str, data_config: DataMatrixConfig) -> str:
160        """Create the data output directory for a dataset.
161
162        Args:
163            output_dir: the path of the directory to store the output.
164            data_config: the dataset matrix configuration to create a directory for.
165
166        Returns:
167            the path of the directory where the output data can be stored.
168        """
169        dataset_matrix_name = data_config.get_data_matrix_name()
170        if not self.split_datasets.get(dataset_matrix_name):
171            self.split_datasets[dataset_matrix_name] = 0
172
173        index = self.split_datasets[dataset_matrix_name]
174
175        data_dir = os.path.join(output_dir, dataset_matrix_name + '_' + str(index))
176        return create_dir(data_dir, self.event_dispatcher)

Create the data output directory for a dataset.

Args: output_dir: the path of the directory to store the output. data_config: the dataset matrix configuration to create a directory for.

Returns: the path of the directory where the output data can be stored.

def load_from_dataset( self, dataset: src.fairreckitlib.data.set.dataset.Dataset, matrix_name: str) -> pandas.core.frame.DataFrame:
178    def load_from_dataset(self, dataset: Dataset, matrix_name: str) -> pd.DataFrame:
179        """Load in the desired dataset matrix into a dataframe.
180
181        The loaded dataframe contains at least three columns 'user', 'item', 'rating'.
182        In addition, the 'timestamp' column can be present when available in the specified dataset.
183
184        Args:
185            dataset: the dataset to load a matrix dataframe from.
186            matrix_name: the name of the matrix to load from the dataset.
187
188        Raises:
189            FileNotFoundError: when the dataset matrix file does not exist.
190
191        Returns:
192            the dataframe belonging to the specified dataset.
193        """
194        self.event_dispatcher.dispatch(DatasetMatrixEventArgs(
195            ON_BEGIN_LOAD_DATASET,
196            dataset.get_name(),
197            matrix_name,
198            dataset.get_matrix_file_path(matrix_name)
199        ))
200
201        start = time.time()
202
203        try:
204            dataframe = dataset.load_matrix(matrix_name)
205        except FileNotFoundError as err:
206            self.event_dispatcher.dispatch(ErrorEventArgs(
207                ON_FAILURE_ERROR,
208                'Failure: to load dataset matrix ' + dataset.get_name() + '_' + matrix_name
209            ))
210            # raise again so the data run aborts
211            raise err
212
213        end = time.time()
214
215        self.event_dispatcher.dispatch(DatasetMatrixEventArgs(
216            ON_END_LOAD_DATASET,
217            dataset.get_name(),
218            matrix_name,
219            dataset.get_matrix_file_path(matrix_name)
220        ), elapsed_time=end - start)
221
222        return dataframe

Load in the desired dataset matrix into a dataframe.

The loaded dataframe contains at least three columns 'user', 'item', 'rating'. In addition, the 'timestamp' column can be present when available in the specified dataset.

Args: dataset: the dataset to load a matrix dataframe from. matrix_name: the name of the matrix to load from the dataset.

Raises: FileNotFoundError: when the dataset matrix file does not exist.

Returns: the dataframe belonging to the specified dataset.

def filter_rows( self, output_dir: str, dataframe: pandas.core.frame.DataFrame, subset: src.fairreckitlib.data.filter.filter_config.DataSubsetConfig) -> pandas.core.frame.DataFrame:
224    def filter_rows(self,
225                    output_dir: str,
226                    dataframe: pd.DataFrame,
227                    subset: DataSubsetConfig) -> pd.DataFrame:
228        """Apply the specified subset filters to the dataframe.
229
230        The subset is created by applying multiple filter passes to the dataframe individually.
231        These filter passes are then combined to form the resulting dataframe.
232
233        Args:
234            dataframe: the dataframe to filter with at least two columns: 'user', 'item'.
235            subset: the subset to create of to the dataframe.
236
237        Returns:
238            the dataframe with the specified subgroup filters applied to it.
239        """
240        # early exit, because no filtering is needed
241        if len(subset.filter_passes) == 0:
242            return dataframe
243
244        self.event_dispatcher.dispatch(FilterDataframeEventArgs(
245            ON_BEGIN_FILTER_DATASET,
246            subset
247        ))
248
249        start = time.time()
250        filter_factory = self.data_factory.get_factory(KEY_DATA_SUBSET)
251        dataframe = filter_from_filter_passes(self, output_dir, dataframe, subset, filter_factory)
252        end = time.time()
253
254        self.event_dispatcher.dispatch(FilterDataframeEventArgs(
255            ON_END_FILTER_DATASET,
256            subset
257        ), elapsed_time=end - start)
258
259        return dataframe

Apply the specified subset filters to the dataframe.

The subset is created by applying multiple filter passes to the dataframe individually. These filter passes are then combined to form the resulting dataframe.

Args: dataframe: the dataframe to filter with at least two columns: 'user', 'item'. subset: the subset of the dataframe to create.

Returns: the dataframe with the specified subgroup filters applied to it.

def convert_ratings( self, dataset: src.fairreckitlib.data.set.dataset.Dataset, matrix_name: str, dataframe: pandas.core.frame.DataFrame, convert_config: src.fairreckitlib.data.ratings.convert_config.ConvertConfig) -> pandas.core.frame.DataFrame:
261    def convert_ratings(self,
262                        dataset: Dataset,
263                        matrix_name: str,
264                        dataframe: pd.DataFrame,
265                        convert_config: ConvertConfig) -> pd.DataFrame:
266        """Convert the ratings in the dataframe with the specified rating modifier.
267
268        Args:
269            dataset: the dataset to load the matrix and rating_type from.
270            matrix_name: the name of the dataset matrix.
271            dataframe: the dataframe to convert the ratings of.
272                At the least a 'rating' column is expected to be present.
273            convert_config: the configuration of the converter to apply to the 'rating' column.
274
275        Raises:
276            RuntimeError: when the converter specified by the configuration is not available.
277
278        Returns:
279            the converted dataframe or the input dataframe when no converter is specified.
280        """
281        if convert_config is None:
282            return dataframe
283
284        self.event_dispatcher.dispatch(ConvertRatingsEventArgs(
285            ON_BEGIN_CONVERT_RATINGS,
286            convert_config
287        ))
288
289        start = time.time()
290
291        converter_factory = self.data_factory.get_factory(KEY_RATING_CONVERTER)
292        dataset_converter_factory = converter_factory.get_factory(dataset.get_name())
293        matrix_converter_factory = dataset_converter_factory.get_factory(matrix_name)
294
295        converter = matrix_converter_factory.create(convert_config.name, convert_config.params)
296        if converter is None:
297            self.event_dispatcher.dispatch(ErrorEventArgs(
298                ON_FAILURE_ERROR,
299                'Failure: to get converter from factory: ' + convert_config.name
300            ))
301            # raise error so the data run aborts
302            raise RuntimeError()
303
304        dataframe = converter.run(dataframe)
305
306        end = time.time()
307
308        self.event_dispatcher.dispatch(ConvertRatingsEventArgs(
309            ON_END_CONVERT_RATINGS,
310            convert_config
311        ), elapsed_time=end - start)
312
313        return dataframe

Convert the ratings in the dataframe with the specified rating modifier.

Args: dataset: the dataset to load the matrix and rating_type from. matrix_name: the name of the dataset matrix. dataframe: the dataframe to convert the ratings of. At the least a 'rating' column is expected to be present. convert_config: the configuration of the converter to apply to the 'rating' column.

Raises: RuntimeError: when the converter specified by the configuration is not available.

Returns: the converted dataframe or the input dataframe when no converter is specified.

def split( self, dataframe: pandas.core.frame.DataFrame, split_config: src.fairreckitlib.data.split.split_config.SplitConfig) -> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]:
315    def split(self,
316              dataframe: pd.DataFrame,
317              split_config: SplitConfig) -> Tuple[pd.DataFrame, pd.DataFrame]:
318        """Split the dataframe into a train and test set.
319
320        This will be split 80/20 (or a similar ratio), and be done either random, or timestamp-wise.
321        The dataframe is expected to have at least three columns: 'user', 'item', 'rating'.
322        In addition, the 'timestamp' column is required for temporal splits.
323
324        Args:
325            dataframe: the dataframe to split into a train and test set.
326            split_config: the dataset splitting configuration.
327
328        Raises:
329            RuntimeError: when the splitter specified by the configuration is not available.
330
331        Returns:
332            the train and test set split of the specified dataframe.
333        """
334        self.event_dispatcher.dispatch(SplitDataframeEventArgs(
335            ON_BEGIN_SPLIT_DATASET,
336            split_config
337        ))
338
339        start = time.time()
340        split_kwargs = {KEY_SPLIT_TEST_RATIO: split_config.test_ratio}
341        split_factory = self.data_factory.get_factory(KEY_SPLITTING)
342        splitter = split_factory.create(split_config.name, split_config.params, **split_kwargs)
343        if splitter is None:
344            self.event_dispatcher.dispatch(ErrorEventArgs(
345                ON_FAILURE_ERROR,
346                'Failure: to get splitter from factory: ' + split_config.name
347            ))
348            # raise error so the data run aborts
349            raise RuntimeError()
350
351        train_set, test_set = splitter.run(dataframe)
352        end = time.time()
353
354        self.event_dispatcher.dispatch(SplitDataframeEventArgs(
355            ON_END_SPLIT_DATASET,
356            split_config
357        ), elapsed_time=end - start)
358
359        return train_set, test_set

Split the dataframe into a train and test set.

This will be split 80/20 (or a similar ratio), and be done either random, or timestamp-wise. The dataframe is expected to have at least three columns: 'user', 'item', 'rating'. In addition, the 'timestamp' column is required for temporal splits.

Args: dataframe: the dataframe to split into a train and test set. split_config: the dataset splitting configuration.

Raises: RuntimeError: when the splitter specified by the configuration is not available.

Returns: the train and test set split of the specified dataframe.

def save_sets( self, output_dir: str, train_set: pandas.core.frame.DataFrame, test_set: pandas.core.frame.DataFrame) -> Tuple[str, str]:
361    def save_sets(self,
362                  output_dir: str,
363                  train_set: pd.DataFrame,
364                  test_set: pd.DataFrame) -> Tuple[str, str]:
365        """Save the train and test sets to the desired output directory.
366
367        Args:
368            output_dir: the path of the directory to store both sets.
369            train_set: the train set to save with at least three columns: 'user', 'item', 'rating'.
370            test_set: the test set to save with at least three columns: 'user', 'item', 'rating'.
371
372        Returns:
373            the paths where the train and test set are stored.
374        """
375        headers_to_save = ['user', 'item', 'rating']
376
377        train_set = train_set[headers_to_save]
378        test_set = test_set[headers_to_save]
379
380        train_set_path = os.path.join(output_dir, 'train_set.tsv')
381        test_set_path = os.path.join(output_dir, 'test_set.tsv')
382
383        self.event_dispatcher.dispatch(SaveSetsEventArgs(
384            ON_BEGIN_SAVE_SETS,
385            train_set_path,
386            test_set_path
387        ))
388
389        start = time.time()
390        train_set.to_csv(train_set_path, sep='\t', header=False, index=False)
391        test_set.to_csv(test_set_path, sep='\t', header=False, index=False)
392        end = time.time()
393
394        self.event_dispatcher.dispatch(SaveSetsEventArgs(
395            ON_END_SAVE_SETS,
396            train_set_path,
397            test_set_path
398        ), elapsed_time=end - start)
399
400        return train_set_path, test_set_path

Save the train and test sets to the desired output directory.

Args: output_dir: the path of the directory to store both sets. train_set: the train set to save with at least three columns: 'user', 'item', 'rating'. test_set: the test set to save with at least three columns: 'user', 'item', 'rating'.

Returns: the paths where the train and test set are stored.