pept.utilities.parallel_map_file#

pept.utilities.parallel_map_file(func, fname, start, end, chunksize, *args, dtype=<class 'float'>, processes=None, callback=<function <lambda>>, error_callback=<function <lambda>>, **kwargs)[source]#

Utility for parallelising (read CSV chunk -> process chunk) workflows.

This function reads individual chunks of data from a CSV-formatted file, then asynchronously sends them as numpy arrays to an arbitrary function func for processing. In effect, it reads the file in the main process and processes the chunks in separate worker processes.

This is especially useful when dealing with very large files (like we do in PEPT…) that you’d like to process in batches, in parallel.

Parameters
func : callable

The function that will be called with each chunk of data, the chunk number, and the other positional arguments *args and keyword arguments **kwargs: func(data_chunk, chunk_number, *args, **kwargs). data_chunk is a numpy array returned by numpy.loadtxt and chunk_number is an int. func must be picklable, as it is sent to other processes.

fname : file, str, or pathlib.Path

The file, filename, or generator that numpy.loadtxt will be supplied with.

start : int

The starting line number that the chunks will be read from.

end : int

The ending line number that the chunks will be read from. This is exclusive.

chunksize : int

The number of lines that will be read for each chunk.

*args : additional positional arguments

Additional positional arguments that will be supplied to func.

dtype : type

The data type of the numpy array that is returned by numpy.loadtxt. The default is float.

processes : int

The maximum number of worker processes that will be used for calling func. If left to the default None, the number returned by os.cpu_count() is used.

callback : callable

When the result of a func call becomes ready, callback is applied to it; if the call failed, error_callback is applied instead.

error_callback : callable

If the target function func fails, then the error_callback is called with the exception instance.

**kwargs : additional keyword arguments

Additional keyword arguments that will be supplied to func.

Returns
list

A Python list of the values returned by the func calls. The results are not necessarily in order, but the chunk number supplied to each call to func can be used to reorder them. If func does not return anything, this is simply a list of None.
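If order matters, the chunk number bundled into each result can restore it. A minimal sketch, assuming each hypothetical func call returned a (processed_data, chunk_number) tuple:

```python
# Hypothetical out-of-order results, where each func call returned a
# (processed_data, chunk_number) tuple.
results = [("b", 1), ("c", 2), ("a", 0)]

# Sorting on the chunk number restores the original file order.
ordered = sorted(results, key=lambda r: r[-1])
```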

Notes

This function uses numpy.loadtxt to read chunks of data and multiprocessing.Pool.apply_async to call func asynchronously.
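The read-then-dispatch loop can be sketched as below. This is a simplified illustration, not the library's implementation: map_file_sketch is a hypothetical name, a comma delimiter is assumed, and multiprocessing.pool.ThreadPool stands in for multiprocessing.Pool so that func need not be picklable here.

```python
import io
import numpy as np
from multiprocessing.pool import ThreadPool

def map_file_sketch(func, f, start, end, chunksize, *args, **kwargs):
    # Skip lines before `start` in the main thread.
    for _ in range(start):
        next(f)

    with ThreadPool() as pool:
        futures = []
        nchunks = -(-(end - start) // chunksize)    # ceiling division
        for i in range(nchunks):
            rows = min(chunksize, end - start - i * chunksize)
            # Read one chunk in the main thread, then dispatch it to the pool.
            chunk = np.loadtxt(f, delimiter=",", max_rows=rows)
            futures.append(pool.apply_async(func, (chunk, i) + args, kwargs))
        return [fut.get() for fut in futures]

# Three CSV rows, read in chunks of one line each.
flike = io.StringIO("1,2,3\n4,5,6\n7,8,9")
sums = map_file_sketch(lambda c, i: (float(c.sum()), i), flike, 0, 3, 1)
```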

As the calls to func happen in different processes, all the usual parallel processing issues apply. For example, func should not save data to the same file, as calls would overwrite each other's results and the file may become corrupted. There is a workaround for this particular case: because the chunk numbers are guaranteed to be unique, each call can save its data to a file whose name includes the chunk number, making it unique.

Examples

For a file-like CSV data object:

>>> import io
>>> flike = io.StringIO("1,2,3\n4,5,6\n7,8,9")
>>> def func(data, chunk_number):
...     return (data, chunk_number)
>>> results = parallel_map_file(func, flike, 0, 3, 1)
>>> results     # chunks are numpy arrays; order may vary
[(array([1., 2., 3.]), 0), (array([4., 5., 6.]), 1), (array([7., 8., 9.]), 2)]