Last active
October 16, 2021 19:39
-
-
Save AvnerBen/9b25ac62f07e2328dd0bd305ca3a2730 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" AB/PP/6: Asynchronous Producer/Consumer: 3. Consumer iterates on multiple Producers | |
""" | |
import asyncio
from collections.abc import AsyncIterable
from typing import Iterable
class Producer:
    """Synchronous source of numbered messages that runs until stopped.

    Iterating a producer yields 'Round <prefix>1', 'Round <prefix>2', ...
    and ends the first time the stop flag is seen set.
    """

    def __init__(self, prefix: str):
        # Flag polled by the iterator before each message; set by stop().
        self.isStop = False
        # Text placed in front of the running number in every message.
        self.prefix = prefix

    def __iter__(self):
        """Yield successive messages, checking the stop flag before each."""
        counter = 0
        while True:
            if self.isStop:
                return
            counter += 1
            yield f'Round {self.prefix}{counter}'

    def stop(self):
        """Ask every iterator over this producer to finish."""
        self.isStop = True
class MultiProducerAdapter(object):
    """Expose several synchronous producers as one asynchronous stream.

    The producers are polled round-robin: one message is taken from each in
    turn, with a pause after every message so other coroutines can run.

    Args:
        producers: The producers to multiplex. Each must be iterable and
            offer a ``stop()`` method.
        interval: Seconds to sleep after each yielded message (default 2).
    """

    def __init__(self, producers: "list[Producer]", interval: float = 2):
        self.producers = producers
        # One live iterator per producer, shared by every pass over the adapter.
        self.producerIters = [iter(x) for x in producers]
        self.interval = interval

    async def __aiter__(self):
        """Yield messages from the producers in round-robin order.

        Written as an async generator, so ``async for`` receives the
        generator object directly. The stream ends as soon as any producer
        is exhausted.
        """
        if not self.producerIters:
            # Without producers the polling loop below would spin forever
            # without awaiting, starving the event loop.
            return
        while True:
            for itr in self.producerIters:
                # Only next() may legitimately raise StopIteration here, so
                # keep the try body limited to that call.
                try:
                    message = next(itr)
                except StopIteration:
                    return
                yield message
                await asyncio.sleep(self.interval)

    async def stop(self, delay: int):
        """Sleep for ``delay`` seconds, then stop every producer."""
        await asyncio.sleep(delay)
        for producer in self.producers:
            producer.stop()
class Consumer(object):
    """Print every message received from an asynchronous input stream.

    Args:
        inp: The message source. It is consumed with ``async for``, so it
            must be an asynchronous iterable, not a plain iterable.
    """

    def __init__(self, inp: AsyncIterable[str]):
        self.inp = inp

    async def run(self):
        """Print each message as it arrives, then an end-of-input marker."""
        async for message in self.inp:
            print(message)
        # Reached only after the input stream has finished.
        print('[End of input]')
async def main():
    """Run the demo: three producers, one adapter, one consumer.

    The consumer and a delayed stop request run concurrently; the stop
    request fires after 10 seconds. Once both have finished, the function
    waits for a line on standard input before returning.
    """
    sources = list(map(Producer, 'abc'))
    adapter = MultiProducerAdapter(sources)
    sink = Consumer(adapter)
    await asyncio.gather(adapter.stop(10), sink.run())
    # Hold the program open until the user presses Enter.
    input()
if __name__ == '__main__':
    # Start the demo only when run as a script, not when imported.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment