src.fairreckitlib.core.pipeline.core_pipeline

This module contains the base core pipeline class.

Classes:

CorePipeline: base pipeline class with shared IO functionality.

This program has been developed by students from the Computer Science bachelor's programme at Utrecht University as part of the Software Project course.

© Copyright Utrecht University (Department of Information and Computing Sciences)

  1"""This module contains the base core pipeline class.
  2
  3Classes:
  4
  5    CorePipeline: base pipeline class with shared IO functionality.
  6
  7This program has been developed by students from the bachelor Computer Science at
  8Utrecht University within the Software Project course.
  9© Copyright Utrecht University (Department of Information and Computing Sciences)
 10"""
 11
 12import time
 13
 14import pandas as pd
 15
 16from ..events.event_dispatcher import EventDispatcher
 17from ..events.event_error import ON_RAISE_ERROR, ErrorEventArgs
 18from ..io.event_io import ON_CREATE_FILE, DataframeEventArgs, FileEventArgs
 19
 20
 21class CorePipeline:
 22    """Base class for FairRecKit pipelines.
 23
 24    This class exposes some reusable functionality that can be used in derived
 25    pipelines to read and/or write dataframes.
 26    """
 27
 28    def __init__(self, event_dispatcher: EventDispatcher):
 29        """Construct the CorePipeline.
 30
 31        Args:
 32            event_dispatcher: used to dispatch events when running the core pipeline.
 33        """
 34        self.event_dispatcher = event_dispatcher
 35
 36    def read_dataframe(
 37            self,
 38            dataframe_path: str,
 39            dataframe_name: str,
 40            event_id_on_begin: str,
 41            event_id_on_end: str,
 42            *,
 43            names=None) -> pd.DataFrame:
 44        """Read a dataframe from the disk.
 45
 46        This function dispatches an error event when the FileNotFoundError is raised,
 47        and thereafter the error is raised once more.
 48
 49        Args:
 50            dataframe_path: path to the dataframe file.
 51            dataframe_name: name of the dataframe to use for event dispatching.
 52            event_id_on_begin: the event_id to dispatch when loading starts.
 53            event_id_on_end: the event_id to dispatch when loading is finished.
 54            names: the column names of the dataframe or None to infer them from the header.
 55
 56        Raises:
 57            FileNotFoundError: when the dataframe file is not found.
 58
 59        Returns:
 60            the loaded dataframe.
 61        """
 62        self.event_dispatcher.dispatch(DataframeEventArgs(
 63            event_id_on_begin,
 64            dataframe_path,
 65            dataframe_name
 66        ))
 67
 68        start = time.time()
 69
 70        try:
 71            dataframe = pd.read_csv(
 72                dataframe_path,
 73                sep='\t',
 74                header='infer' if names is None else None,
 75                names=names
 76            )
 77        except FileNotFoundError as err:
 78            self.event_dispatcher.dispatch(ErrorEventArgs(
 79                ON_RAISE_ERROR,
 80                'FileNotFoundError: raised while trying to load the ' +
 81                dataframe_name + ' from ' + dataframe_path
 82            ))
 83            raise err
 84
 85        end = time.time()
 86
 87        self.event_dispatcher.dispatch(DataframeEventArgs(
 88            event_id_on_end,
 89            dataframe_path,
 90            dataframe_name
 91        ), elapsed_time=end - start)
 92
 93        return dataframe
 94
 95    def write_dataframe(
 96            self,
 97            dataframe_path: str,
 98            dataframe: pd.DataFrame,
 99            header: bool) -> None:
