Skip to content

Instantly share code, notes, and snippets.

@leotada
Created April 30, 2019 14:19
Show Gist options
  • Save leotada/37057c4b53a36ecb8d2405754238ac3d to your computer and use it in GitHub Desktop.
Save leotada/37057c4b53a36ecb8d2405754238ac3d to your computer and use it in GitHub Desktop.
Using sync function in async loop in Python, with threads.
import time
import asyncio
import functools
import os
import threading
from concurrent.futures import ThreadPoolExecutor
contextvars = None
class SyncToAsync:
"""
Utility class which turns a synchronous callable into an awaitable that
runs in a threadpool. It also sets a threadlocal inside the thread so
calls to AsyncToSync can escape it.
"""
# If they've set ASGI_THREADS, update the default asyncio executor for now
if "ASGI_THREADS" in os.environ:
loop = asyncio.get_event_loop()
loop.set_default_executor(
ThreadPoolExecutor(max_workers=int(os.environ["ASGI_THREADS"]))
)
threadlocal = threading.local()
def __init__(self, func):
self.func = func
async def __call__(self, *args, **kwargs):
loop = asyncio.get_event_loop()
if contextvars is not None:
context = contextvars.copy_context()
child = functools.partial(self.func, *args, **kwargs)
func = context.run
args = (child,)
kwargs = {}
else:
func = self.func
future = loop.run_in_executor(
None, functools.partial(self.thread_handler, loop, func, *args, **kwargs)
)
return await asyncio.wait_for(future, timeout=None)
def __get__(self, parent, objtype):
"""
Include self for methods
"""
return functools.partial(self.__call__, parent)
def thread_handler(self, loop, func, *args, **kwargs):
"""
Wraps the sync application with exception handling.
"""
# Set the threadlocal for AsyncToSync
self.threadlocal.main_event_loop = loop
# Run the function
return func(*args, **kwargs)
def pause(t):
print('pausing..')
time.sleep(t)
print('\nend')
async def async_pause(t):
print('pausing async..')
await asyncio.sleep(t)
print('end async')
async def main():
t = time.time()
p = SyncToAsync(pause)
await asyncio.gather(async_pause(3), async_pause(3), p(3))
print(time.time()-t)
# Python 3.7
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment