Skip to content

Instantly share code, notes, and snippets.

@showa-yojyo
Created May 12, 2020 10:02
Show Gist options
  • Save showa-yojyo/4ed200d4c41f496a45a7af2612912df3 to your computer and use it in GitHub Desktop.
Save showa-yojyo/4ed200d4c41f496a45a7af2612912df3 to your computer and use it in GitHub Desktop.
Implement Producer/Consumer pattern with asyncio.Queue
#!/usr/bin/env python
"""
A simple producer/consumer example, using Queue.task_done and Queue.join
From https://asyncio.readthedocs.io/en/latest/producer_consumer.html
"""
import asyncio
import random
async def produce(queue, n):
    """Feed the integers 1 through ``n`` into ``queue``, one at a time.

    Before each item is enqueued, a progress line is printed and the
    coroutine sleeps for a random fraction of a second to imitate I/O.

    Args:
        queue: Object whose awaitable ``put`` method receives each item.
        n: Number of items to produce; nothing is produced when ``n < 1``.
    """
    sequence = range(1, n + 1)
    for x in sequence:
        # Announce the item about to be produced.
        print(f'producing {x}/{n}')
        # Stand-in for real I/O latency: pause for a random delay in [0, 1).
        delay = random.random()
        await asyncio.sleep(delay)
        # Enqueue the finished item.
        await queue.put(x)
async def consume(queue):
    """Process items from ``queue`` forever, acknowledging each one.

    The coroutine never returns on its own; the caller is expected to
    cancel the task once no more work remains.

    Every item taken from the queue is acknowledged with ``task_done()``
    even if processing raises or is cancelled, so that a caller blocked in
    ``queue.join()`` is never left waiting on an item that will not be
    finished. A processing exception still propagates and ends this
    consumer.

    Args:
        queue: Object providing an awaitable ``get`` and a ``task_done``
            method (e.g. ``asyncio.Queue``).
    """
    while True:
        # Wait for an item from the producer. This stays outside the
        # try block: if we are cancelled here, no item was taken and
        # there is nothing to acknowledge.
        item = await queue.get()
        try:
            # Process the item.
            print(f'consuming {item}...')
            # Simulate an I/O operation using sleep.
            await asyncio.sleep(random.random())
        finally:
            # Always notify the queue that this item is finished;
            # otherwise a failure here would make queue.join() hang.
            queue.task_done()
async def run(n, num_consumers=3):
    """Run one producer against a pool of consumers until all work is done.

    Starts ``num_consumers`` consumer tasks on a fresh ``asyncio.Queue``,
    produces ``n`` items, waits until every item has been acknowledged,
    then cancels the consumers and waits for them to finish.

    Args:
        n: Number of items for the producer to generate.
        num_consumers: Number of concurrent consumer tasks (default 3).

    Raises:
        ValueError: If ``num_consumers`` is less than 1; with no consumers
            the queue would never drain.
    """
    if num_consumers < 1:
        raise ValueError("num_consumers must be at least 1")

    queue = asyncio.Queue()
    # Schedule the consumers; they start waiting for items immediately.
    consumers = [
        asyncio.create_task(consume(queue)) for _ in range(num_consumers)
    ]
    try:
        # Run the producer and wait for completion.
        await produce(queue, n)
        # Wait until the consumers have processed all items.
        await queue.join()
    finally:
        # The consumers loop forever, so they must be cancelled. Doing it
        # in ``finally`` ensures they are not left running if the producer
        # fails or this coroutine is itself cancelled.
        for consumer in consumers:
            consumer.cancel()
        # Wait until all consumer tasks have finished; their cancellation
        # (or any error they raised) is collected instead of re-raised.
        await asyncio.gather(*consumers, return_exceptions=True)
if __name__ == "__main__":
    # Demo entry point: push 10 items through the pipeline. Guarded so that
    # importing this module does not start the event loop as a side effect.
    asyncio.run(run(10))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment