Skip to content

Instantly share code, notes, and snippets.

@pquentin
Created May 25, 2018 04:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pquentin/966864894ead9f4caa14b5bbfe732b54 to your computer and use it in GitHub Desktop.
Save pquentin/966864894ead9f4caa14b5bbfe732b54 to your computer and use it in GitHub Desktop.
Fanning out a generator with trio
import trio
async def producer():
for e in [1, 2, 3]:
yield e
async def consumer(iterator):
total = 0
async for e in iterator:
print('got {}'.format(e))
total += e
return total
async def consumer_wrapper(consumer, its, results, i):
results[i] = await consumer(its[i])
async def producer_wrapper(producer, its):
async for e in producer():
for it in its:
print('send {}'.format(e))
await it.send(e)
for it in its:
await it.send(None)
class Iterator:
def __init__(self):
self.queue = trio.Queue(1)
async def send(self, e):
await self.queue.put(e)
def __aiter__(self):
return self
async def __anext__(self):
next_value = await self.queue.get()
if next_value is None:
raise StopAsyncIteration
return next_value
async def main():
results = [None, None]
its = [Iterator(), Iterator()]
async with trio.open_nursery() as nursery:
nursery.start_soon(producer_wrapper, producer, its)
nursery.start_soon(consumer_wrapper, consumer, its, results, 0)
nursery.start_soon(consumer_wrapper, consumer, its, results, 1)
print(results)
if __name__ == '__main__':
trio.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment