Skip to content

Instantly share code, notes, and snippets.

@JamesTheAwesomeDude
Last active May 1, 2024 20:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JamesTheAwesomeDude/27000568553db5f27bfd29e92bb1072d to your computer and use it in GitHub Desktop.
Save JamesTheAwesomeDude/27000568553db5f27bfd29e92bb1072d to your computer and use it in GitHub Desktop.
tee into multiple "for" loops in parallel without async, threading, or subprocess
import greenlet
def teeinto_lockstep(it, *consumer_callables):
"""tee iterator *it* into multiple non-`async`, non-Generator consumers, in constant memory, without `threading`.
The behavior of each passed iterable is UNDEFINED after its respective consumer exits.
If a consumer function raises an exception, that exception will bubble out of teeinto_lockstep.
If a consumer function raises an exception, the other consumers will (currently) never be resumed.
No exception will be raised; their next() calls will simply diverge. This is considered a BUG, and
will be changed to ...something more reasonable?... in a future edition of this function.
"""
# h/t https://morestina.net/blog/1378/parallel-iteration-in-python for inspiration
# 1. Create greenlets
PARENT_GLET = greenlet.getcurrent()
IT_EXHAUST = object()
class _State(Enum):
NEXT_CALLED = object()
RETURNED = object()
def wrap(f):
_next = partial(PARENT_GLET.switch, _State.NEXT_CALLED)
def wrapper():
_it = iter(_next, IT_EXHAUST)
result = f(_it)
return _State.RETURNED, result
return wrapper
consumer_glets = [greenlet.greenlet(wrap(f)) for f in consumer_callables]
# 2. Start consumers
it = iter(it)
results = [g.switch() for g in consumer_glets]
for result in results:
match result:
case _State.NEXT_CALLED:
continue
case (_State.RETURNED, returned_result):
if isinstance(returned_result, (Iterator, Generator, AsyncGenerator)):
import warnings
warnings.warn("teeinto_lockstep consumer did not call next() even once, and returned an Iterator, Generator, or AsyncGenerator. This is almost surely an error. Did you mean to encapsulate your generator function with a statement like `(lambda g: lambda it: deque(g(it), 1).pop())(GENERATOR_FUNCTION)`?", RuntimeWarning)
case _:
raise RuntimeError("unexpected switch into teeinto_lockstep")
# 3. Iterate
while True:
if _State.NEXT_CALLED in results:
val = next(it, IT_EXHAUST)
else:
# This is an "early-exit" condition, if ALL consumers have stopped
# pulling then we won't pull anything further from the iterable
# (which might not be finite, among other correctness reasons).
break
for i, (g, state) in enumerate(zip(consumer_glets, results, strict=True)):
match state:
case _State.NEXT_CALLED:
# Consumer is still alive and pulling
results[i] = g.switch(val)
case _State.RETURNED, _:
# One consumer happened to exit without exhausting
# its iterable; this is allowed
assert g.dead
# There is no "case ERRORED" because greenlet itself bubbles errors
# and inheriting that leads to completely reasonable handling
case _:
raise RuntimeError("unexpected switch into teeinto_lockstep")
if val is IT_EXHAUST:
# This is the USUAL exit condition when the input iterable is finite.
# (Note this check is ONLY AFTER feeding exhaustion signal to all consumers,
# to ensure they all have the chance to run any post-loop logic.)
break
# 3. Cleanup & return
def collect(results):
for g, state in zip(consumer_glets, results, strict=True):
match state:
case (_State.RETURNED, result) if g.dead:
yield result
case _:
raise RuntimeError("teeinto_lockstep found consumer in inconsistent state")
return tuple(_collect(results))
from collections.abc import AsyncGenerator, Iterator, Generator
from enum import Enum
from functools import partial
# Y: type of the items produced by the input iterator.
Y = TypeVar("Y")
# R: return type of a consumer callable.
# Background reference attached by the author: https://stackoverflow.com/q/76516439/1874170
# (Kept as a comment: `Annotated` can only be subscripted, never called, and an
# Annotated-wrapped TypeVar would not be usable as a type variable below.)
R = TypeVar("R")


# Typed signature only (the body is `...`).
# NOTE(review): presumably this belongs in a `.pyi` stub next to the
# implementation; placed after the implementation in the same module it would
# rebind `teeinto_lockstep` to this no-op -- confirm where it lives.
def teeinto_lockstep(
    it: Iterator[Y],
    *consumer_callables: Callable[[Iterator[Y]], R]
) -> Tuple[R, ...]: ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment