Created
April 30, 2019 14:19
-
-
Save leotada/37057c4b53a36ecb8d2405754238ac3d to your computer and use it in GitHub Desktop.
Using sync function in async loop in Python, with threads.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import asyncio | |
import functools | |
import os | |
import threading | |
from concurrent.futures import ThreadPoolExecutor | |
contextvars = None | |
class SyncToAsync: | |
""" | |
Utility class which turns a synchronous callable into an awaitable that | |
runs in a threadpool. It also sets a threadlocal inside the thread so | |
calls to AsyncToSync can escape it. | |
""" | |
# If they've set ASGI_THREADS, update the default asyncio executor for now | |
if "ASGI_THREADS" in os.environ: | |
loop = asyncio.get_event_loop() | |
loop.set_default_executor( | |
ThreadPoolExecutor(max_workers=int(os.environ["ASGI_THREADS"])) | |
) | |
threadlocal = threading.local() | |
def __init__(self, func): | |
self.func = func | |
async def __call__(self, *args, **kwargs): | |
loop = asyncio.get_event_loop() | |
if contextvars is not None: | |
context = contextvars.copy_context() | |
child = functools.partial(self.func, *args, **kwargs) | |
func = context.run | |
args = (child,) | |
kwargs = {} | |
else: | |
func = self.func | |
future = loop.run_in_executor( | |
None, functools.partial(self.thread_handler, loop, func, *args, **kwargs) | |
) | |
return await asyncio.wait_for(future, timeout=None) | |
def __get__(self, parent, objtype): | |
""" | |
Include self for methods | |
""" | |
return functools.partial(self.__call__, parent) | |
def thread_handler(self, loop, func, *args, **kwargs): | |
""" | |
Wraps the sync application with exception handling. | |
""" | |
# Set the threadlocal for AsyncToSync | |
self.threadlocal.main_event_loop = loop | |
# Run the function | |
return func(*args, **kwargs) | |
def pause(t): | |
print('pausing..') | |
time.sleep(t) | |
print('\nend') | |
async def async_pause(t): | |
print('pausing async..') | |
await asyncio.sleep(t) | |
print('end async') | |
async def main(): | |
t = time.time() | |
p = SyncToAsync(pause) | |
await asyncio.gather(async_pause(3), async_pause(3), p(3)) | |
print(time.time()-t) | |
# Python 3.7 | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment