@mikeckennedy
Created February 23, 2022 01:03
# requires the uvloop package
import asyncio
import threading
import time
import uuid
from typing import Any, Coroutine

import uvloop

uvloop.install()

initialized = False

__add_lock = threading.Lock()
__receive_lock = threading.Lock()

# Work queued by calling threads and results published by the worker thread.
pending_items: dict[uuid.UUID, Coroutine[Any, Any, Any]] = {}
finished_items: dict[uuid.UUID, Any] = {}


def run(async_coroutine: Coroutine[Any, Any, Any]) -> Any:
    """
    Convert an async method to a synchronous one.

    Example:
        async def some_async_method(x, y): ...

        result = syncify.run( some_async_method(1, 2) )

    Args:
        async_coroutine (Coroutine): The coroutine to run to completion.

    Returns: The value returned by `async_coroutine`.
    """
    item_id = __add_work(async_coroutine)

    # Poll until the background thread publishes the result.
    while not __is_done(item_id):
        time.sleep(0.0005)

    result = __get_result(item_id)
    if isinstance(result, Exception):
        raise SyncifyRuntimeError() from result

    return result


class SyncifyRuntimeError(Exception):
    pass


def worker_loop():
    print("Starting syncify background thread.")
    loop: uvloop.Loop = uvloop.new_event_loop()

    while True:
        with __add_lock:
            count = len(pending_items)

        if count == 0:
            time.sleep(0.001)
            continue

        try:
            # Take all pending work at once so the coroutines can overlap.
            with __add_lock:
                work: list[tuple[uuid.UUID, Coroutine[Any, Any, Any]]] = list(pending_items.items())
                for k, w in work:
                    del pending_items[k]

            # First loop: start every piece of work as a task on the event loop.
            running: dict[uuid.UUID, asyncio.Task] = {
                k: loop.create_task(w)
                for k, w in work
            }

            # Second loop: wait for the tasks and publish results (or errors).
            for k, t in running.items():
                try:
                    loop.run_until_complete(asyncio.wait([t]))
                    result = t.result()
                    with __receive_lock:
                        finished_items[k] = result
                except Exception as x:
                    # Store the exception so run() can re-raise it on the caller's thread.
                    with __receive_lock:
                        finished_items[k] = x
        except Exception as x:
            print("Error processing pending tasks:")
            print(x)


def __add_work(async_coroutine: Coroutine[Any, Any, Any]) -> uuid.UUID:
    new_id = uuid.uuid4()
    with __add_lock:
        pending_items[new_id] = async_coroutine

    return new_id


def __is_done(item_id: uuid.UUID) -> bool:
    with __receive_lock:
        return item_id in finished_items


def __get_result(item_id: uuid.UUID) -> Any:
    with __receive_lock:
        result = finished_items[item_id]
        del finished_items[item_id]
        return result


worker_thread = threading.Thread(name="syncify-thread", target=worker_loop, daemon=True)
worker_thread.start()
@wshanks

wshanks commented Mar 2, 2022

Neat. Why did you structure things to allow for multiple pending and completed items? It looks like run() is set up to run one thing at a time. Maybe it could be called from multiple threads? Is that safe with the loop.run_until_complete(asyncio.wait([t])) in the loop? I guess you shouldn't make coroutines interdependent.

@mikeckennedy

Hi Will. Thanks, I'm glad you like it. There were two ways to do this.

#1 I could have the loop grab a task and then run it, then grab the next task and run that, and so on. That would basically make the async execution serial, because in that form there's only one active task at any moment. So that's not what we did.

#2 The other way is to start every single task and then process them as they finish. That's what we have above. Even though it's just one background thread, the async event loop does let things happen in parallel as long as one of the coroutines awaits something. And if it's not awaiting anything, why make it async anyway, right?

So that's why we have two loops and not just one. The first loop starts every single piece of work that we haven't gotten to yet. Then the second one just waits for them as they finish. Granted, we take them in the order they were started rather than the order they actually finish. This is mostly because Python doesn't have a great way to check them for completion. But it doesn't really matter unless one task takes an extremely long time. In this world, these are things like database calls, web API calls, and so on. They're in the milliseconds-to-under-a-second range, so they're usually flying. And ideally, during the slow parts we're letting the other code run because we are awaiting the database call or awaiting the web service call.
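
A minimal, self-contained sketch of the difference (hypothetical demo code, not part of the gist): starting all the tasks first lets the awaits overlap, so five 100 ms calls take about 100 ms total instead of 500 ms.

import asyncio
import time

async def fake_db_call(n: int) -> int:
    await asyncio.sleep(0.1)  # stand-in for a database or web API call
    return n

async def main():
    start = time.perf_counter()
    # Start everything first (loop one), then wait for the results (loop two).
    tasks = [asyncio.create_task(fake_db_call(i)) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(results, f"elapsed: {time.perf_counter() - start:.2f}s")

asyncio.run(main())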

@wshanks

wshanks commented Mar 7, 2022

Thanks for the response, Michael! I also heard you describe your usage a little more on the most recent Python Bytes episode.

From what you said there, it sounded like run() can be called from multiple threads. Is that right? Since run() takes a single task as argument and runs synchronously until the task is completed, I didn't see how you could get multiple tasks queued unless you were using threads. I also thought you mentioned nested calls to run() but I don't see how that could work. Wouldn't the outer call to run() block the worker loop unless you add a timeout to asyncio.wait()?

Also, looking at the code again, I think you could replace the sleeps with threading.Condition or threading.Event if you wanted.
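
A minimal, self-contained sketch of the Event idea (hypothetical names, not code from the gist): the worker sets the event when the result is ready, and the caller blocks on wait() instead of busy-polling.

import threading

result_box = {}
done = threading.Event()

def worker():
    result_box["value"] = 42  # stand-in for publishing the coroutine's result
    done.set()                # wakes the waiting caller immediately

threading.Thread(target=worker, daemon=True).start()
done.wait()  # blocks with no sleep loop until the worker signals
print(result_box["value"])

In the gist, this would mean keeping one Event per item id and having run() wait on it rather than looping over __is_done().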

@mikeckennedy

Hey Will. My use case is for web apps. And most of the servers use threads. I think our uWSGI instances run 6 threads per worker process and 4-8 worker processes. So we definitely get multiple threads calling into these at a time. That's why we have to worry about it.

My original implementation didn't use this background thread idea. That was the case where asyncio was reentered, which probably would have worked out OK, but uWSGI was doing all sorts of weirdness causing it to crash, basically. So that's what led to the code above. Basically, create one background thread whose job it is to manage the asyncio work. Think of it like an asyncio event scheduler and pool. Its job is to make sure each request runs to completion on a single stable thread.

You could extend this to multiple background threads working on these tasks in general, with the pool making sure the threads stick for a given asyncio request. But we just don't have the level of traffic where each worker process is entirely swamped. Also, our requests are really fast, like 2 ms to 5 ms; really slow ones are like 100 ms. So this thing clears itself out really fast in practice.

But you end up with reentrancy if you do something like this:

def validate_user(email):
    return syncify.run(validate_user_async(email))

async def validate_user_async(email):
    user = await get_user_by_email(email)
    orders = get_orders_for_user(user)
    return len(orders) > 0 if orders else False

def get_orders_for_user(user):
    return syncify.run(get_orders_for_user_async(user))

async def get_orders_for_user_async(user):
    ...

See how validate_user_async() calls the sync version instead of the async version of get_orders_for_user() (wrongly, but not obviously so)? That was killing the plain old loop.run_until_complete() version as well as this version. That's what I wanted to check for and just report an error saying, basically, just use await get_orders_for_user_async() instead, like you should anyway.

@wshanks

wshanks commented Mar 7, 2022

Thanks, Michael. That all makes sense to me. I haven't thought about mixing asyncio and threading too much. For the reentrancy check, I think you could do something like storing threading.get_ident() in an active_threads set and raising an exception if it is already there when __add_work tries to insert a new item into pending_items, since a given thread ID should only be running one task at a time.
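
A sketch of that check, adapted into the gist's __add_work (hypothetical; a matching active_threads.discard() call would be needed in __get_result so the thread can call run() again later):

active_threads: set[int] = set()

def __add_work(async_coroutine: Coroutine[Any, Any, Any]) -> uuid.UUID:
    ident = threading.get_ident()
    new_id = uuid.uuid4()
    with __add_lock:
        if ident in active_threads:
            # This thread is already blocked inside run(); a nested call can never finish.
            raise SyncifyRuntimeError("Nested syncify.run() call detected.")
        active_threads.add(ident)
        pending_items[new_id] = async_coroutine
    return new_id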

@mikeckennedy

It is a weird mix, isn't it? This is exactly what I was thinking. Rather than using some kind of active_threads set, I was planning on using active_thread as a thread-local with threading.local (see this article).
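
A sketch of the thread-local variant (hypothetical names): each thread carries its own flag, so no shared set or extra locking is needed for the check.

_state = threading.local()

def run(async_coroutine):
    if getattr(_state, "active", False):
        raise SyncifyRuntimeError("Nested syncify.run() call detected; await the async version instead.")
    _state.active = True
    try:
        ...  # queue the work and poll for the result, as in the gist's run()
    finally:
        _state.active = False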

@wshanks

wshanks commented Mar 20, 2022

Ah, that's even better and explains why that was a topic on a recent Python Bytes.
