Skip to content

Instantly share code, notes, and snippets.

@gtors
Created March 8, 2021 17:27
Show Gist options
  • Save gtors/f7e5916d64829f858f8e3c3a7f5e836e to your computer and use it in GitHub Desktop.
Save gtors/f7e5916d64829f858f8e3c3a7f5e836e to your computer and use it in GitHub Desktop.
draft of coroutine based pipeline
import itertools
NEXT = object()
class new_pipeline:
def __init__(self):
self._coroutines = []
self._handler = None
def pipe_to(self, coro):
assert next(coro) == NEXT
self._coroutines.append(coro)
return self
def handler(self, func):
self._handler = func
return self
def _run_flow(self, seq):
coros = self._coroutines
coro_index = 0
coro_max_index = len(self._coroutines) - 1
stack = [iter(seq)]
while stack:
# If no coroutines available, then consume last values
if coro_index > coro_max_index:
yield from stack[-1]
coro_index -= 1
del stack[-1]
# Choose current coroutine
coro = coros[coro_index]
# Coroutine can behave like filter, so some values can be skipped
for val in stack[-1]:
coro_answer = coro.send(val)
# NEXT marker indicates that coroutine waits for next value
if coro_answer != NEXT:
break
# If there are no values left and coroutine did not choose value,
# then return controlflow to previous coroutine
else:
coro_index -= 1
# If it was topmost cortoutine, then the while-loop will be exit
del stack[-1]
continue
# Concatenate all coroutine generated values into a single iterator.
coro_values = itertools.chain(
(coro_answer,),
itertools.takewhile(NEXT.__ne__, coro)
)
stack.append(coro_values)
coro_index += 1
def consume(self, seq):
if self._handler:
for result in self._run_flow(seq):
self._handler(result)
else:
return tuple(self._run_flow(seq))
def odd_only():
while True:
val = (yield NEXT)
if val & 1 == 1:
yield val
def power_of_2():
while True:
val = (yield NEXT)
yield val ** 2
pipeline = (
new_pipeline()
.pipe_to(odd_only())
.pipe_to(power_of_2())
.handler(print)
)
pipeline.consume(range(100))
1
9
25
49
81
121
169
225
289
361
441
529
625
729
841
961
1089
1225
1369
1521
1681
1849
2025
2209
2401
2601
2809
3025
3249
3481
3721
3969
4225
4489
4761
5041
5329
5625
5929
6241
6561
6889
7225
7569
7921
8281
8649
9025
9409
9801
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment