Skip to content

Instantly share code, notes, and snippets.

@Lonami
Last active January 3, 2024 12:47
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Lonami/3f79ed774d2e0100ded5b171a47f2caf to your computer and use it in GitHub Desktop.
Save Lonami/3f79ed774d2e0100ded5b171a47f2caf to your computer and use it in GitHub Desktop.
Wrapper to launch async tasks from threaded code
import asyncio
import concurrent.futures
import threading
from queue import Queue as BlockingQueue
class TwoSidedQueue:
    """
    Behaves like an `asyncio.Queue`, but `get` and `put` act on different ends.

    Two queues are wrapped. Every attribute that is not defined on this object
    is forwarded to one of them: the names listed in the routing table below go
    to the queue named there, and any other name falls back to `queue_in`.

    :param queue_in: queue that `get`, `get_nowait` and `task_done` act on.
    :param queue_out: queue that `put`, `put_nowait`, `join`, `empty`, `full`
        and `qsize` act on.
    """

    def __init__(self, queue_in, queue_out):
        self._queue_in = queue_in
        self._queue_out = queue_out
        # Maps a queue method name to the underlying queue that serves it.
        self._sides = {
            'empty': queue_out,
            'full': queue_out,
            'get': queue_in,
            'get_nowait': queue_in,
            'join': queue_out,
            'put': queue_out,
            'put_nowait': queue_out,
            'qsize': queue_out,
            'task_done': queue_in,
        }

    def __getattr__(self, name):
        """
        Forward `name` to the queue responsible for it.

        Python only calls this after normal lookup has failed. The routing
        state is therefore read straight from the instance `__dict__`: going
        through `self._sides` on an instance whose `__init__` has not run
        (as `copy` and `pickle` create them) would re-enter this method
        forever and end in `RecursionError`.

        :raises AttributeError: if the instance is not initialised yet, or the
            chosen queue has no attribute called `name`.
        """
        state = self.__dict__
        try:
            sides = state['_sides']
            fallback = state['_queue_in']
        except KeyError:
            raise AttributeError(name) from None
        return getattr(sides.get(name, fallback), name)
class LaunchAsync:
    """
    Provides a way to safely `put` data into `coro` (an `async def`) and `get` data back.

    It will launch the `coro` in a new thread, so be careful it doesn't crash.

    Make sure this `coro` does not interact with a `coro` launched from another instance
    of `LaunchAsync`, since the `asyncio` event loops won't match and could lead to errors.

    When the context manager exits, the task will be cancelled. You can catch this error in
    `coro`, but if the `coro` doesn't eventually exit, the context manager exit will hang.

    :param coro: coroutine function (not a coroutine object). It is called with the shared
        queue as first argument, followed by `*args` and `**kwargs`.
    """

    def __init__(self, coro, *args, **kwargs):
        self._coro = coro
        self._args = args
        self._kwargs = kwargs
        # Everything below is filled in by `__enter__`, once the thread is up.
        self._thread = None
        self._loop = None
        self._task = None
        self._queue_in = None
        self._queue_out = None
        # 0 is what `asyncio.Queue` treats as "no maximum size".
        self._size = 0

    def size(self, size):
        """
        Change the size of the inbound and outbound queues. The default is unbounded (`None`). Example:

        >>> with LaunchAsync(async_def).size(10) as queue:
        >>>     ...          # ^^^^^^^^^

        :return: `self`, so the call can be chained before entering the context manager.
        """
        self._size = size or 0
        return self

    def put(self, data, *, timeout=None):
        """
        `put` data in for the `coro` to `get` out. Will block if the maximum `size` was reached.

        Does nothing (and returns `None`) if the `coro` is dead.

        :raises concurrent.futures.TimeoutError: if `timeout` seconds elapse first. The
            pending `put` is cancelled, so `data` is not delivered later on.
        """
        return self._call_in_loop(self._queue_out.put(data), timeout)

    def get(self, *, timeout=None):
        """
        `get` data out of the `coro` it `put` in. Will block if the queue is empty.

        Returns `None` if the `coro` is dead.

        :raises concurrent.futures.TimeoutError: if `timeout` seconds elapse first. The
            pending `get` is cancelled, so it cannot swallow an item that arrives later.
        """
        return self._call_in_loop(self._queue_in.get(), timeout)

    def _call_in_loop(self, coro, timeout):
        # Runs the coroutine object `coro` in the background loop and blocks for its result.
        future = None
        try:
            future = asyncio.run_coroutine_threadsafe(coro, self._loop)
            return future.result(timeout)
        except concurrent.futures.TimeoutError:
            # Without this the coroutine stays scheduled in the loop after the caller gave
            # up: an abandoned `get` would take the next item and throw it away.
            future.cancel()
            raise
        except concurrent.futures.CancelledError:
            # Nothing here cancels the future before `result` returns, so this is the loop
            # cancelling its leftover tasks while shutting down: the other side is dead.
            return None
        except RuntimeError:
            if future is None:
                # Scheduling itself failed; close the coroutine so Python does not warn
                # that it "was never awaited".
                coro.close()
            if self._loop.is_running():
                raise
            return None

    def dead(self):
        """
        Return `True` if the other side is dead (the `coro` has exited, with or without error).
        """
        return not self._loop.is_running()

    def __enter__(self):
        # asyncio.run is used as it's a battle-tested way to safely set up a new loop and tear
        # it down. However it does mean it's necessary to wait for the task to run before it's
        # possible to get said loop and task back. For this, the usual blocking queue is used.
        oneshot = BlockingQueue(1)
        self._thread = threading.Thread(target=asyncio.run, args=(
            self._run(self._coro, self._size, oneshot, self._args, self._kwargs),))
        self._thread.start()
        self._loop, self._task, self._queue_in, self._queue_out = oneshot.get()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        try:
            self._loop.call_soon_threadsafe(self._task.cancel)
        except RuntimeError:
            # Expected when the loop is already closed; anything else is a real error.
            if self._loop.is_running():
                raise
        finally:
            self._thread.join()

    @staticmethod
    async def _run(coro, size, oneshot, args, kwargs):
        # asyncio.Queue's are created here so that they pick up the right loop.
        queue_in, queue_out = asyncio.Queue(size), asyncio.Queue(size)
        # Hand the loop, this task and both queues back to the thread blocked in `__enter__`.
        oneshot.put((asyncio.get_running_loop(), asyncio.current_task(), queue_in, queue_out))
        try:
            # `queue_in` and `queue_out` are intentionally swapped here.
            await coro(TwoSidedQueue(queue_out, queue_in), *args, **kwargs)
        except asyncio.CancelledError:
            # Cancellation is how `__exit__` asks the task to stop.
            pass
# This is an example of what your other code can look like.
from launchasync import LaunchAsync
# You don't need a class to send commands. Your commands can be anything.
# You can send strings, numbers, and any other object you want directly.
# This is just a nice way to send commands with ID and optional data.
class Command:
    """Plain container pairing a command `id` with an optional `data` payload."""

    def __init__(self, id, data=None):
        # Both values are stored as given; `data` stays `None` when omitted.
        self.id, self.data = id, data
# This is the "async main" function where you will do everything that needs async.
# If you need to call other async functions or create more tasks, do it inside here.
# `queue` acts like an `asyncio.Queue`, but `put` and `get` work on separate ends, so what you `put` is never what you `get` back.
async def async_main(queue):
    """
    Serve commands taken from `queue` forever.

    A command whose `id` is 'print' prints a greeting. One whose `id` is
    'double' puts `data * 2` back on `queue`. Any other `id` is ignored.
    The loop never returns on its own.
    """
    while True:
        request = await queue.get()
        action = request.id
        if action == 'double':
            await queue.put(request.data * 2)
            continue
        if action == 'print':
            print('Hello from async!')
# The first argument must be a coroutine function (do NOT call it with ()).
# In this case, it's `async_main` (without `()`).
#
# The function passed here will always receive the shared queue as the first parameter.
# You can pass more parameters in `LaunchAsync` to forward them to your `async def`.
#
# LaunchAsync is a context manager which gives you access to a shared queue with the
# async code.
with LaunchAsync(async_main) as queue:
    # Everything inside this block is regular threaded code. Each `put` hands a
    # command over to async-land; it is also fine to `put` from another thread
    # started here.
    for command in (Command('print'), Command('double', 7)):
        queue.put(command)
    # `get` brings back what the async side `put`. It blocks until something is
    # available, so a `timeout` is given here. `put` takes a `timeout` as well,
    # although it can only block when `LaunchAsync.size` set a maximum size.
    response = queue.get(timeout=1)
    print('The result of doubling 7 is', response)
# Here the `async_main` task will be cancelled automatically and the thread joined.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment