Skip to content

Instantly share code, notes, and snippets.

@JeroenPeterBos
Last active April 5, 2024 08:23
Show Gist options
  • Save JeroenPeterBos/26655ff6783cfa14b7d2a76e43b09ed3 to your computer and use it in GitHub Desktop.
Save JeroenPeterBos/26655ff6783cfa14b7d2a76e43b09ed3 to your computer and use it in GitHub Desktop.
Run synchronous code inside asynchronous context without blocking the event loop
import asyncio
import functools
from asyncio import AbstractEventLoop
from concurrent.futures import Executor
from typing import Any, Callable, Coroutine, Optional, TypeVar
# Generic return type of the synchronous callable that gets offloaded to an executor.
R = TypeVar("R")
async def run_in_executor(
    func: Callable[..., R],
    *args: Any,
    loop: Optional[AbstractEventLoop] = None,
    executor: Optional[Executor] = None,
    **kwargs: Any,
) -> R:
    """Run a synchronous callable in an executor and await its result.

    This lets blocking code be called from a coroutine without stalling the
    asyncio event loop: the call is handed to ``loop.run_in_executor`` and the
    coroutine is suspended until the result is available.

    Args:
        func: The synchronous callable to execute.
        *args: Positional arguments forwarded to ``func``.
        loop: Event loop used to schedule the call. When ``None``, the
            currently running loop (``asyncio.get_running_loop()``) is used.
        executor: Executor that runs the call. When ``None``, it is passed
            through to ``loop.run_in_executor``, which then uses the loop's
            default executor.
        **kwargs: Keyword arguments forwarded to ``func``. The names ``loop``
            and ``executor`` are consumed by this function and are never
            forwarded.

    Returns:
        Whatever ``func(*args, **kwargs)`` returns.

    Raises:
        RuntimeError: If ``loop`` is ``None`` and no event loop is running.
        Exception: Any exception raised by ``func`` is re-raised to the
            awaiting caller.

    Example:
        >>> import time
        >>> def sync_sleep(duration, message):
        ...     time.sleep(duration)
        ...     return message
        >>> async def async_sleep_and_print(duration, message):
        ...     result = await run_in_executor(sync_sleep, duration, message=message)
        ...     print(result)
        >>> # In an asyncio environment:
        >>> # asyncio.run(async_sleep_and_print(1, 'Hello, World!'))
    """
    if loop is None:
        loop = asyncio.get_running_loop()

    # loop.run_in_executor only forwards positional arguments, so the keyword
    # arguments have to be bound up front. A functools.partial is used rather
    # than a lambda because a partial can be pickled (as long as ``func`` and
    # its arguments can), which process-based executors require; a lambda
    # never can.
    bound_call = functools.partial(func, *args, **kwargs)
    return await loop.run_in_executor(executor, bound_call)
def run_in_executor_wrap(
    executor: Optional[Executor] = None,
    loop: Optional[AbstractEventLoop] = None,
) -> Callable[[Callable[..., R]], Callable[..., Coroutine[Any, Any, R]]]:
    """Build a decorator that turns a synchronous function into an awaitable one.

    The decorated function becomes a coroutine function; awaiting a call to it
    runs the original synchronous function through ``run_in_executor`` with
    the ``executor`` and ``loop`` given here, so the event loop is not blocked
    while the function runs.

    This factory is a plain (non-async) function on purpose: it must return
    the decorator directly so that ``@run_in_executor_wrap()`` works. Apply it
    to synchronous functions only; decorating an ``async def`` would merely
    create a coroutine object inside the executor without awaiting it.

    Args:
        executor: Executor in which the decorated function is run. ``None`` is
            forwarded unchanged to ``run_in_executor``.
        loop: Event loop used to schedule the call. ``None`` is forwarded
            unchanged to ``run_in_executor``.

    Returns:
        A decorator mapping ``Callable[..., R]`` to a coroutine function
        ``Callable[..., Coroutine[Any, Any, R]]``. The wrapper keeps the
        original function's metadata via ``functools.wraps``.

    Note:
        Process-based executors need to pickle the callable. A function whose
        module-level name has been rebound to its wrapper via ``@`` syntax
        generally cannot be pickled by reference, so thread-based executors
        are the safe choice for decorated functions.

    Example:
        >>> import time
        >>> @run_in_executor_wrap()
        ... def blocking_add(x, y):
        ...     time.sleep(1)
        ...     return x + y
        >>> # An async context is required to call blocking_add:
        >>> # result = await blocking_add(1, 2)
    """

    def decorator(func: Callable[..., R]) -> Callable[..., Coroutine[Any, Any, R]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> R:
            # loop/executor are captured from the factory call; everything
            # else is forwarded to the wrapped synchronous function.
            return await run_in_executor(
                func, *args, loop=loop, executor=executor, **kwargs
            )

        return wrapper

    return decorator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment