Last active
January 29, 2021 10:33
-
-
Save glenfant/24dc52bb3fe2c39d20c14369a86c43bf to your computer and use it in GitHub Desktop.
[asyncio] Simple recipe that shows how to wrap a blocking callable in an awaitable coroutine. Requires Python 3.7+
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Running a blocking callable inside a coroutine | |
# ============================================== | |
import asyncio | |
import concurrent.futures | |
import functools | |
from typing import Callable, Tuple, Dict, Any | |
# Use the better suited pool (see doc of ``concurrent.future```) | |
#_executor = concurrent.futures.ThreadPoolExecutor() | |
#_executor = concurrent.futures.ProcessPoolExecutor() | |
_executor = None # Default asyncio pool | |
async def call_blocking(func: Callable, *args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any: | |
""" | |
Enable to "await" a blocking I/O callable from an asyncio coroutine | |
Args: | |
func: The regular blocking callable (function, method) | |
args: Positional arguments transmitted to ``func`` | |
kwargs: Keyword arguments transmitted to ``func`` | |
Returns: | |
Anything that's returned by ``func`` | |
""" | |
loop = asyncio.get_running_loop() | |
new_func = functools.partial(func, *args, **kwargs) | |
result = await loop.run_in_executor(_executor, new_func) | |
return result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
if __name__ == "__main__": | |
import time | |
from call_blocking import call_blocking | |
def multiply(value, multiplier=2): | |
# A blocking function | |
time.sleep(0.1) | |
result = value * multiplier | |
time.sleep(0.1) | |
return result | |
class Operations: | |
def instmultiply(self, value, multiplier=2): | |
return self.clsmultiply(value, multiplier) | |
@classmethod | |
def clsmultiply(cls, value, multiplier=2): | |
return cls.statmultiply(value, multiplier) | |
@staticmethod | |
def statmultiply(value, multiplier=2): | |
return multiply(value, multiplier) | |
async def test(): | |
print("Blocking func") | |
result = await call_blocking(multiply, 2, multiplier=3) | |
print(result) | |
ops = Operations() | |
print("Blocking static method") | |
result = await call_blocking(ops.statmultiply, 3) | |
print(result) | |
print("Blocking class method") | |
result = await call_blocking(ops.clsmultiply, 3) | |
print(result) | |
print("Blocking instance method") | |
result = await call_blocking(ops.instmultiply, 3) | |
print(result) | |
asyncio.run(test()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment