# Source: GitHub gist AvnerBen/68fcf5c1a7348f1297302e4b2be5d4fd (created October 16, 2021 18:23).
""" AB/PP/6: Asynchronous Producer/Consumer: 2. Message queue orchestrated by the consumer
"""
import asyncio
class Producer:
    """Produce numbered messages into a queue, one at a time, until stopped.

    After each message is put, the producer waits on the queue's ``join()``,
    so the next message is produced only once the consumer has acknowledged
    the previous one with ``task_done()``.
    """

    def __init__(self, outp):
        """Remember the output queue.

        Args:
            outp: Queue to write to; must provide awaitable ``put()`` and
                ``join()`` (``main`` in this file passes an ``asyncio.Queue``).
        """
        self.isStop = False  # set by stop(); polled by run() between rounds
        self.outp = outp

    async def run(self):
        """Emit 'Round 1', 'Round 2', ... and finally the 'STOP' sentinel."""
        i = 1
        while not self.isStop:
            await self.outp.put(f'Round {i}')
            # Block until the consumer has called task_done() for this item,
            # so the consumer's pace drives the producer.
            await self.outp.join()
            i += 1
        # Tell the consumer that no more messages will follow.
        await self.outp.put('STOP')

    async def stop(self, delay: float):
        """Request termination after ``delay`` seconds.

        The flag is only checked between rounds, so a message that is
        already in flight is completed before 'STOP' is sent.
        """
        await asyncio.sleep(delay)
        self.isStop = True
class Consumer:
    """Own the message queue and print each message received on it.

    The consumer creates the queue (``inp``); a producer is handed that
    queue and writes to it. Every item taken from the queue is acknowledged
    with ``task_done()`` so that a producer waiting on ``join()`` can go on.
    """

    def __init__(self, delay: float = 2):
        """Create the input queue.

        Args:
            delay: Seconds to sleep after printing each message, before
                acknowledging it. Defaults to 2.
        """
        self.inp = asyncio.Queue()
        self.delay = delay

    async def run(self):
        """Print messages until the 'STOP' sentinel arrives, then report end of input."""
        while True:
            message = await self.inp.get()
            try:
                if message == 'STOP':
                    break
                print(message)
                await asyncio.sleep(self.delay)
            finally:
                # Acknowledge every item taken, including 'STOP' and items
                # whose processing failed, so join() on the queue never hangs.
                self.inp.task_done()
        print('[End of input]')
async def main():
    """Wire a producer to a consumer's queue and run both until stopped.

    Three coroutines run concurrently: a timer that raises the producer's
    stop flag after 20 seconds, the producer loop, and the consumer loop.
    """
    consumer = Consumer()
    # The consumer owns the queue; the producer only gets a reference to it.
    producer = Producer(consumer.inp)
    await asyncio.gather(
        producer.stop(20),
        producer.run(),
        consumer.run(),
    )
    # Blocking read, reached only after all three coroutines have finished;
    # presumably here to keep the console open until Enter is pressed.
    input()
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    asyncio.run(main())
# (End of gist; GitHub sign-in/comment footer omitted.)