Created
January 31, 2022 14:21
-
-
Save jupiterbjy/000c3c62a372c92cdf0a3925daf899c4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Nursery cancellation demo of websocket | |
""" | |
import itertools | |
import trio | |
import fastapi | |
import hypercorn | |
from hypercorn.trio import serve | |
NURSERY = trio.open_nursery() | |
GLOBAL_NURSERY_STORAGE = {} | |
TIMEOUT = 5 | |
router = fastapi.APIRouter() | |
@router.websocket('/stream') | |
async def run_task(websocket: fastapi.WebSocket): | |
# accept and receive UUID | |
# Replace UUID with anything client-specific | |
await websocket.accept() | |
uuid_ = await websocket.receive_text() | |
print(f"[{uuid_}] CONNECTED") | |
# check if nursery exist in session, if exists, cancel it and wait it to end. | |
cancel_scope: trio.CancelScope | |
event: trio.Event | |
try: | |
cancel_scope, event = GLOBAL_NURSERY_STORAGE[uuid_] | |
except KeyError: | |
pass | |
else: | |
print(f"[{uuid_}] STOPPING NURSERY") | |
cancel_scope.cancel() | |
await event.wait() | |
# create new event, and start new nursery. | |
cancel_done_event = trio.Event() | |
async with trio.open_nursery() as nursery: | |
# save ref | |
GLOBAL_NURSERY_STORAGE[uuid_] = nursery.cancel_scope, cancel_done_event | |
try: | |
for n in itertools.count(0, 1): | |
nursery.start_soon(task, n, uuid_, websocket) | |
await trio.sleep(1) | |
# wait for client response | |
with trio.fail_after(TIMEOUT): | |
recv = await websocket.receive_text() | |
print(f"[{uuid_}] RECEIVED {recv}") | |
except trio.TooSlowError: | |
# client possibly left without proper disconnection. | |
print(f"[{uuid_}] CLIENT TIMEOUT") | |
except fastapi.websockets.WebSocketDisconnect: | |
print(f"[{uuid_}] CLIENT DISCONNECTED") | |
# fire event, and pop reference if any. | |
print(f"[{uuid_}] NURSERY STOPPED & REFERENCE DROPPED") | |
cancel_done_event.set() | |
GLOBAL_NURSERY_STORAGE.pop(uuid_, None) | |
async def task(text, uuid_, websocket: fastapi.WebSocket): | |
await websocket.send_text(str(text)) | |
print(f"[{uuid_}] SENT {text}") | |
if __name__ == '__main__': | |
cornfig = hypercorn.Config() | |
trio.run(serve, router, cornfig) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment