Skip to content

Instantly share code, notes, and snippets.

@codemation
Last active July 29, 2020 06:41
Show Gist options
  • Save codemation/755e4b8b32a47ef326a2a9bf50c35782 to your computer and use it in GitHub Desktop.
Save codemation/755e4b8b32a47ef326a2a9bf50c35782 to your computer and use it in GitHub Desktop.
Task-scheduling design pattern for the FastAPI web framework; it can be used for an internal worker/consumer or as a way of sending post-request response hooks.
import asyncio, uuid
from fastapi.websockets import WebSocket
from fastapi.testclient import TestClient
from fastapi import FastAPI
import logging as log
# Configure the root logger with logging's default settings.
log.basicConfig()
# Application instance; the routes and event hooks below are registered on it.
server = FastAPI()
# Maps client_id -> async generator that holds that client's websocket open.
server.clients = {}
# Pending jobs (un-awaited coroutines) consumed by workers; the string
# 'finished' is appended as a stop marker on shutdown.
server.tasks = []
async def get_client_by_id(client_id: str, path: str):
    """
    Return the open websocket held for ``client_id``, opening one if needed.

    Each client is an async generator stored in ``server.clients``. While the
    generator is suspended at its ``yield`` it keeps a
    ``TestClient.websocket_connect`` context open.

    Args:
        client_id: Key under which the generator is stored in
            ``server.clients``.
        path: Websocket route passed to ``websocket_connect``; only used when
            a new connection is created.

    Returns:
        The websocket session yielded by the client's generator.
    """

    async def client():
        log.warning(f"started client with id: {client_id} and path {path}")
        c = TestClient(server)
        with c.websocket_connect(path) as websocket:
            while True:
                # Suspend with the connection open; resumed by .asend(value).
                result = yield websocket
                if result == 'finished':
                    log.warning(f"cleaning up client with id: {client_id} and path {path}")
                    # Leaving the loop exits the websocket_connect context.
                    break

    if client_id not in server.clients:
        # Create the generator and register it under this client id.
        server.clients[client_id] = client()
        # A new async generator must be primed with .asend(None); this runs
        # it up to the first yield and returns the open websocket.
        return await server.clients[client_id].asend(None)
    # Existing connection: resume the generator to get the websocket again.
    return await server.clients[client_id].asend(client_id)
async def worker(client, interval):
    """
    Run jobs from ``server.tasks`` until a ``'finished'`` marker is popped.

    Args:
        client: Identifier of this worker; only used in log messages.
        interval: Seconds to sleep before polling again when the queue is
            empty.

    An exception raised by an awaited job is not caught here and propagates
    to the caller.
    """
    while True:
        if not server.tasks:
            log.warning(f"worker {client} found no tasks to run, sleeping {interval}")
            await asyncio.sleep(interval)
            continue
        # Jobs are taken oldest-first.
        job = server.tasks.pop(0)
        if job == 'finished':
            # Stop marker (appended by the shutdown handler): end this worker.
            return
        result = await job
        log.warning(f"worker finished job: {result}")
@server.websocket_route("/worker")
async def run_worker(websocket: WebSocket):
    """
    Websocket endpoint that runs a worker loop for the connecting client.

    Accepts the connection, reads a JSON config with the keys ``client`` and
    ``interval``, sends an acknowledgement message, then runs ``worker``
    until it returns or raises.

    Args:
        websocket: The server-side websocket for this connection.
    """
    await websocket.accept()
    config = await websocket.receive_json()
    client, interval = config['client'], config['interval']
    await websocket.send_json({"message": f"started worker {client} with config {config}"})
    try:
        await worker(client, interval)
    except Exception as e:
        print(repr(e))
    finally:
        # Always release the client-side connection and close this socket,
        # whether the worker returned normally or failed.
        log.warning(f"worker {client} exiting")
        await cleanup_client(client)
        await websocket.close()
async def work(job_id, duration):
    """
    Simulate a job by sleeping for ``duration`` seconds.

    Args:
        job_id: Identifier included in the printed messages and the result.
        duration: Number of seconds to sleep.

    Returns:
        A dict of the form ``{"message": "<job_id> completed"}``.
    """
    print(f"starting {job_id} for {duration} seconds")
    await asyncio.sleep(duration)  # Fake work
    print(f"finished {job_id} for {duration} seconds")
    return {"message": f"{job_id} completed"}
@server.get("/work/add")
async def add_work():
    """
    Queue one simulated 10-second job on ``server.tasks``.

    The job is appended as an un-awaited ``work(...)`` coroutine with a fresh
    UUID as its id; it only starts running when a worker awaits it.

    Returns:
        ``{"message": "added work"}``.
    """
    job = work(str(uuid.uuid1()), 10)
    server.tasks.append(job)
    return {"message": "added work"}
@server.get("/worker/{interval}")
async def create_worker(interval: int):
    """
    HTTP endpoint that starts an additional worker.

    Args:
        interval: Value passed through to ``add_worker``, taken from the URL
            path.

    Returns:
        Whatever ``add_worker`` returns.
    """
    return await add_worker(interval)
async def add_worker(interval: int):
    """
    Start a new worker by opening a websocket client to the ``/worker`` route.

    A fresh UUID is used as the client id. The worker config is sent over the
    websocket and the endpoint's acknowledgement is awaited.

    Args:
        interval: Interval value sent to the worker in its config.

    Returns:
        The acknowledgement JSON received from the websocket endpoint. If the
        handshake raises, the client connection is cleaned up and a
        ``{"message": ...}`` dict describing the failure is returned instead.
    """
    import time

    start = time.time()
    client_id = str(uuid.uuid1())
    # Open a websocket client connection that is kept open for this worker.
    websocket = await get_client_by_id(client_id, "/worker")
    try:
        # Send the worker config to the websocket endpoint.
        websocket.send_json({"interval": interval, 'client': client_id})
        # Wait for the endpoint's acknowledgement.
        result = websocket.receive_json()
    except Exception:
        # Return explicitly here: falling through to a shared `return result`
        # would raise UnboundLocalError because `result` was never assigned.
        log.exception(f"failed to start worker {client_id}")
        await cleanup_client(client_id)
        return {"message": f"failed to start worker {client_id}"}
    log.warning(f"started worker: {result} after {time.time()-start} seconds")
    return result
async def cleanup_client(client_id):
    """
    Close the websocket connection held open for ``client_id``, if any.

    Sends ``'finished'`` to the client's async generator in
    ``server.clients``, which makes it leave its loop and exit the
    ``websocket_connect`` context. The entry is left in ``server.clients``.

    Args:
        client_id: Key of the client in ``server.clients``; unknown ids are
            ignored.
    """
    if client_id not in server.clients:
        return
    try:
        await server.clients[client_id].asend('finished')
    except StopAsyncIteration:
        # Expected path: the generator finishes after receiving 'finished'
        # (or had already finished), so asend() raises StopAsyncIteration.
        print(f"I cleaned up client with {client_id}")
    except Exception:
        # Best-effort cleanup: report unexpected failures without raising.
        log.exception(f"error while cleaning up client with {client_id}")
@server.on_event("startup")
async def start_workers():
    """
    Startup hook: create the default workers when the app starts.

    Starts two workers one after another, each with an interval of 10.
    """
    for _ in range(2):
        await add_worker(10)
@server.on_event("shutdown")
async def close_sessions():
    """
    Shutdown hook: stop workers and close their open websocket contexts.

    For every registered client, one ``'finished'`` marker is appended to
    ``server.tasks`` and the client's connection is cleaned up.
    """
    # Iterate over a snapshot of the keys: this loop awaits, and a dict that
    # changes size during direct iteration raises RuntimeError.
    for client in list(server.clients):
        server.tasks.append('finished')
        await cleanup_client(client)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment