src.fairreckitlib.core.pipeline.core_pipeline
This module contains the base core pipeline class.
Classes:
CorePipeline: base pipeline class with shared IO functionality.
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. © Copyright Utrecht University (Department of Information and Computing Sciences)
"""This module contains the base core pipeline class.

Classes:

    CorePipeline: base pipeline class with shared IO functionality.

This program has been developed by students from the bachelor Computer Science at
Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
"""

import time

import pandas as pd

from ..events.event_dispatcher import EventDispatcher
from ..events.event_error import ON_RAISE_ERROR, ErrorEventArgs
from ..io.event_io import ON_CREATE_FILE, DataframeEventArgs, FileEventArgs


class CorePipeline:
    """Base class for FairRecKit pipelines.

    This class exposes some reusable functionality that can be used in derived
    pipelines to read and/or write dataframes.
    """

    def __init__(self, event_dispatcher: EventDispatcher):
        """Construct the CorePipeline.

        Args:
            event_dispatcher: used to dispatch events when running the core pipeline.
        """
        self.event_dispatcher = event_dispatcher

    def read_dataframe(
            self,
            dataframe_path: str,
            dataframe_name: str,
            event_id_on_begin: str,
            event_id_on_end: str,
            *,
            names=None) -> pd.DataFrame:
        """Read a dataframe from the disk.

        This function dispatches an error event when the FileNotFoundError is raised,
        and thereafter the error is raised once more.

        Args:
            dataframe_path: path to the dataframe file.
            dataframe_name: name of the dataframe to use for event dispatching.
            event_id_on_begin: the event_id to dispatch when loading starts.
            event_id_on_end: the event_id to dispatch when loading is finished.
            names: the column names of the dataframe or None to infer them from the header.

        Raises:
            FileNotFoundError: when the dataframe file is not found.

        Returns:
            the loaded dataframe.
        """
        self.event_dispatcher.dispatch(DataframeEventArgs(
            event_id_on_begin,
            dataframe_path,
            dataframe_name
        ))

        start = time.time()

        try:
            dataframe = pd.read_csv(
                dataframe_path,
                sep='\t',
                header='infer' if names is None else None,
                names=names
            )
        except FileNotFoundError as err:
            self.event_dispatcher.dispatch(ErrorEventArgs(
                ON_RAISE_ERROR,
                'FileNotFoundError: raised while trying to load the ' +
                dataframe_name + ' from ' + dataframe_path
            ))
            raise err

        end = time.time()

        self.event_dispatcher.dispatch(DataframeEventArgs(
            event_id_on_end,
            dataframe_path,
            dataframe_name
        ), elapsed_time=end - start)

        return dataframe

    def write_dataframe(
            self,
            dataframe_path: str,
            dataframe: pd.DataFrame,
            header: bool) -> None:
        """Write a dataframe to the disk.

        This function is intended to write (append) a dataframe in chunks,
        including the header of the dataframe.
        It is assumed that when the header is True that the dataframe file
        has been created, which in turn will dispatch the IO event.

        Args:
            dataframe_path: path to the dataframe file.
            dataframe: the dataframe to append to the file.
            header: whether to include the header.
        """
        dataframe.to_csv(
            dataframe_path,
            mode='a',
            sep='\t',
            header=header,
            index=False
        )
        # header is the first line meaning the file has just been created
        if header:
            self.event_dispatcher.dispatch(FileEventArgs(ON_CREATE_FILE, dataframe_path))
Base class for FairRecKit pipelines.
This class exposes some reusable functionality that can be used in derived pipelines to read and/or write dataframes.
Construct the CorePipeline.
Args:
    event_dispatcher: used to dispatch events when running the core pipeline.
Read a dataframe from the disk.
If the dataframe file is not found, this function dispatches an error event and then re-raises the FileNotFoundError.
Args:
    dataframe_path: path to the dataframe file.
    dataframe_name: name of the dataframe to use for event dispatching.
    event_id_on_begin: the event_id to dispatch when loading starts.
    event_id_on_end: the event_id to dispatch when loading is finished.
    names: the column names of the dataframe or None to infer them from the header.
Raises: FileNotFoundError: when the dataframe file is not found.
Returns: the loaded dataframe.
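The read behaviour described above can be tried outside the library. What follows is a minimal sketch, not the FairRecKit implementation: `RecordingDispatcher` and `read_tsv` are hypothetical stand-ins for `EventDispatcher` and `read_dataframe`, the file names and column names are made up, and a plain string takes the place of `ErrorEventArgs`. It shows that column names must be supplied when the file has no header row, and that a missing file produces one dispatched error before the `FileNotFoundError` reaches the caller.

```python
import os
import tempfile

import pandas as pd


class RecordingDispatcher:
    """Hypothetical stand-in for EventDispatcher that records dispatched events."""

    def __init__(self):
        self.events = []

    def dispatch(self, event_args, **kwargs):
        self.events.append((event_args, kwargs))


def read_tsv(dispatcher, path, names=None):
    """Mirror the read logic: infer the header unless column names are given."""
    try:
        return pd.read_csv(
            path,
            sep='\t',
            header='infer' if names is None else None,
            names=names
        )
    except FileNotFoundError:
        # Report the failure first, then let the caller see the same error.
        dispatcher.dispatch('error: could not load ' + path)
        raise


dispatcher = RecordingDispatcher()
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, 'ratings.tsv')
    with open(path, 'w', encoding='utf-8') as file:
        file.write('1\t10\t4.0\n2\t20\t3.5\n')  # no header row

    # Without a header row, the column names are supplied explicitly.
    ratings = read_tsv(dispatcher, path, names=['user', 'item', 'rating'])

    missing_path = os.path.join(tmp_dir, 'missing.tsv')
    try:
        read_tsv(dispatcher, missing_path)
        raised = False
    except FileNotFoundError:
        raised = True
```

Re-raising after the dispatch lets event listeners log the failure while callers can still handle the exception themselves.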
Write a dataframe to the disk.
This function is intended to write (append) a dataframe in chunks, including the header of the dataframe. When header is True, the dataframe file is assumed to have just been created, so the file creation event (ON_CREATE_FILE) is dispatched.
Args:
    dataframe_path: path to the dataframe file.
    dataframe: the dataframe to append to the file.
    header: whether to include the header.
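The chunked append pattern this method is meant for might look as follows. This is a sketch under assumptions: it calls `DataFrame.to_csv` directly with the same arguments instead of going through `write_dataframe`, so no events are dispatched, and the sample data and the file name `chunks.tsv` are hypothetical.

```python
import os
import tempfile

import pandas as pd

# Hypothetical data, split into chunks of two rows each.
frame = pd.DataFrame({'user': [1, 2, 3, 4], 'item': [10, 20, 30, 40]})
chunks = [frame.iloc[start:start + 2] for start in range(0, len(frame), 2)]

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, 'chunks.tsv')

    for index, chunk in enumerate(chunks):
        # Only the first chunk carries the header; later chunks append rows.
        chunk.to_csv(path, mode='a', sep='\t', header=index == 0, index=False)

    # Reading the file back yields the original rows under a single header.
    restored = pd.read_csv(path, sep='\t')
```

Because the file is opened in append mode, passing `header=True` for a later chunk would write a second header row into the middle of the data, which is why the header flag doubles as the "file was just created" signal.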