Last active
May 1, 2024 20:53
-
-
Save JamesTheAwesomeDude/27000568553db5f27bfd29e92bb1072d to your computer and use it in GitHub Desktop.
tee into multiple "for" loops in parallel without async, threading, or subprocess
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import greenlet | |
def teeinto_lockstep(it, *consumer_callables): | |
"""tee iterator *it* into multiple non-`async`, non-Generator consumers, in constant memory, without `threading`. | |
The behavior of each passed iterable is UNDEFINED after its respective consumer exits. | |
If a consumer function raises an exception, that exception will bubble out of teeinto_lockstep. | |
If a consumer function raises an exception, the other consumers will (currently) never be resumed. | |
No exception will be raised; their next() calls will simply diverge. This is considered a BUG, and | |
will be changed to ...something more reasonable?... in a future edition of this function. | |
""" | |
# h/t https://morestina.net/blog/1378/parallel-iteration-in-python for inspiration | |
# 1. Create greenlets | |
PARENT_GLET = greenlet.getcurrent() | |
IT_EXHAUST = object() | |
class _State(Enum): | |
NEXT_CALLED = object() | |
RETURNED = object() | |
def wrap(f): | |
_next = partial(PARENT_GLET.switch, _State.NEXT_CALLED) | |
def wrapper(): | |
_it = iter(_next, IT_EXHAUST) | |
result = f(_it) | |
return _State.RETURNED, result | |
return wrapper | |
consumer_glets = [greenlet.greenlet(wrap(f)) for f in consumer_callables] | |
# 2. Start consumers | |
it = iter(it) | |
results = [g.switch() for g in consumer_glets] | |
for result in results: | |
match result: | |
case _State.NEXT_CALLED: | |
continue | |
case (_State.RETURNED, returned_result): | |
if isinstance(returned_result, (Iterator, Generator, AsyncGenerator)): | |
import warnings | |
warnings.warn("teeinto_lockstep consumer did not call next() even once, and returned an Iterator, Generator, or AsyncGenerator. This is almost surely an error. Did you mean to encapsulate your generator function with a statement like `(lambda g: lambda it: deque(g(it), 1).pop())(GENERATOR_FUNCTION)`?", RuntimeWarning) | |
case _: | |
raise RuntimeError("unexpected switch into teeinto_lockstep") | |
# 3. Iterate | |
while True: | |
if _State.NEXT_CALLED in results: | |
val = next(it, IT_EXHAUST) | |
else: | |
# This is an "early-exit" condition, if ALL consumers have stopped | |
# pulling then we won't pull anything further from the iterable | |
# (which might not be finite, among other correctness reasons). | |
break | |
for i, (g, state) in enumerate(zip(consumer_glets, results, strict=True)): | |
match state: | |
case _State.NEXT_CALLED: | |
# Consumer is still alive and pulling | |
results[i] = g.switch(val) | |
case _State.RETURNED, _: | |
# One consumer happened to exit without exhausting | |
# its iterable; this is allowed | |
assert g.dead | |
# There is no "case ERRORED" because greenlet itself bubbles errors | |
# and inheriting that leads to completely reasonable handling | |
case _: | |
raise RuntimeError("unexpected switch into teeinto_lockstep") | |
if val is IT_EXHAUST: | |
# This is the USUAL exit condition when the input iterable is finite. | |
# (Note this check is ONLY AFTER feeding exhaustion signal to all consumers, | |
# to ensure they all have the chance to run any post-loop logic.) | |
break | |
# 3. Cleanup & return | |
def collect(results): | |
for g, state in zip(consumer_glets, results, strict=True): | |
match state: | |
case (_State.RETURNED, result) if g.dead: | |
yield result | |
case _: | |
raise RuntimeError("teeinto_lockstep found consumer in inconsistent state") | |
return tuple(_collect(results)) | |
from collections.abc import AsyncGenerator, Iterator, Generator | |
from enum import Enum | |
from functools import partial |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Y: the type of the items produced by the source iterator.
Y = TypeVar("Y")
# R: the type returned by the consumer callables.
# typing.Annotated is a special form that must be subscripted, not called,
# and a type variable has to be assigned directly from TypeVar(...) for type
# checkers to recognise it, so the reference the original author attached as
# metadata is kept as a comment instead:
# https://stackoverflow.com/q/76516439/1874170
R = TypeVar("R")
# Typing stub: takes an iterator of Y plus any number of consumer callables,
# each mapping an Iterator[Y] to an R, and returns a tuple of R values.
def teeinto_lockstep(
    it: Iterator[Y], *consumer_callables: Callable[[Iterator[Y]], R]
) -> Tuple[R, ...]: ...
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment