Skip to content

Instantly share code, notes, and snippets.

View Guiforge's full-sized avatar

Guillaume Pouyat Guiforge

  • CyberVadis
  • Paris
View GitHub Profile
@Guiforge
Guiforge / gather_limit.py
Last active May 17, 2024 09:42
asyncio.gather with max number of concurrent tasks.
import asyncio
from collections.abc import Coroutine
from typing import Any
async def gather_limit(
*tasks: Coroutine[None, None, Any],
return_exceptions: bool = False,
max_con: int = 100,
) -> Any:
@Guiforge
Guiforge / wait.py
Last active August 8, 2023 05:58
Synchronous waiting for an asynchronous coroutine with Type
import asyncio
from collections.abc import Coroutine
from typing import Any, TypeVar
T = TypeVar("T")
def swait(awaitable: Coroutine[Any, Any, T]) -> T:
"""Wait but for in syncronous function.