Skip to content

Instantly share code, notes, and snippets.

@valsteen
Last active November 27, 2022 10:41
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save valsteen/6989796b49be4dc102fed2fb08c05cf3 to your computer and use it in GitHub Desktop.
Save valsteen/6989796b49be4dc102fed2fb08c05cf3 to your computer and use it in GitHub Desktop.
Self-feeding task queue using python's asyncio
"""
This demonstrates a task scheduling loop where tasks can push further tasks back onto the queue.
Program input: a nested list whose items are either integers, whose processing is simulated by sleeping
(25 - value) seconds, or nested lists, in which case the worker sleeps for a time proportional to the
list's length and then reschedules its sub-items for processing.
The design is inspired by Go's goroutines, channels and wait groups.
asyncio's Queue can serve both as a channel and as a wait group, thanks to Queue.join(), which blocks
until every item put on the queue has been marked done with Queue.task_done().
"""
import asyncio
from asyncio import Queue, create_task, sleep, wait, Task
from time import time
from typing import Any, Optional, Union
# Size of the fixed worker pool draining the queue.
# Setting it to 0 switches to a single dispatcher that spawns one task per item.
MAX_WORKERS = 5

# Nested work items: integers are leaf jobs, lists are expanded into their sub-items.
TASK_TREE = [
    1, 2,
    [3, [4, 5, [6, 7]], 8],
    [9, 10],
    11, 12, 13,
    [14, 15, [16, [17, 18, 19, [20, 21, 22]]]],
]
async def do_task(value: Union[int, list[Any]]) -> Optional[list[Any]]:
    """Simulate processing a single queue item.

    An ``int`` is a leaf job: sleep ``25 - value`` seconds, then return ``None``.
    A ``list`` is a composite job: sleep ``len(value)`` seconds, then return the
    very same list object so the caller can enqueue its elements.
    Any other value is ignored and yields ``None`` without sleeping.
    """
    if isinstance(value, int):
        print(f">> processing final value {value} ...")
        await sleep(25 - value)
        print(f"<< finished processing final value {value} ...")
        return None

    if isinstance(value, list):
        print(f"@@ got list of {len(value)}, rescheduling them")
        await sleep(len(value))
        return value

    return None
async def work(item: Union[int, list[Any]], queue: Queue[Union[int, list[Any], None]]) -> None:
    """Process one dequeued item, re-enqueue its sub-items, and acknowledge it.

    Sub-items returned by ``do_task`` are put back on *queue* *before* the item
    is acknowledged. This keeps the queue's unfinished-task count from reaching
    zero while follow-up work is still pending.

    ``queue.task_done()`` runs in a ``finally`` clause, so the item is
    acknowledged even if processing raises; the exception still propagates.
    Without this, a single failure would leave ``queue.join()`` waiting forever.
    """
    try:
        if results := await do_task(item):
            for result in results:
                await queue.put(result)
    finally:
        queue.task_done()
async def worker(queue: Queue[Union[int, list[Any], None]]) -> None:
    """Consume items from *queue* one at a time until a ``None`` sentinel arrives.

    The loop compares against ``None`` explicitly. A bare truthiness test would
    also stop on legitimate falsy items such as ``0`` or ``[]``, leaving them
    unprocessed and unacknowledged, so ``queue.join()`` would never return.
    """
    while (item := await queue.get()) is not None:
        await work(item, queue)
def cleanup(tasks: set[Task]):
    """Return a done-callback that removes a finished task from *tasks*.

    This is deliberately a plain function, not a coroutine function.
    ``Task.add_done_callback`` calls its argument as an ordinary function.
    An ``async def`` factory would instead produce a coroutine object, which is
    not callable and would never be awaited, so nothing would be removed.
    """
    def callback(task: Task) -> None:
        # discard (not remove): tolerate tasks that were never added.
        tasks.discard(task)

    return callback
async def unlimited_worker(queue: Queue[Union[int, list[Any], None]]) -> None:
    """Spawn one concurrent ``work`` task per dequeued item until a ``None`` sentinel arrives.

    The event loop keeps only weak references to tasks, so each task is held
    in *tasks* until it finishes. The create_task docs say: "Save a reference
    to the result of this function, to avoid a task disappearing
    mid-execution" (https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task).
    ``tasks.discard`` is registered as the done-callback to drop that
    reference once the task completes.

    The sentinel is detected with an explicit ``is not None`` check, so falsy
    items such as ``0`` or ``[]`` are still processed. Before returning, any
    tasks still in flight are awaited, so the worker finishes only after all
    the work it started has finished.
    """
    tasks: set[Task] = set()
    while (item := await queue.get()) is not None:
        task = asyncio.create_task(work(item, queue))
        tasks.add(task)
        task.add_done_callback(tasks.discard)
    # asyncio.wait rejects an empty collection, hence the guard.
    if tasks:
        await wait(tasks)
async def main() -> None:
    """Feed TASK_TREE through the queue, wait for all work, and print the elapsed time.

    With ``MAX_WORKERS > 0`` a fixed pool of ``worker`` tasks drains the queue.
    With ``MAX_WORKERS == 0`` a single ``unlimited_worker`` spawns one task per item.
    ``queue.join()`` returns once every item, including rescheduled sub-items,
    has been acknowledged with ``task_done()``. The workers are then stopped by
    sending each one a ``None`` sentinel.
    """
    start_time = time()
    queue = Queue[Union[int, list[Any], None]]()

    if MAX_WORKERS > 0:
        workers = [create_task(worker(queue)) for _ in range(MAX_WORKERS)]
    else:
        workers = [create_task(unlimited_worker(queue))]

    for item in TASK_TREE:
        await queue.put(item)

    await queue.join()

    # Exactly one stop sentinel per worker task, whichever mode was chosen.
    for _ in workers:
        await queue.put(None)
    await wait(workers)

    print(f"finished in {time() - start_time:.2f} seconds")


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment