Skip to content

Instantly share code, notes, and snippets.

@miracle2k
Created July 27, 2018 12:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miracle2k/8499df40a7b650198bbbc3038a6fb292 to your computer and use it in GitHub Desktop.
Save miracle2k/8499df40a7b650198bbbc3038a6fb292 to your computer and use it in GitHub Desktop.
Trio nursery that can spawn tasks in 'daemon' and 'main' modes.
async with open_special_nursery() as nursery:
nursery.start_soon(self.wait_for_stop_signal, daemon=True)
nursery.start_soon(func, *a)
from contextlib import asynccontextmanager
import trio
class SpecialNursery:
def __init__(self, nursery):
self.nursery = nursery
self._non_daemon_tasks = set()
self._daemon_scopes = set()
self._no_more_non_daemon_tasks = trio.Event()
self._no_more_non_daemon_tasks.set()
def start_soon(self, async_fn, *args, name=None, daemon=False, main=False):
if main:
async_fn = self._wrap_main(async_fn)
if not daemon:
self._start_non_daemon(async_fn, *args)
else:
async_fn = self._wrap_daemon(async_fn)
self.nursery.start_soon(async_fn, *args)
def _wrap_main(self, async_fn):
async def wrapped(*a, **kw):
result = await async_fn(*a, **kw)
# When a main process ends, cancel all.
self.nursery.cancel_scope.cancel()
return result
return wrapped
def _wrap_daemon(self, async_fn):
async def wrapped(*a, **kw):
with trio.open_cancel_scope() as scope:
try:
self._daemon_scopes.add(scope)
return await async_fn(*a, **kw)
finally:
self._daemon_scopes.remove(scope)
return wrapped
def _start_non_daemon(self, async_fn, *args):
token = object()
async def wrapped(*a, **kw):
try:
return await async_fn(*a, **kw)
finally:
self._non_daemon_tasks.remove(token)
if not self._non_daemon_tasks:
self._no_more_non_daemon_tasks.set()
self._non_daemon_tasks.add(token)
self._no_more_non_daemon_tasks.clear()
self.nursery.start_soon(wrapped, *args)
@asynccontextmanager
async def open_special_nursery():
async with trio.open_nursery() as nursery:
special_nursery = SpecialNursery(nursery)
yield special_nursery
# Now the with block is finished. We wait until
# all non-daemon tasks are done.
await special_nursery._no_more_non_daemon_tasks.wait()
for scope in special_nursery._daemon_scopes:
scope.cancel()
import pytest
import trio
from special_nursery import open_special_nursery
@pytest.mark.trio
async def test_daemon_tasks(autojump_clock):
tick_counter = 0
async def endless_loop():
nonlocal tick_counter
while True:
await trio.sleep(1)
tick_counter += 1
async with open_special_nursery() as nursery:
nursery.start_soon(endless_loop, daemon=True)
nursery.start_soon(trio.sleep, 5)
await trio.sleep(3)
assert tick_counter == 5
@pytest.mark.trio
async def test_main_tasks(autojump_clock):
tick_counter = 0
async def endless_loop():
nonlocal tick_counter
while True:
await trio.sleep(1)
tick_counter += 1
async with open_special_nursery() as nursery:
nursery.start_soon(endless_loop, daemon=True)
nursery.start_soon(trio.sleep, 5, main=True)
nursery.start_soon(trio.sleep, 10, main=True)
assert tick_counter == 5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment