Last active
April 5, 2024 08:23
-
-
Save JeroenPeterBos/26655ff6783cfa14b7d2a76e43b09ed3 to your computer and use it in GitHub Desktop.
Run synchronous code inside asynchronous context without blocking the event loop
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import functools
from asyncio import AbstractEventLoop
from concurrent.futures import Executor
from typing import Any, Callable, Coroutine, Optional, TypeVar
# Return type of the wrapped synchronous callable; ties the awaited result of
# the helpers below to whatever the callable itself returns.
R = TypeVar("R")
async def run_in_executor(
    func: Callable[..., R],
    *args: Any,
    loop: Optional[AbstractEventLoop] = None,
    executor: Optional[Executor] = None,
    **kwargs: Any,
) -> R:
    """Run a synchronous function in an executor and await its result.

    This lets blocking code (IO-bound calls, or CPU-bound work sent to a
    process pool) run from a coroutine without stalling the asyncio event
    loop.

    Args:
        func: The synchronous callable to execute.
        *args: Positional arguments forwarded to ``func``.
        loop: Event loop used to schedule the call. If ``None``, the
            currently running loop is used.
        executor: Executor that runs ``func``, e.g. a ``ThreadPoolExecutor``
            or ``ProcessPoolExecutor``. If ``None``, the loop's default
            executor is used.
        **kwargs: Keyword arguments forwarded to ``func``. The names ``loop``
            and ``executor`` are consumed by this helper and therefore cannot
            be forwarded.

    Returns:
        Whatever ``func(*args, **kwargs)`` returns.

    Raises:
        RuntimeError: If ``loop`` is ``None`` and no event loop is running.
        Exception: Any exception raised by ``func`` is re-raised here.

    Example:
        >>> import time
        >>> def sync_sleep(duration, message):
        ...     time.sleep(duration)
        ...     return message
        >>> async def async_sleep_and_print(duration, message):
        ...     result = await run_in_executor(sync_sleep, duration, message=message)
        ...     print(result)
        >>> # In an asyncio environment:
        >>> # asyncio.run(async_sleep_and_print(1, 'Hello, World!'))
    """
    if loop is None:
        loop = asyncio.get_running_loop()
    # loop.run_in_executor() only forwards positional arguments, so bind the
    # whole call up front. functools.partial is used instead of a lambda
    # because a partial can be pickled (given a picklable func and arguments),
    # which process-based executors require; a lambda never can be.
    bound_call = functools.partial(func, *args, **kwargs)
    return await loop.run_in_executor(executor, bound_call)
def run_in_executor_wrap(
    executor: Optional[Executor] = None,
    loop: Optional[AbstractEventLoop] = None,
) -> Callable[[Callable[..., R]], Callable[..., Coroutine[Any, Any, R]]]:
    """Build a decorator that makes a synchronous function awaitable.

    The decorated function keeps its name and docstring, but calling it
    returns a coroutine that runs the original function through
    ``run_in_executor`` instead of executing it on the event loop thread.

    This factory is a plain function on purpose: it is evaluated at
    decoration time (``@run_in_executor_wrap()``), where nothing can be
    awaited, so it must return the decorator directly rather than a coroutine.

    Args:
        executor: Executor in which the decorated function runs, e.g. a
            ``ThreadPoolExecutor`` or ``ProcessPoolExecutor``. If ``None``,
            the event loop's default executor is used.
        loop: Event loop used to schedule the call. If ``None``, the loop
            running at call time is used.

    Returns:
        A decorator that turns a synchronous function into a coroutine
        function returning the same result.

    Example:
        >>> import time
        >>> @run_in_executor_wrap()
        ... def blocking_add(x, y):
        ...     time.sleep(1)
        ...     return x + y
        >>> # An async context is required to call the decorated function:
        >>> # result = await blocking_add(1, 2)
    """

    def decorator(func: Callable[..., R]) -> Callable[..., Coroutine[Any, Any, R]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> R:
            # loop/executor are passed by keyword so they can never be
            # mistaken for positional arguments intended for func.
            return await run_in_executor(
                func, *args, loop=loop, executor=executor, **kwargs
            )

        return wrapper

    return decorator
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment