Skip to content

Instantly share code, notes, and snippets.

@stereobutter
Created November 17, 2021 18:21
Show Gist options
  • Save stereobutter/84f48a650391448635de406852bfe128 to your computer and use it in GitHub Desktop.
Save stereobutter/84f48a650391448635de406852bfe128 to your computer and use it in GitHub Desktop.
TrioLoop
import trio
from .trio_loop import TrioLoop
async def foo(data):
    """Example coroutine: sleep for one second, then return *data* unchanged."""
    delay_seconds = 1
    await trio.sleep(delay_seconds)
    return data
# Run foo() on a TrioLoop from plain synchronous code: wait() blocks until
# the coroutine has finished and returns its result, which is then printed.
with TrioLoop() as loop:
    data = loop.wait(foo('hello world'))
    print(data)
import trio
from functools import partial
from contextlib import contextmanager
import outcome
import threading
async def _await(coro):
    """Await *coro* and return what happened as an ``outcome`` object.

    Returns ``outcome.Value(result)`` when the awaitable completes and
    ``outcome.Error(exc)`` when it raises an ``Exception``.  Exceptions that
    are not ``Exception`` subclasses are not captured and propagate.
    """
    try:
        value = await coro
    except Exception as exc:
        return outcome.Error(exc)
    return outcome.Value(value)
class TrioLoop:
    """Run a trio event loop on a worker thread and drive it from sync code.

    Use it as a context manager.  Entering starts ``trio.run`` via
    ``trio.lowlevel.start_thread_soon`` and blocks until the loop has
    published its token.  ``wait`` submits a coroutine to that loop and
    blocks for its result.  Leaving the ``with`` block asks the loop to stop
    and blocks until ``trio.run`` has ended.  An instance can be entered
    only once.
    """

    def __init__(self):
        self._ctx = None  # context manager created by _enter_exit()
        self._token = None  # token of the background loop, set by _main()
        self._stop = trio.Event()  # set inside the loop to let _main() return
        self._started = threading.Event()  # set once _token is available
        self._finished = threading.Event()  # set once trio.run() has ended

    @contextmanager
    def _enter_exit(self):
        """Start the background loop, yield ``self``, then shut the loop down."""
        trio.lowlevel.start_thread_soon(
            partial(trio.run, self._main),
            # Deliver callback: receives the outcome of trio.run(); the
            # outcome is not inspected, it only marks the run as ended.
            lambda _: self._finished.set(),
        )
        try:
            self._started.wait()
            yield self
        finally:
            # The stop event belongs to the loop, so its set() is scheduled
            # on the loop through the token rather than called from here.
            self._token.run_sync_soon(self._stop.set)
            # Do not return to the caller while the loop is still running.
            self._finished.wait()

    def __enter__(self):
        """Start the loop and return this instance.

        Raises:
            RuntimeError: if this instance has already been entered.
        """
        if self._ctx is not None:
            raise RuntimeError('TrioLoop cannot be re-entered')
        self._ctx = self._enter_exit()
        return self._ctx.__enter__()

    def __exit__(self, et, ev, tb):
        """Stop the loop, forwarding the exception triple to the inner manager.

        Raises:
            RuntimeError: if the instance was never entered.
        """
        if self._ctx is None:
            raise RuntimeError('TrioLoop has not been entered')
        return self._ctx.__exit__(et, ev, tb)

    async def _main(self):
        """Body of the background run: publish the token, then idle until stopped."""
        self._token = trio.lowlevel.current_trio_token()
        self._started.set()
        await self._stop.wait()

    def wait(self, coro):
        """Run *coro* on the background loop and block until it finishes.

        Returns the coroutine's result, or re-raises the exception it raised
        (``_await`` captures it as an outcome, which is unwrapped here).

        Raises:
            RuntimeError: if the loop has not been started yet.
        """
        if self._token is None:
            raise RuntimeError('TrioLoop has not been entered')
        result = trio.from_thread.run(_await, coro, trio_token=self._token)
        return result.unwrap()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment