Skip to content

Instantly share code, notes, and snippets.

@wynsmart
Last active May 17, 2020 07:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wynsmart/fc4db88c9dbc832bf820b2ba73c483fb to your computer and use it in GitHub Desktop.
Save wynsmart/fc4db88c9dbc832bf820b2ba73c483fb to your computer and use it in GitHub Desktop.
AsyncIO Task Scheduler
#!/usr/bin/env python3
import asyncio
from typing import Coroutine, List
class AsyncScheduler:
@classmethod
def run(cls, tasks: List[Coroutine], limit: int = 3) -> None:
"""
Given a list of async function calls, execute it with limited concurrency
Example:
async def fn(x):
...
AsyncScheduler.run([fn(1), fn(2), fn(3)], limit=2)
"""
if len(tasks) == 0:
return
asyncio.run(cls.__run(tasks, limit))
@classmethod
async def __run(cls, tasks, limit) -> None:
total_tasks = len(tasks)
running = set()
while 1:
while len(tasks) and len(running) < limit:
running.add(asyncio.create_task(tasks.pop(0)))
utils.notice(
f"processing {len(running)} tasks, "
f"{len(tasks)} pending, "
f"{total_tasks - len(tasks) - len(running)} done, "
f"{total_tasks} total"
)
done, running = await asyncio.wait(
running, return_when=asyncio.FIRST_COMPLETED
)
if not len(running) and not len(tasks):
return
import asyncio
import time
from AsyncScheduler import AsyncScheduler
async def f(x):
print(f'f{x}')
await asyncio.sleep(x)
print(x, time.time())
tasks = [f(2), f(2), f(1), f(2)]
AsyncScheduler.run(tasks, 3)
tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment