@leplatrem
Last active October 12, 2023 11:33
Batched producer/consumer
import asyncio
import concurrent.futures
import random
import time

import async_timeout


async def produce(queue, n):
    for x in range(n):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate an i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)


async def consume(loop, queue, executor):
    def markdone(queue, n):
        # once the executor task finishes, mark the n items of the batch as done
        return lambda fut: [queue.task_done() for _ in range(n)]

    while True:
        # gather a batch of up to 5 items from the producer
        items = []
        try:
            # stop gathering if no item arrives within 2 seconds
            # (recent versions of async_timeout require ``async with``)
            async with async_timeout.timeout(2):
                while len(items) < 5:
                    item = await queue.get()
                    print('consuming {}...'.format(item))
                    items.append(item)
        except asyncio.TimeoutError:
            print("Producer done or not fast enough, proceed.")
        if items:
            # process the whole batch in a worker thread
            print('→ batch {}...'.format(len(items)))
            task = loop.run_in_executor(executor, time.sleep, random.random())
            task.add_done_callback(markdone(queue, len(items)))


async def run(loop, n):
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    queue = asyncio.Queue()
    # schedule the consumer
    consumer_coro = consume(loop, queue, executor)
    consumer = asyncio.ensure_future(consumer_coro)
    # run the producer and wait for completion
    await produce(queue, n)
    # wait until the consumer has processed all items
    await queue.join()
    # the consumer is still waiting for an item, cancel it
    consumer.cancel()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop, 13))
    loop.close()
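
For reference, the same batching pattern can be expressed with the standard library alone on recent Python versions. The sketch below is a minimal adaptation, assuming Python 3.11+, that uses asyncio.timeout and asyncio.to_thread in place of async_timeout and the explicit ThreadPoolExecutor; the process_batch helper and the background task set are illustrative additions, not part of the original gist.

import asyncio
import random
import time


async def produce(queue, n):
    for x in range(n):
        # simulate an i/o operation before producing the item
        await asyncio.sleep(random.random())
        await queue.put(str(x))


async def process_batch(queue, items):
    # run the blocking work in a worker thread, then mark the items as done
    await asyncio.to_thread(time.sleep, random.random())
    for _ in items:
        queue.task_done()


async def consume(queue):
    background = set()  # keep strong references to in-flight batch tasks
    while True:
        items = []
        try:
            # stop gathering the current batch if the producer is idle for 2 seconds
            async with asyncio.timeout(2):
                while len(items) < 5:
                    items.append(await queue.get())
        except TimeoutError:
            pass
        if items:
            # hand the batch off without blocking collection of the next one
            task = asyncio.create_task(process_batch(queue, items))
            background.add(task)
            task.add_done_callback(background.discard)


async def main(n=13):
    queue = asyncio.Queue()
    consumer = asyncio.create_task(consume(queue))
    await produce(queue, n)
    # wait until every produced item has been marked as done
    await queue.join()
    consumer.cancel()


if __name__ == "__main__":
    asyncio.run(main())

Here asyncio.to_thread keeps the blocking time.sleep off the event loop, playing the role of loop.run_in_executor in the gist above.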