100        """Write a dataframe to the disk.
101
102        This function is intended to write (append) a dataframe in chunks,
103        including the header of the dataframe.
104        It is assumed that when the header is True that the dataframe file
105        has been created, which in turn will dispatch the IO event.
106
107        Args:
108            dataframe_path: path to the dataframe file.
109            dataframe: the dataframe to append to the file.
110            header: whether to include the header.
111        """
112        dataframe.to_csv(
113            dataframe_path,
114            mode='a',
115            sep='\t',
116            header=header,
117            index=False
118        )
119        # header is the first line meaning the file has just been created
120        if header:
121            self.event_dispatcher.dispatch(FileEventArgs(ON_CREATE_FILE, dataframe_path))
class CorePipeline:
 22class CorePipeline:
 23    """Base class for FairRecKit pipelines.
 24
 25    This class exposes some reusable functionality that can be used in derived
 26    pipelines to read and/or write dataframes.
 27    """
 28
 29    def __init__(self, event_dispatcher: EventDispatcher):
 30        """Construct the CorePipeline.
 31
 32        Args:
 33            event_dispatcher: used to dispatch events when running the core pipeline.
 34        """
 35        self.event_dispatcher = event_dispatcher
 36
 37    def read_dataframe(
 38            self,
 39            dataframe_path: str,
 40            dataframe_name: str,
 41            event_id_on_begin: str,
 42            event_id_on_end: str,
 43            *,
 44            names=None) -> pd.DataFrame:
 45        """Read a dataframe from the disk.
 46
 47        This function dispatches an error event when the FileNotFoundError is raised,
 48        and thereafter the error is raised once more.
 49
 50        Args:
 51            dataframe_path: path to the dataframe file.
 52            dataframe_name: name of the dataframe to use for event dispatching.
 53            event_id_on_begin: the event_id to dispatch when loading starts.
 54            event_id_on_end: the event_id to dispatch when loading is finished.
 55            names: the column names of the dataframe or None to infer them from the header.
 56
 57        Raises:
 58            FileNotFoundError: when the dataframe file is not found.
 59
 60        Returns:
 61            the loaded dataframe.
 62        """
 63        self.event_dispatcher.dispatch(DataframeEventArgs(
 64            event_id_on_begin,
 65            dataframe_path,
 66            dataframe_name
 67        ))
 68
 69        start = time.time()
 70
 71        try:
 72            dataframe = pd.read_csv(
 73                dataframe_path,
 74                sep='\t',
 75                header='infer' if names is None else None,
 76                names=names
 77            )
 78        except FileNotFoundError as err:
 79            self.event_dispatcher.dispatch(ErrorEventArgs(
 80                ON_RAISE_ERROR,
 81                'FileNotFoundError: raised while trying to load the ' +
 82                dataframe_name + ' from ' + dataframe_path
 83            ))
 84            raise err
 85
 86        end = time.time()
 87
 88        self.event_dispatcher.dispatch(DataframeEventArgs(
 89            event_id_on_end,
 90            dataframe_path,
 91            dataframe_name
 92        ), elapsed_time=end - start)
 93
 94        return dataframe
 95
 96    def write_dataframe(
 97            self,
 98            dataframe_path: str,
 99            dataframe: pd.DataFrame,
100            header: bool) -> None:
101        """Write a dataframe to the disk.
102
103        This function is intended to write (append) a dataframe in chunks,
104        including the header of the dataframe.
105        It is assumed that when the header is True that the dataframe file
106        has been created, which in turn will dispatch the IO event.
107
108        Args:
109            dataframe_path: path to the dataframe file.
110            dataframe: the dataframe to append to the file.
111            header: whether to include the header.
112        """
113        dataframe.to_csv(
114            dataframe_path,
115            mode='a',
116            sep='\t',
117            header=header,
118            index=False
119        )
120        # header is the first line meaning the file has just been created
121        if header:
122            self.event_dispatcher.dispatch(FileEventArgs(ON_CREATE_FILE, dataframe_path))

Base class for FairRecKit pipelines.

This class exposes some reusable functionality that can be used in derived pipelines to read and/or write dataframes.

CorePipeline( event_dispatcher: src.fairreckitlib.core.events.event_dispatcher.EventDispatcher)
29    def __init__(self, event_dispatcher: EventDispatcher):
30        """Construct the CorePipeline.
31
32        Args:
33            event_dispatcher: used to dispatch events when running the core pipeline.
34        """
35        self.event_dispatcher = event_dispatcher

