pept.utilities.parallel_map_file#
- pept.utilities.parallel_map_file(func, fname, start, end, chunksize, *args, dtype=<class 'float'>, processes=None, callback=<function <lambda>>, error_callback=<function <lambda>>, **kwargs)[source]#
Utility for parallelising (read CSV chunk -> process chunk) workflows.
This function reads individual chunks of data from a CSV-formatted file, then asynchronously sends them as numpy arrays to an arbitrary function func for processing. In effect, it reads a file in one main thread and processes it in separate threads.
This is especially useful when dealing with very large files (like we do in PEPT…) that you’d like to process in batches, in parallel.
- Parameters
- func
callable()
The function that will be called with each chunk of data, the chunk number, the other positional arguments *args and keyword arguments **kwargs: func(data_chunk, chunk_number, *args, **kwargs). data_chunk is a numpy array returned by numpy.loadtxt and chunk_number is an int. func must be picklable for sending to other threads.
- fname
file
,str
,or
pathlib.Path
The file, filename, or generator that numpy.loadtxt will be supplied with.
- start
int
The starting line number that the chunks will be read from.
- end
int
The ending line number that the chunks will be read from. This is exclusive.
- chunksize
int
The number of lines that will be read for each chunk.
- *args
additional
positional
arguments
Additional positional arguments that will be supplied to func.
- dtype
type
The data type of the numpy array that is returned by numpy.loadtxt. The default is float.
- processes
int
The maximum number of threads that will be used for calling func. If left to the default None, then the number returned by os.cpu_count() will be used.
- callback
callable()
When the result from a func call becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.
- error_callback
callable()
If the target function func fails, then the error_callback is called with the exception instance.
- **kwargs
additional
keybord
arguments
Additional keyword arguments that will be supplied to func.
- func
- Returns
list
A Python list of the func call returns. The results are not necessarily in order, though this can be verified by using the chunk number that is supplied to each call to func. If func does not return anything, it will simply be a list of None.
Notes
This function uses numpy.loadtxt to read chunks of data and multiprocessing.Pool.apply_async to call func asynchronously.
As the calls to func happen in different threads, all the usual parallel processing issues apply. For example, func should not save data to the same file, as it will overwrite results from different threads and may become corrupt - however, there is a workaround for this particular case: because the chunk numbers are guaranteed to be unique, any data can be saved to a file whose name includes this chunk number, making it unique.
Examples
For a random file-like CSV data object:
>>> import io >>> flike = io.StringIO("1,2,3\n4,5,6\n7,8,9") >>> def func(data, chunk_number): >>> return (data, chunk_number) >>> results = parallel_map_file(func, flike, 0, 3, 1) >>> print(results) >>> [ ([1, 2, 3], 0), ([4, 5, 6], 1), ([7, 8, 9], 2) ]