@zzzeek
Last active January 22, 2024 15:35
Write a program in asyncio, that calls into a library that knows nothing about asyncio, which then calls out to a database adapter that is only asyncio

"""Proof of concept of an adapter that will wrap a blocking IO library in
greenlets, so that it can be invoked by asyncio code and itself make calls out
to an asyncio database library.

hint: any Python ORM or database abstraction tool written against the DBAPI
could in theory allow asyncio round trips to a pure async DB driver like
asyncpg.

The approach here seems too simple to be true and I did not expect it to
collapse down to something this minimal, so it is very possible I am totally
missing something here. it's 1 AM and maybe I'm staring at it for too long
but it works?

note this is not using gevent or eventlet, we don't actually need those here
as we are using regular asyncio networking. It uses only the underlying
greenlet library which allows us to context switch without the
"await" keyword or other asyncio syntax.

"""
import asyncio
import contextvars
import sys

import asyncpg
import greenlet


class AsyncConnection:
    """An asyncio database connection class.

    This is a pure asyncio class that has to be run in an
    explicit async event loop.

    Uses asyncpg.

    """

    @classmethod
    async def create(cls, **kw):
        # bypass __init__ and attach the awaited asyncpg connection directly
        conn = cls.__new__(cls)
        conn.conn = await asyncpg.connect(**kw)
        return conn

    async def execute(self, statement):
        await self.conn.execute(statement)

    async def fetch(self, statement):
        return await self.conn.fetch(statement)

    async def close(self):
        await self.conn.close()


class SyncConnection:
    """A synchronous database connection class.

    This class mirrors the asyncpg API above, however it does not use asyncio
    at all; all methods are written in blocking style.

    This class is fairly analogous to the Connection in SQLAlchemy or other
    ORMs. For a more DBAPI/SQLAlchemy-like approach we'd need to make more of
    a "cursor" effect from asyncpg and make sure it starts a transaction, etc.
    but in general this would be the SQLAlchemy Connection class, or the ORM
    Session (which in SQLAlchemy 2.0 is heavily based on Session.execute()).

    """

    @classmethod
    def create(cls, impl, **kw):
        conn = cls.__new__(cls)
        # impl is the adapter class that does the actual work
        conn.conn = impl.create(**kw)
        return conn

    def execute(self, statement):
        self.conn.execute(statement)

    def fetch(self, statement):
        return self.conn.fetch(statement)

    def close(self):
        self.conn.close()


class SyncAdaptImpl:
    """An adapter that translates between the async and sync versions
    of the Connection.

    This class can likely be generated programmatically for a larger
    scale application as this is basically "add the async keyword to
    each function call" boilerplate.

    """

    @classmethod
    def create(cls, **kw):
        conn = cls.__new__(cls)

        async def async_create(**kw):
            # pass the caller's connection arguments through rather than
            # hardcoding them a second time here
            return await AsyncConnection.create(**kw)

        conn.async_conn = greenlet_runner.run_in_asyncio(async_create(**kw))
        return conn

    def execute(self, statement):
        async def execute(statement):
            await self.async_conn.execute(statement)

        return greenlet_runner.run_in_asyncio(execute(statement))

    def fetch(self, statement):
        async def fetch(statement):
            return await self.async_conn.fetch(statement)

        return greenlet_runner.run_in_asyncio(fetch(statement))

    def close(self):
        async def close():
            await self.async_conn.close()

        return greenlet_runner.run_in_asyncio(close())


def im_a_greenlet():
    """our greenlet-style function.

    All the code here is written against the SyncConnection in blocking style
    and this code knows nothing about asyncio. It is written for blocking
    database connections, however we will run it in a greenlet container that
    allows it to talk to asyncio database connections.

    This would apply towards all the major chunks of the ORM that would
    be insane to rewrite just to have "async" keywords everywhere, namely
    the logic used by the unit of work to flush objects, and possibly
    code like eager loaders that run inline with loading as well.

    """
    conn = SyncConnection.create(
        SyncAdaptImpl,
        user="scott",
        password="tiger",
        host="localhost",
        database="test",
    )
    conn.execute("drop table if exists mytable")
    conn.execute(
        "create table if not exists "
        "mytable (id serial primary key, data varchar)"
    )
    conn.execute("insert into mytable(data) values ('even more data')")
    result = conn.fetch("select id, data from mytable")
    conn.close()
    return result


class GreenletExecutor:
    """Greenlet executor that wires up greenlet.switch() and await calls
    together.

    I'm sort of not really sure why this even works and I have not tested
    this under real concurrency, this seems too simple to be true actually,
    I first wrote this using a more complicated approach with two separate
    event loops, but I'd prefer to have things run as a straight shot.

    If this effect is actually this simple and there's no spooky weird
    thing that is going to happen from adding stackless Python context
    switches into async / await, this would be kind of amazing?

    """

    current_return = contextvars.ContextVar("current greenlet context")

    def run_in_asyncio(self, coroutine):
        # called from blocking code inside the child greenlet: hand the
        # coroutine back to the asyncio side and wait for its result
        return self.current_return.get().switch(coroutine)

    async def run_greenlet(self, fn):
        result_future = asyncio.Future()

        def run_greenlet_target():
            result_future.set_result(fn())
            return None

        async def run_greenlet():
            gl = greenlet.greenlet(run_greenlet_target)
            # greenlet releases that support contextvars start each new
            # greenlet in an empty context; give the child a copy of ours
            # so that run_in_asyncio() can still look up current_return.
            gl.gr_context = contextvars.copy_context()
            greenlet_coroutine = gl.switch()

            while greenlet_coroutine is not None:
                task = asyncio.create_task(greenlet_coroutine)
                try:
                    await task
                except BaseException:
                    # this allows an exception to be raised within
                    # the moderated greenlet so that it can continue
                    # its expected flow.
                    greenlet_coroutine = gl.throw(*sys.exc_info())
                else:
                    greenlet_coroutine = gl.switch(task.result())

        current_return = greenlet.greenlet(run_greenlet)
        self.current_return.set(current_return)
        await current_return.switch()
        return result_future.result()


greenlet_runner = GreenletExecutor()


if __name__ == "__main__":
    # finally here's our program, written as asyncio. we want to use
    # asyncio to write our app, but be able to call into a library
    # that knows nothing about asyncio, so we will adapt it to run
    # as a greenlet. the trick is that the database library ultimately
    # used is also asyncio, so we don't need gevent or eventlet networking
    # or even their event loops, only the greenlet construct.

    async def run_our_asyncio_program():
        retval = await greenlet_runner.run_greenlet(im_a_greenlet)
        print(retval)

    # asyncio.run() creates and closes the event loop for us
    asyncio.run(run_our_asyncio_program())

@vytas7

vytas7 commented Jul 6, 2020

Nice! (Even though greenlet/gevent are considered sort of heresy...)

I've been recently thinking to myself whether it could be worth resurrecting and cleaning up the opposite too (there were some functional attempts out there) -- primarily running a gevent event loop (aka Hub in the gevent parlance), and shimming a fake async loop atop of that (or many microloops). That would allow mixing in async libraries with battle-tested gevent-based services.

@zzzeek
Author

zzzeek commented Jul 6, 2020

there are libraries like that, e.g. to allow interoperability between asyncio and gevent, here are the links that I found for that kind of thing which helped me to come up with this approach:

top-level gevent discussion: gevent/gevent#982

clone of @vstinner 's aiogevent: https://github.com/2mf/aiogevent

tulipcore: https://github.com/decentfox/tulipcore

The important thing to note is that gevent has its own event loop. So you either have to get the asyncio event loop in there (what tulipcore does) or you have to pass messages between two event loops which is probably not very performant at high volume (basically this is what threadpoolexecutor has to do also).
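
For comparison, the thread-based message passing mentioned above can be sketched with only the standard library. This is a minimal illustration and not part of the gist: `async_query` and `blocking_library` are hypothetical stand-ins for an asyncio-only driver call and for a library that knows nothing about asyncio.

```python
import asyncio


async def async_query(statement):
    # Hypothetical stand-in for an asyncio-only database call.
    await asyncio.sleep(0)
    return f"result of {statement!r}"


def blocking_library(loop):
    # Runs in a worker thread and never uses "await". It posts the
    # coroutine to the event loop's thread and blocks until it finishes.
    future = asyncio.run_coroutine_threadsafe(async_query("select 1"), loop)
    return future.result()  # blocks this worker thread, not the event loop


async def main():
    loop = asyncio.get_running_loop()
    # Run the blocking code in the default thread pool executor.
    return await loop.run_in_executor(None, blocking_library, loop)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Every database call here crosses a thread boundary twice, once to reach the loop and once to return the result, which is the overhead the greenlet approach avoids by staying in a single thread.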

Above, I had no interest in gevent's event loop or any of its networking functions, I just wanted to use asyncio and not have to use async/await keywords within a middle library. Someone has actually done this already, albeit in a more complicated style that tries to make it a bit more transparent than how I am doing it, this is at https://github.com/oremanj/greenback .
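
Stripped of the database parts, the core of the trick can be sketched roughly as follows. This is a simplified variant and not the gist's exact code: it switches to the child greenlet's parent instead of going through a context variable, and the names `await_`, `greenlet_spawn`, `fetch_value` and `middle_library` are made up for illustration.

```python
import asyncio
import sys

import greenlet


def await_(coroutine):
    # Hypothetical helper: hand the coroutine to the parent greenlet (the
    # one running the event loop) and pause here until the result comes back.
    return greenlet.getcurrent().parent.switch(coroutine)


async def fetch_value(x):
    # Stand-in for an asyncio-only driver call.
    await asyncio.sleep(0)
    return x * 2


def middle_library():
    # Blocking-style code: no async/await keywords anywhere.
    a = await_(fetch_value(1))
    b = await_(fetch_value(a))
    return a + b


async def greenlet_spawn(fn):
    result = []
    child = greenlet.greenlet(lambda: result.append(fn()))
    pending = child.switch()  # runs fn() until its first await_()
    while not child.dead:
        try:
            value = await pending
        except BaseException:
            # re-raise the failure inside the blocking code
            pending = child.throw(*sys.exc_info())
        else:
            pending = child.switch(value)
    return result[0]


if __name__ == "__main__":
    print(asyncio.run(greenlet_spawn(middle_library)))
```

The only asyncio-aware pieces are the outer driver loop and the innermost coroutine; everything in between is ordinary synchronous Python.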

@vytas7

vytas7 commented Jul 6, 2020

Aye, I had the same Gevent discussion in mind. FWIW, aiogevent would not work on a modern CPython version without major refactoring (it's even trying to call asyncio.async which is not quite the correct syntax anymore...).
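
For reference, `async` has been a reserved keyword since Python 3.7, so `asyncio.async(...)` no longer even parses. The standard-library replacements are `asyncio.ensure_future()` and, for coroutines, `asyncio.create_task()`. A small sketch, where `compute` is a made-up coroutine:

```python
import asyncio


async def compute():
    await asyncio.sleep(0)
    return 42


async def main():
    # ensure_future() accepts coroutines, Tasks and Futures alike.
    first = asyncio.ensure_future(compute())
    # create_task() (Python 3.7+) is the usual choice for a plain coroutine.
    second = asyncio.create_task(compute())
    return await first, await second


if __name__ == "__main__":
    print(asyncio.run(main()))
```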

"I just wanted to use asyncio and not have to use async/await keywords within a middle library."

Indeed, that's one of the major annoyances for reusing code in libraries supporting both worlds. If I were to decide, the idea should not have been rejected from the standard lib in the first place 👿 (I understand Guido's argument against reentrance and the resulting uncontrolled recursion, but the interpreter itself could offer support for switching contexts).

Your solution is admittedly really elegant in its simplicity 👍 Are you planning to release it as a separate microlibrary?

@zzzeek
Author

zzzeek commented Jul 6, 2020

oh that's a great link, I have a vague notion that Guido has rejected all the very reasonable approaches that would keep asyncio from forcing all library authors everywhere to rewrite everything from scratch, but this seems to be where it actually happened.

re: microlibrary, I think at the moment this would be a utility within SQLAlchemy itself but I don't have an appetite to maintain / release a separate package. Also considering "greenback" already does this I wouldn't want to compete with that.

that said, consider this code to be public domain and go nuts with it.