Construct the CorePipeline.

Args:

- event_dispatcher: used to dispatch events when running the core pipeline.

def read_dataframe( self, dataframe_path: str, dataframe_name: str, event_id_on_begin: str, event_id_on_end: str, *, names=None) -> pandas.core.frame.DataFrame:
37    def read_dataframe(
38            self,
39            dataframe_path: str,
40            dataframe_name: str,
41            event_id_on_begin: str,
42            event_id_on_end: str,
43            *,
44            names=None) -> pd.DataFrame:
45        """Read a dataframe from the disk.
46
47        This function dispatches an error event when the FileNotFoundError is raised,
48        and thereafter the error is raised once more.
49
50        Args:
51            dataframe_path: path to the dataframe file.
52            dataframe_name: name of the dataframe to use for event dispatching.
53            event_id_on_begin: the event_id to dispatch when loading starts.
54            event_id_on_end: the event_id to dispatch when loading is finished.
55            names: the column names of the dataframe or None to infer them from the header.
56
57        Raises:
58            FileNotFoundError: when the dataframe file is not found.
59
60        Returns:
61            the loaded dataframe.
62        """
63        self.event_dispatcher.dispatch(DataframeEventArgs(
64            event_id_on_begin,
65            dataframe_path,
66            dataframe_name
67        ))
68
69        start = time.time()
70
71        try:
72            dataframe = pd.read_csv(
73                dataframe_path,
74                sep='\t',
75                header='infer' if names is None else None,
76                names=names
77            )
78        except FileNotFoundError as err:
79            self.event_dispatcher.dispatch(ErrorEventArgs(
80                ON_RAISE_ERROR,
81                'FileNotFoundError: raised while trying to load the ' +
82                dataframe_name + ' from ' + dataframe_path
83            ))
84            raise err
85
86        end = time.time()
87
88        self.event_dispatcher.dispatch(DataframeEventArgs(
89            event_id_on_end,
90            dataframe_path,
91            dataframe_name
92        ), elapsed_time=end - start)
93
94        return dataframe

Read a dataframe from the disk.

If a FileNotFoundError is raised, this function dispatches an error event and then re-raises the error.

Args:

- dataframe_path: path to the dataframe file.
- dataframe_name: name of the dataframe to use for event dispatching.
- event_id_on_begin: the event_id to dispatch when loading starts.
- event_id_on_end: the event_id to dispatch when loading is finished.
- names: the column names of the dataframe, or None to infer them from the header.

Raises: FileNotFoundError: when the dataframe file is not found.

Returns: the loaded dataframe.

def write_dataframe( self, dataframe_path: str, dataframe: pandas.core.frame.DataFrame, header: bool) -> None:
 96    def write_dataframe(
 97            self,
 98            dataframe_path: str,
 99            dataframe: pd.DataFrame,
100            header: bool) -> None:
101        """Write a dataframe to the disk.
102
103        This function is intended to write (append) a dataframe in chunks,
104        including the header of the dataframe.
105        It is assumed that when the header is True that the dataframe file
106        has been created, which in turn will dispatch the IO event.
107
108        Args:
109            dataframe_path: path to the dataframe file.
110            dataframe: the dataframe to append to the file.
111            header: whether to include the header.
112        """
113        dataframe.to_csv(
114            dataframe_path,
115            mode='a',
116            sep='\t',
117            header=header,
118            index=False
119        )
120        # header is the first line meaning the file has just been created
121        if header:
122            self.event_dispatcher.dispatch(FileEventArgs(ON_CREATE_FILE, dataframe_path))

Write a dataframe to the disk.

This function is intended to write (append) a dataframe in chunks, including the header of the dataframe. When header is True, the dataframe file is assumed to have just been created, and the file-creation IO event is dispatched.

Args:

- dataframe_path: path to the dataframe file.
- dataframe: the dataframe to append to the file.
- header: whether to include the header.