Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
@app.get("/sse/stream")
async def stream(req: Request, channel: str = "default", redis: Redis = Depends(depends_redis)):
    """Stream messages from a Redis pub/sub channel as Server-Sent Events.

    Args:
        req: Incoming request; polled to detect that the client disconnected.
        channel: Name of the Redis channel to subscribe to.
        redis: Redis client supplied by the ``depends_redis`` dependency.

    Returns:
        An ``EventSourceResponse`` wrapping an async generator that yields one
        ``{"event": "message", "data": ...}`` dict per message received.
    """

    async def event_publisher():
        """Yield one event per channel message until the stream ends.

        The stream ends when the client is seen as disconnected, when the
        channel stops delivering messages, or when the task is cancelled.
        The channel is unsubscribed exactly once on every exit path.
        """
        _log.info(f"Subscribing for client {req.client}")
        (channel_subscription,) = await redis.subscribe(channel=Channel(channel, False))
        try:
            while True:
                # Check before every wait, not just once: a nested
                # `while wait_message()` loop would never return here while
                # the channel stays open.
                if await req.is_disconnected():
                    _log.info(f"Disconnecting client {req.client}")
                    break
                # NOTE(review): assumes aioredis 1.x semantics, where
                # wait_message() returns False once the channel is closed.
                # Leave the loop in that case instead of spinning on a wait
                # that returns immediately.
                if not await channel_subscription.wait_message():
                    break
                yield {"event": "message", "data": await channel_subscription.get(encoding='utf-8')}
            _log.info(f"Disconnected from client {req.client}")
        except asyncio.CancelledError:
            # Presumably raised when the response task is cancelled because
            # the client refreshed or closed the page. Must propagate so the
            # cancellation completes.
            _log.info(f"Disconnected from client (via refresh/close) {req.client}")
            raise
        finally:
            # Single cleanup point: also covers unexpected exceptions and the
            # generator being closed early, which previously leaked the
            # subscription.
            # NOTE(review): if `redis` is shared between requests, this
            # presumably unsubscribes other listeners of the same channel
            # too -- confirm against depends_redis.
            _log.info(f"Unsubscribing for client {req.client}")
            await redis.unsubscribe(channel=Channel(channel, False))

    return EventSourceResponse(event_publisher())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment