Skip to content

Instantly share code, notes, and snippets.

@codemation
Created July 22, 2020 13:40
Show Gist options
  • Save codemation/b05e36a916404df0b26c41beb1a2e051 to your computer and use it in GitHub Desktop.
Save codemation/b05e36a916404df0b26c41beb1a2e051 to your computer and use it in GitHub Desktop.
A generic implementation for post-response hooks", using websocket & async generators with fastapi
# The following assumes run(sever) was called with an existing fastapi router as input var
def run(server):
import asyncio, uuid
from fastapi.websockets import WebSocket
from fastapi.testclient import TestClient
server.clients = {}
async def get_client_by_id(client_id: str, path: str):
"""
creates an async generator for holding the context
of a TestClient.websocket_connect open
"""
async def client():
log.warning(f"started client with id: {client_id} and path {path}")
c = TestClient(server)
with c.websocket_connect(f"{path}") as websocket:
while True:
result = yield websocket
if result == 'finished':
log.warning(f"cleaning up client with id: {client_id} and path {path}")
break
# returns open connetion, if exists
if not client_id in server.clients:
# creates & assigns generator to server.clients
server.clients[client_id] = client()
# initializes generator with .asend(None) & returns open websocket
return await server.clients[client_id].asend(None)
return await server.clients[client_id].asend(client_id)
async def cleanup_client(client_id):
"""
cleanup open websocket connection
"""
if client_id in server.clients:
try:
await sever.clients[client_id].asend('finished')
except Exception:
print(f"I cleaned up client with {client_id}")
async def post_response_work(client_id, duration):
print(f"I started work with {client_id} which will take {duration} seconds")
await asyncio.sleep(duration)
print(f"I finished work with {client_id} after {duration} seconds")
await cleanup_client(client_id)
@server.websocket_route("/post_response")
async def attach_databases(websocket: WebSocket):
await websocket.accept()
# wait for work
work = await websocket.receive_json()
# send receipt of work received
await websocket.send_json({"message": f"I started a post_response with {work['work']}"})
# start work with will continue after client response
await post_response_work(work['client'], work['work'])
await websocket.close()
async def trigger_post_response(work):
client = TestClient(server)
import time
start = time.time()
client_id = str(uuid.uuid1())
# pull open websocket client conection
websocket = await get_client_by_id(client_id, "/post_response")
# send work to websocket server
websocket.send_json({"work": work, 'client': client_id})
# waits for ack of work received
result = websocket.receive_json()
print(f"trigger_post_response: {result} after {time.time()-start} seconds")
return result
@server.get("/work/{work}")
async def start_work(work: int):
return await trigger_post_response(work)
"""
$ uvicorn server:app
INFO: Started server process [18160]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
WARNING: started client with id: a11a7248-cc1c-11ea-9ada-f3f7bc2ffe6c and path /post_response
I started work with a11a7248-cc1c-11ea-9ada-f3f7bc2ffe6c which will take 5 seconds
trigger_post_response: {'message': 'I started a post_response with 5'} after 0.008193492889404297 seconds
INFO: 127.0.0.1:45868 - "GET /work/5 HTTP/1.1" 200 OK
I finished work with a11a7248-cc1c-11ea-9ada-f3f7bc2ffe6c after 5 seconds
I cleaned up client with a11a7248-cc1c-11ea-9ada-f3f7bc2ffe6c
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment