Skip to content

Instantly share code, notes, and snippets.

@snowwm
Last active March 29, 2024 07:43
Show Gist options
  • Save snowwm/df3a0c109001fec0a778b895811a69fc to your computer and use it in GitHub Desktop.
Save snowwm/df3a0c109001fec0a778b895811a69fc to your computer and use it in GitHub Desktop.
A dirty hack useful if you need to quickly integrate sync and async codebases (or start making some parts async). Not for production!
"""
This example demonstrates using anysync with httpx.
"""
import logging
from anysync import anysync
from anysync.httpx_patch import Client
logging.basicConfig(level=logging.INFO)
client = Client()
@anysync
async def test():
test1()
await test2()
test1()
def test1():
print("Calling async -> sync -> async")
# When we get to this, test() is running in one worker thread and we need
# to launch my_ip() in another one. On the first call of test1()
# a new thread will be created, and on the second call it will be reused.
# You can see it in the log.
my_ip()
async def test2():
print("Calling async -> async -> async")
# Here no intervention is needed, no threads will be launched.
await my_ip()
@anysync
async def my_ip():
res = await client.get("http://ip-api.com/line/?fields=8192")
print(f"My IP is {res.text.strip()}")
# Just call it. No need to care whether it is async.
# This will create a worker thread and show it in the log.
test()
"""
This module exports a decorator `anysync` that can be used to transparently
call async functions from sync context. Moreover, you can arbitrarily mix and
nest sync and async function calls.
Under the hood it uses a pool of threads each of which runs an event loop.
Beware that it will deadlock when the pool becomes full (though it's unlikely).
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
import functools
import inspect
import logging
__all__ = ["anysync"]
def _init_thread():
_logger.info("New async worker thread")
asyncio.set_event_loop(asyncio.new_event_loop())
def _exec_async(future):
_logger.info("Entering async thread")
return asyncio.get_event_loop().run_until_complete(future)
def anysync(func):
@functools.wraps(func)
def inner(*args, **kwargs):
f = inspect.currentframe().f_back.f_code.co_flags
if f & (inspect.CO_COROUTINE | inspect.CO_ASYNC_GENERATOR):
# No need to interfere if the caller is already async.
return func(*args, **kwargs)
# Run in the thread pool and synchronously wait for result.
future = _executor.submit(_exec_async, func(*args, **kwargs))
return future.result()
return inner
_logger = logging.getLogger(__name__)
_executor = ThreadPoolExecutor(initializer=_init_thread)
anysync.shutdown = _executor.shutdown
"""
This module defines a patched httpx AsyncClient which can be transparently used
by sync functions. This requires some tricks because the same AsyncTransport
cannot be used in different asyncio event loops. We overcome this by creating
a new copy of transport for each loop in which it's about to be used. However,
it's difficult to properly close this copies (I suppose each must be closed in
its own event loop), so for now they are simply not being closed.
"""
import asyncio
from collections import defaultdict
from functools import partial
import httpx
from . import anysync
__all__ = ["Client"]
class Client(httpx.AsyncClient):
def _init_transport(self, *args, **kwargs):
constructor = partial(super()._init_transport, *args, **kwargs)
tr = constructor()
tr._patch_constructor = constructor
return tr
def _init_proxy_transport(self, *args, **kwargs):
constructor = partial(super()._init_proxy_transport, *args, **kwargs)
tr = constructor()
tr._patch_constructor = constructor
return tr
def _transport_for_url(self, url):
cur_loop = asyncio.get_running_loop()
tr = super()._transport_for_url(url)
if getattr(tr, "_patch_copies_by_loop", None) is None:
tr._patch_copies_by_loop = defaultdict(tr._patch_constructor)
tr._patch_copies_by_loop[cur_loop] = tr
return tr._patch_copies_by_loop[cur_loop]
Client.request = anysync(Client.request)
Client.stream = anysync(Client.stream)
Client.send = anysync(Client.send)
Client.get = anysync(Client.get)
Client.options = anysync(Client.options)
Client.head = anysync(Client.head)
Client.post = anysync(Client.post)
Client.put = anysync(Client.put)
Client.patch = anysync(Client.patch)
Client.delete = anysync(Client.delete)
Client.close = anysync(Client.aclose)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment