Skip to content

Instantly share code, notes, and snippets.

@mikeckennedy
Created February 23, 2022 01:03
Show Gist options
  • Save mikeckennedy/033ad92c165a9041fafc5c429e6c3c28 to your computer and use it in GitHub Desktop.
Save mikeckennedy/033ad92c165a9041fafc5c429e6c3c28 to your computer and use it in GitHub Desktop.
# requires the uvloop package
import asyncio
import threading
import time
import uuid
from typing import Any, Coroutine
import uvloop
uvloop.install()
# Module-level state shared between the threads calling `run()` and the
# background worker thread.

# NOTE(review): not read or written anywhere in the visible code; kept so any
# external reference to it keeps working.
initialized = False

# Guards every access to `pending_items`.
__add_lock = threading.Lock()
# Guards every access to `finished_items`.
__receive_lock = threading.Lock()

# Coroutines that have been submitted but not yet picked up by the worker,
# keyed by the id handed back to the submitting caller.
pending_items: dict[uuid.UUID, Coroutine[Any, Any, Any]] = {}
# Outcome of each completed coroutine (its return value, or the exception it
# raised), keyed by the same id, until the caller collects it.
finished_items: dict[uuid.UUID, Any] = {}
def run(async_coroutine: Coroutine[Any, Any, Any]):
    """
    Run a coroutine to completion from synchronous code and return its result.

    The coroutine is queued for the background worker thread; the calling
    thread then blocks, polling every 0.5 ms, until an outcome has been
    published for it.

    Example:
        async def some_async_method(x, y): ...
        result = syncify.run(some_async_method(1, 2))

    Args:
        async_coroutine: The coroutine object to execute, i.e. the result of
            calling an ``async def`` function.

    Returns:
        The value returned by `async_coroutine`.

    Raises:
        SyncifyRuntimeError: If the outcome recorded for the coroutine is an
            ``Exception`` instance. The recorded exception is chained as
            ``__cause__``.
    """
    item_id = __add_work(async_coroutine)

    # Block this thread until the worker has recorded an outcome for our id.
    while not __is_done(item_id):
        time.sleep(0.0005)

    result = __get_result(item_id)
    # The worker hands failures back as exception instances instead of
    # raising them on its own thread, so surface them here, in the caller.
    if isinstance(result, Exception):
        raise SyncifyRuntimeError() from result
    return result
class SyncifyRuntimeError(Exception):
    """Raised by `run()` when the coroutine it executed ended in an exception.

    The exception recorded for the coroutine is attached as ``__cause__``.
    """
def worker_loop():
    """
    Execute queued coroutines forever on a private uvloop event loop.

    Intended to be the target of the background worker thread. Each pass
    drains `pending_items`, schedules every drained coroutine as a task on
    the loop, waits for the tasks one by one and publishes each outcome
    (return value, or the exception instance on failure) in `finished_items`
    under the id the coroutine was queued with.

    Every id removed from `pending_items` is guaranteed to receive an entry
    in `finished_items`; otherwise the thread polling for that id in `run()`
    would wait forever.
    """

    def publish(item_id, outcome):
        # Make the outcome visible to the thread polling for `item_id`.
        with __receive_lock:
            finished_items[item_id] = outcome

    print("Starting syncify background thread.")
    loop: uvloop.Loop = uvloop.new_event_loop()

    while True:
        # Snapshot and clear the backlog in one locked step so that
        # submitters are only blocked for the duration of the copy.
        with __add_lock:
            work: list[tuple[uuid.UUID, Coroutine[Any, Any, Any]]] = list(pending_items.items())
            pending_items.clear()

        if not work:
            time.sleep(0.001)
            continue

        # Schedule everything first so the whole batch makes progress
        # concurrently while we wait on the individual tasks below.
        running: dict[uuid.UUID, asyncio.Task] = {}
        for item_id, coroutine in work:
            try:
                running[item_id] = loop.create_task(coroutine)
            except Exception as error:
                # E.g. the caller passed something that is not a coroutine.
                # Report it to that caller only; the rest of the batch is
                # unaffected.
                print("Error processing pending tasks:")
                print(error)
                publish(item_id, error)

        for item_id, task in running.items():
            try:
                loop.run_until_complete(asyncio.wait([task]))
                outcome = task.result()
            except asyncio.CancelledError as cancelled:
                # CancelledError derives from BaseException on Python 3.8+,
                # so `except Exception` would let it escape and kill this
                # thread. Hand it back as an ordinary exception instead.
                outcome = SyncifyRuntimeError("The coroutine was cancelled before it completed.")
                outcome.__cause__ = cancelled
            except Exception as error:
                outcome = error
            publish(item_id, outcome)
def __add_work(async_coroutine: Coroutine[Any, Any, Any]) -> uuid.UUID:
    """Queue `async_coroutine` for the worker thread.

    Returns:
        A freshly generated id under which the coroutine was stored in
        `pending_items`; the worker publishes the outcome under the same id.
    """
    new_id = uuid.uuid4()
    with __add_lock:
        pending_items[new_id] = async_coroutine
    return new_id
def __is_done(item_id: uuid.UUID) -> bool:
    """Return True once an outcome for `item_id` is present in `finished_items`."""
    with __receive_lock:
        return item_id in finished_items
def __get_result(item_id: uuid.UUID) -> Any:
    """Remove and return the outcome stored for `item_id`.

    Raises:
        KeyError: If no outcome has been stored for `item_id`.
    """
    with __receive_lock:
        # pop() fetches and removes in one step, so collected results do not
        # accumulate in `finished_items`.
        return finished_items.pop(item_id)
# Start the background worker as a side effect of importing this module.
# daemon=True means the thread does not keep the interpreter alive at exit.
worker_thread = threading.Thread(
    target=worker_loop,
    name="syncify-thread",
    daemon=True,
)
worker_thread.start()
@mikeckennedy
Copy link
Author

It is a weird mix, isn't it? This is exactly what I was thinking. Rather than using some kind of active_threads set, I was planning on making active_thread a thread-local with threading.local (see this article).

@wshanks
Copy link

wshanks commented Mar 20, 2022

Ah, that's even better and explains why that was a topic on a recent Python Bytes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment