Skip to content

Instantly share code, notes, and snippets.

@Mgancita
Created May 30, 2022 20:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Mgancita/ad8f3c3648573f62e583d9f6c44bcf58 to your computer and use it in GitHub Desktop.
Save Mgancita/ad8f3c3648573f62e583d9f6c44bcf58 to your computer and use it in GitHub Desktop.
FastAPI Async Websocket
"""Services to work with WebSockets."""
import asyncio
from datetime import datetime
import aioredis
from fastapi import WebSocket, WebSocketDisconnect
REDIS_PASSWORD = [SET PASSWORD]
REDIS_URL = [SET URL]
async def redis_connector(websocket: WebSocket, chat_id: str, user_id: str): # noqa: C901
"""Orchestrate connecting to redis, pubsub, and websockets."""
redis_chat_name = f"room:{chat_id}"
async def consumer_handler(ws: WebSocket, r):
"""Handle the consuming of messages from the browser and publishing them to redis."""
await r.publish(redis_chat_name, f"{user_id} joined the chat!")
try:
while True:
message = await ws.receive_text()
if message:
await r.publish(redis_chat_name, f"{user_id}: {message}")
except WebSocketDisconnect:
# TODO this needs better handling
await r.unsubscribe(redis_chat_name)
await r.publish(redis_chat_name, f"{user_id} left the chat!")
async def producer_handler(r: aioredis.Redis, ws: WebSocket):
"""Handle subscribing to a redis topic and sending messages to the browser."""
(channel,) = await r.subscribe(redis_chat_name)
assert isinstance(channel, aioredis.Channel)
try:
while True:
message = await channel.get()
if message:
await ws.send_json(
{
"message": message.decode("utf-8"),
}
)
except Exception:
# TODO this needs handling better
pass
redis: aioredis.Redis = await aioredis.create_redis_pool(
REDIS_URL, db=0, password=REDIS_PASSWORD
)
consumer_task = consumer_handler(websocket, redis)
producer_task = producer_handler(redis, websocket)
await asyncio.gather(producer_task, consumer_task)
redis.close()
await redis.wait_closed()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment