Skip to content

Instantly share code, notes, and snippets.

@bllli
Created November 8, 2023 06:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bllli/3593cf8d328cd5e274119f8cf6c4aa80 to your computer and use it in GitHub Desktop.
Save bllli/3593cf8d328cd5e274119f8cf6c4aa80 to your computer and use it in GitHub Desktop.
Server-Sent Events
import asyncio
import datetime
import random
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
@app.get("/api/sse", summary="test Server-Sent Events")
async def sse_test():
async def event_generator(n):
for _ in range(n):
yield f"data: {_}: {datetime.datetime.now()}\n\n"
await asyncio.sleep(random.randint(2, 8) / 10)
return StreamingResponse(event_generator(5), media_type="text/event-stream")
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8199)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment