Skip to content

Instantly share code, notes, and snippets.

@jelmervdl
Last active February 26, 2024 11:22
Show Gist options
  • Save jelmervdl/62a7f357c40680b26c0e632f263d3864 to your computer and use it in GitHub Desktop.
Save jelmervdl/62a7f357c40680b26c0e632f263d3864 to your computer and use it in GitHub Desktop.
import multiprocessing as mp
from functools import wraps
from typing import Callable, Iterable, ParamSpec, TypeVar, Union
class EndOfIter:
    """Sentinel placed on the queue to signal the end of the offloaded iterator.

    Attributes:
        exc: The exception that terminated the iterator, or ``None`` when the
            iterator was exhausted normally.
    """

    # ``Optional`` is not among the names this file imports from ``typing``
    # (referencing it raised NameError when the class was defined), so the
    # annotation is spelled with ``Union``, which is imported.
    exc: Union[Exception, None]

    def __init__(self, exc: Union[Exception, None] = None):
        """Store the exception (if any) that ended the iteration."""
        self.exc = exc
# Parameter specification of the generator function handed to
# ``offload_iter``; used so the returned callable is typed with the same
# parameters as the function it wraps.
P = ParamSpec("P")
# Type of the items produced by the wrapped generator function.
R = TypeVar("R")
def offload_iter(
    fn: Callable[P, Iterable[R]], *, maxsize: int = 0
) -> Callable[P, Iterable[R]]:
    """Offload a generator function to another process.

    The returned callable starts a daemon process that runs ``fn`` and
    forwards every item it produces through a ``multiprocessing.Queue``;
    the caller iterates over those items in the current process.

    Args:
        fn: Generator function (or any callable returning an iterable) to
            run in the worker process. Its items and any exception it raises
            are sent through the queue.
        maxsize: Passed to ``multiprocessing.Queue``; bounds how many items
            the worker may produce ahead of the consumer. ``0`` (the default)
            means unbounded.

    Returns:
        A wrapper with the same parameters as ``fn`` that yields the items
        produced in the worker process.

    Raises:
        Exception: Any ``Exception`` raised by ``fn`` in the worker is
            re-raised by the wrapper once the items produced before it have
            been yielded.

    Note:
        The worker is a nested function, so it cannot be pickled. This
        therefore relies on a process start method that does not pickle the
        target (i.e. ``fork``).
    """

    def offload_iter_worker(
        queue: "mp.Queue[Union[R,EndOfIter]]",
        fn: Callable[P, Iterable[R]],
        args: P.args,
        kwargs: P.kwargs,
    ) -> None:
        """
        Function executed in the other process that runs the generator
        and puts its items on the queue, followed by an ``EndOfIter``
        sentinel carrying the exception if one was raised.
        """
        try:
            for item in fn(*args, **kwargs):
                queue.put(item)
            queue.put(EndOfIter())
        except Exception as exc:
            # Ship the failure to the consumer instead of dying silently.
            queue.put(EndOfIter(exc))

    @wraps(fn)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> Iterable[R]:
        """
        Wrapper around the generator that communicates with the offloaded
        worker. If an exception is thrown in the offloaded generator, it will
        be copied and raised by this wrapper as well.
        """
        queue: "mp.Queue[Union[R,EndOfIter]]" = mp.Queue(maxsize)
        proc = mp.Process(
            target=offload_iter_worker, args=(queue, fn, args, kwargs), daemon=True
        )
        proc.start()

        val: Union[R, EndOfIter]
        try:
            while True:
                val = queue.get()
                if isinstance(val, EndOfIter):
                    break
                yield val
            # The sentinel is the last thing the worker sends, so the queue
            # is drained and joining cannot block on unflushed items.
            proc.join()
        finally:
            # Reached with a live worker only when the consumer stopped early
            # (generator closed) or reading from the queue failed. Without
            # this the worker would linger, possibly blocked in ``put()`` on
            # a full queue, until the interpreter exits.
            if proc.is_alive():
                proc.terminate()
                proc.join()

        if val.exc is not None:
            raise val.exc

    return wrapper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment