Source code for monkey_wrench.process._process

"""The module providing functionalities for multiprocessing."""

from multiprocessing import get_context
from typing import Callable, TypeVar

from pydantic import NonNegativeInt

from monkey_wrench.generic import ListSetTuple, Model

T = TypeVar("T")
R = TypeVar("R")


[docs] class MultiProcess(Model): """Pydantic model for multiprocessing. Example: .. code-block:: python def power(x): # Note that the function only accepts a single argument! print(x[0] ** x[1]) # We use indices to extract our desired arguments from the single input argument. MultiProcess(number_of_processes=2).run(power, [(1, 3), (2, 5)]) """ number_of_processes: NonNegativeInt = 1 """Number of process to use. Defaults to ``1``. A value of ``1`` disables multiprocessing. This is useful for e.g. testing purposes. """
[docs] def run_with_results(self, function: Callable[[T], R], arguments: ListSetTuple[T]) -> list[R]: """Call the provided function with different arguments using multiple processes. Args: function: The function to be called. It must only accept a single argument and must not be a ``lambda``. In case you need a function with several input arguments, your function can accept single a tuple which packs all the arguments. You can then unpack, index, or iterate over the input, inside the function body. See the example. arguments: An iterable (list/set/tuple) of arguments that will be passed to the function. Returns: A list of returned results from the function in the same order as the given arguments (if not a set). """ ctx = get_context("spawn") if self.number_of_processes == 1: return [function(arg) for arg in arguments] with ctx.Pool(processes=self.number_of_processes) as pool: results = pool.map(function, arguments) return results
[docs] def run(self, function: Callable[[T], R], arguments: ListSetTuple[T]) -> None: """Similar to :func:`run_with_results`, but does not return anything. If the function returns anything, it will be discarded! """ ctx = get_context("spawn") arguments = list(arguments) for index in range(0, len(arguments), self.number_of_processes): procs = [] for arg in arguments[index: index + self.number_of_processes]: proc = ctx.Process(target=function, args=(arg,)) procs.append(proc) proc.start() for proc in procs: proc.join() for proc in procs: proc.close()