Skip to content

Instantly share code, notes, and snippets.

@AvnerBen
Last active October 16, 2021 19:39
Show Gist options
  • Save AvnerBen/9b25ac62f07e2328dd0bd305ca3a2730 to your computer and use it in GitHub Desktop.
Save AvnerBen/9b25ac62f07e2328dd0bd305ca3a2730 to your computer and use it in GitHub Desktop.
""" AB/PP/6: Asynchronous Producer/Consumer: 3. Consumer iterates on multiple Producers
"""
import asyncio
from typing import AsyncIterable, Iterable
class Producer(object):
    """Synchronous source of numbered messages that runs until stopped.

    Iterating yields 'Round <prefix>1', 'Round <prefix>2', ... The stop
    flag is checked before each message is produced, so iteration ends
    the next time a message is requested after stop() has been called.
    """

    def __init__(self, prefix: str):
        """Remember the message prefix and start in the running state."""
        self.isStop = False
        self.prefix = prefix

    def __iter__(self):
        """Generate messages, numbered from 1, until the stop flag is set."""
        count = 0
        while True:
            if self.isStop:
                break
            count += 1
            yield f'Round {self.prefix}{count}'

    def stop(self):
        """Raise the stop flag; running iterators end at their next step."""
        self.isStop = True
class MultiProducerAdapter(object):
    """Asynchronous round-robin multiplexer over several synchronous producers.

    Each round takes one message from every producer, in the order given,
    and then sleeps for `interval` seconds so other tasks can run. The
    stream ends as soon as any producer is exhausted.
    """

    def __init__(self, producers: "list[Producer]", interval: float = 2):
        """Store the producers and open one iterator per producer.

        Args:
            producers: The sources to interleave. stop() additionally
                requires each of them to offer a stop() method.
            interval: Seconds to sleep after each full round. Defaults to
                2, the value that used to be hard-coded.
        """
        self.producers = producers
        # The iterators are created once, here; a second `async for` over
        # the adapter continues with the same iterators instead of
        # restarting the producers.
        self.producerIters = [iter(x) for x in producers]
        self.interval = interval

    async def __aiter__(self):  # 1
        """Yield one message per producer per round until a producer ends."""
        # With nothing to read from, the loop below would never yield and
        # never return; end the stream immediately instead.
        if not self.producerIters:
            return
        while True:
            for itr in self.producerIters:  # 2
                # Guard only the next() call: an exhausted producer is the
                # one expected way for the stream to end.
                try:
                    message = next(itr)
                except StopIteration:
                    return
                yield message
            await asyncio.sleep(self.interval)

    async def stop(self, delay: int):
        """Sleep for `delay` seconds, then call stop() on every producer."""
        await asyncio.sleep(delay)
        for producer in self.producers:
            producer.stop()
class Consumer(object):
    """Prints every message received from an asynchronous input stream."""

    def __init__(self, inp: AsyncIterable[str]):
        """Remember the input stream.

        Args:
            inp: The message source. run() consumes it with `async for`,
                so it must be an asynchronous iterable.
        """
        self.inp = inp

    async def run(self):  # 3
        """Print each message from the input, then an end-of-input marker."""
        async for message in self.inp:  # 4
            print(message)
        print('[End of input]')
async def main():
    """Run the demo: three producers, one adapter, one consumer.

    The consumer prints interleaved messages while a second coroutine
    waits 10 seconds and then stops all producers. Once both have
    finished, the program waits for a line of console input.
    """
    sources = list(map(Producer, 'abc'))
    adapter = MultiProducerAdapter(sources)
    sink = Consumer(adapter)
    stopper = adapter.stop(10)
    printer = sink.run()
    await asyncio.gather(stopper, printer)
    input()
# Start the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment