Skip to content

Instantly share code, notes, and snippets.

@sorcio
Created July 31, 2018 17:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sorcio/bd0cf4b0cf3fa0c61dd6b0e4058f49e5 to your computer and use it in GitHub Desktop.
Save sorcio/bd0cf4b0cf3fa0c61dd6b0e4058f49e5 to your computer and use it in GitHub Desktop.
graceful service shutdown with Trio
from itertools import count
import signal
from async_generator import asynccontextmanager
import trio
class ShutdownHandler:
def __init__(self):
self.event = trio.Event()
async def monitor(self, task_status):
with trio.catch_signals({signal.SIGTERM}) as signal_aiter:
task_status.started()
async for batch in signal_aiter:
assert batch == {signal.SIGTERM}
self.event.set()
return
async def _cancel_on_event(self, cancel_scope, task_status):
task_status.started()
await self.event.wait()
cancel_scope.cancel()
@asynccontextmanager
async def cancel_on_shutdown(self):
async with trio.open_nursery() as nursery:
await nursery.start(self._cancel_on_event, nursery.cancel_scope)
yield nursery.cancel_scope
# just some example long-running code
class QueueClosed(Exception): pass
class ClosableQueue:
def __init__(self, cap):
self._queue = trio.Queue(cap)
self._closed = False
async def put(self, x):
await self._queue.put(x)
async def get(self):
if self._closed:
raise QueueClosed
x = await self._queue.get()
if x is QueueClosed:
raise QueueClosed
else:
return x
def close(self):
try:
while True:
self._queue.put_nowait(QueueClosed)
except trio.WouldBlock:
pass
self._closed = True
async def uncancellable_sleep(seconds):
deadline = trio.current_time() + seconds
with trio.open_cancel_scope(deadline=deadline, shield=True):
await trio.sleep_forever()
async def producer(q, shutdown_handler):
counter = count(1)
async with shutdown_handler.cancel_on_shutdown():
while True:
await uncancellable_sleep(1)
await q.put(next(counter))
await q.put(next(counter))
await q.put(next(counter))
# wouldn't it be nice to be able to close the queue instead?
print('producer shutting down')
q.close()
async def consumer(q):
try:
while True:
item = await q.get()
await trio.sleep(3)
print(item)
except QueueClosed:
print('consumer shutting down')
async def long_running_task(shutdown_handler):
q = ClosableQueue(0)
async with trio.open_nursery() as nursery:
nursery.start_soon(producer, q, shutdown_handler)
nursery.start_soon(consumer, q)
nursery.start_soon(consumer, q)
nursery.start_soon(consumer, q)
nursery.start_soon(consumer, q)
nursery.start_soon(consumer, q)
nursery.start_soon(consumer, q)
async def main():
shutdown_handler = ShutdownHandler()
async with trio.open_nursery() as nursery:
await nursery.start(shutdown_handler.monitor)
nursery.start_soon(long_running_task, shutdown_handler)
trio.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment