Skip to content

Instantly share code, notes, and snippets.

@amaciel81
Last active March 8, 2021 07:57
Show Gist options
  • Save amaciel81/c7917e014a65f83fbb0125128a21247c to your computer and use it in GitHub Desktop.
Save amaciel81/c7917e014a65f83fbb0125128a21247c to your computer and use it in GitHub Desktop.
import asyncio
from typing import Any, List, Callable, Coroutine
async def async_io_worker(time_sec: int) -> str:
await asyncio.sleep(time_sec)
return f"Waited for {time_sec}s..."
async def async_scheduler(instances: int, worker: Coroutine, args: List[Any]) -> None:
coroutines = [worker(*args) for _ in range(instances)]
await asyncio.gather(*coroutines)
print("Coroutine Execution, I/O bound")
with timer():
asyncio.run(async_scheduler(8, async_io_worker, [3]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment