Source code for pept.utilities.parallel.parallel_map

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : parallel_map.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date   : 03.02.2020


import  numpy               as      np
from    multiprocessing     import  Pool


def parallel_map_file(
    func,           # Called as func(data_chunk, chunk_number, *args, **kwargs)
    fname,          # File that will be supplied to numpy.loadtxt
    start,          # Start line
    end,            # End line
    chunksize,      # Number of lines per chunk
    *args,
    dtype = float,
    processes = None,
    callback = lambda x: None,
    error_callback = lambda x: None,
    **kwargs
):
    '''Utility for parallelising (read CSV chunk -> process chunk) workflows.

    This function reads individual chunks of data from a CSV-formatted file,
    then asynchronously sends them as numpy arrays to an arbitrary function
    `func` for processing. In effect, it reads the file in the main process
    and processes the chunks in a pool of worker processes. This is
    especially useful when dealing with very large files (like we do in
    PEPT...) that you'd like to process in batches, in parallel.

    Parameters
    ----------
    func : callable
        The function that will be called with each chunk of data, the chunk
        number, the other positional arguments `*args` and keyword arguments
        `**kwargs`: `func(data_chunk, chunk_number, *args, **kwargs)`.
        `data_chunk` is a numpy array returned by `numpy.loadtxt` and
        `chunk_number` is an int. `func` must be picklable for sending to
        the worker processes.
    fname : file, str, or pathlib.Path
        The file, filename, or generator that numpy.loadtxt will be supplied
        with.
    start : int
        The starting line number that the chunks will be read from.
    end : int
        The ending line number that the chunks will be read from. This is
        exclusive.
    chunksize : int
        The number of lines that will be read for each chunk.
    *args : additional positional arguments
        Additional positional arguments that will be supplied to `func`.
    dtype : type
        The data type of the numpy array returned by numpy.loadtxt. The
        default is `float`.
    processes : int
        The maximum number of worker processes that will be used for calling
        `func`. If left to the default `None`, the number returned by
        `os.cpu_count()` will be used.
    callback : callable
        When the result from a `func` call becomes ready, `callback` is
        applied to it - unless the call failed, in which case
        `error_callback` is applied instead.
    error_callback : callable
        If the target function `func` fails, `error_callback` is called with
        the exception instance.
    **kwargs : additional keyword arguments
        Additional keyword arguments that will be supplied to `func`.

    Returns
    -------
    list
        A Python list of the values returned by the `func` calls. The
        results are not necessarily in order, though this can be checked
        using the chunk number supplied to each call to `func`. If `func`
        does not return anything, this is simply a list of `None`.

    Notes
    -----
    This function uses `numpy.loadtxt` to read chunks of data and
    `multiprocessing.Pool.apply_async` to call `func` asynchronously. As the
    calls to `func` happen in different processes, all the usual parallel
    processing caveats apply. For example, `func` should not save data to
    the same file, as workers would overwrite each other's results and the
    file may become corrupt. There is, however, a workaround for this
    particular case: because the chunk numbers are guaranteed to be unique,
    any data can be saved to a file whose name includes the chunk number,
    making it unique.
    Examples
    --------
    For a file-like CSV data object:

    >>> import io
    >>> flike = io.StringIO("1,2,3\\n4,5,6\\n7,8,9")
    >>> def func(data, chunk_number):
    ...     return (data, chunk_number)
    >>> results = parallel_map_file(func, flike, 0, 3, 1)
    >>> print(results)
    [ ([1, 2, 3], 0), ([4, 5, 6], 1), ([7, 8, 9], 2) ]
    '''

    nchunks = int((end - start) / chunksize)

    with Pool(processes = processes) as pool:
        results = []

        for i in range(nchunks):
            # Read the next `chunksize` lines from the file as a numpy array.
            data = np.loadtxt(
                fname,
                skiprows = start + i * chunksize,
                max_rows = chunksize,
                dtype = dtype
            )

            # Send the chunk and its chunk number to a worker process
            # asynchronously.
            worker = pool.apply_async(
                func,
                (data, i, *args),
                kwargs,
                callback,
                error_callback
            )
            results.append(worker)

        # Wait for all workers and collect their return values.
        results = [r.get() for r in results]

    return results
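The Notes above recommend avoiding a shared output file by writing each chunk's results to a file whose name includes the unique chunk number. Below is a minimal sketch of that pattern; the worker function `save_chunk`, the input file "large_data.csv" and the "processed_chunk" filename prefix are illustrative assumptions, not part of the library.

import numpy as np
from pept.utilities.parallel.parallel_map import parallel_map_file


def save_chunk(data_chunk, chunk_number, out_prefix):
    # Hypothetical worker: each chunk is written to its own file. The chunk
    # number keeps the filenames unique, so parallel workers never write to
    # the same file.
    np.save(f"{out_prefix}_{chunk_number}.npy", data_chunk)
    return chunk_number


if __name__ == "__main__":
    # Process lines 0..10000 of a (hypothetical) "large_data.csv" in chunks
    # of 1000 lines, using all available CPU cores by default.
    chunk_numbers = parallel_map_file(
        save_chunk, "large_data.csv", 0, 10_000, 1000, "processed_chunk"
    )
    print(chunk_numbers)    # e.g. [0, 1, ..., 9], not necessarily in order

Defining `save_chunk` at module level (and guarding the call with `if __name__ == "__main__":`) keeps the worker function picklable, as required for sending it to the worker processes.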