Skip to content

Instantly share code, notes, and snippets.

@smurfix
Forked from sorcio/trio_graceful_shutdown.py
Last active July 31, 2018 18:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save smurfix/582a8ae02c6e92f82b496c047c335010 to your computer and use it in GitHub Desktop.
Save smurfix/582a8ae02c6e92f82b496c047c335010 to your computer and use it in GitHub Desktop.
graceful service shutdown with Trio
from itertools import count
import signal
from async_generator import asynccontextmanager
import trio
async def monitor(self, task_status):
with trio.catch_signals({signal.SIGTERM}) as signal_aiter:
async with trio.open_nursery() as nursery:
task_status.started(nursery)
async for batch in signal_aiter:
assert batch == {signal.SIGTERM}
nursery.cancel_scope.cancel()
return
# just some example long-running code
class QueueClosed(Exception): pass
class ClosableQueue:
def __init__(self, cap):
self._queue = trio.Queue(cap)
self._closed = False
async def put(self, x):
await self._queue.put(x)
async def get(self):
if self._closed:
raise QueueClosed
x = await self._queue.get()
if x is QueueClosed:
raise QueueClosed
else:
return x
def close(self):
try:
while True:
self._queue.put_nowait(QueueClosed)
except trio.WouldBlock:
pass
self._closed = True
async def uncancellable_sleep(seconds):
deadline = trio.current_time() + seconds
with trio.open_cancel_scope(deadline=deadline, shield=True):
await trio.sleep_forever()
async def producer(q):
counter = count(1)
while True:
await uncancellable_sleep(1)
await q.put(next(counter))
await q.put(next(counter))
await q.put(next(counter))
# wouldn't it be nice to be able to close the queue instead?
print('producer shutting down')
q.close()
async def consumer(q):
try:
while True:
item = await q.get()
await trio.sleep(3)
print(item)
except QueueClosed:
print('consumer shutting down')
async def long_running_task(killable_nursery):
q = ClosableQueue(0)
async with trio.open_nursery() as nursery:
killable_nursery.start_soon(producer, q)
nursery.start_soon(consumer, q)
nursery.start_soon(consumer, q)
nursery.start_soon(consumer, q)
nursery.start_soon(consumer, q)
nursery.start_soon(consumer, q)
nursery.start_soon(consumer, q)
async def main():
async with trio.open_nursery() as nursery:
killable = await nursery.start(monitor)
nursery.start_soon(long_running_task, killable)
trio.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment