Created
May 30, 2022 20:34
-
-
Save Mgancita/ad8f3c3648573f62e583d9f6c44bcf58 to your computer and use it in GitHub Desktop.
FastAPI Async Websocket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Services to work with WebSockets.""" | |
import asyncio
import os
from datetime import datetime

import aioredis
from fastapi import WebSocket, WebSocketDisconnect
# Connection settings come from the environment so credentials never have to
# be committed to source control.
# REDIS_PASSWORD: unset or empty means "connect without a password".
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD") or None
# REDIS_URL: falls back to a local default instance when unset or empty.
REDIS_URL = os.environ.get("REDIS_URL") or "redis://localhost:6379"
async def redis_connector(websocket: WebSocket, chat_id: str, user_id: str):  # noqa: C901
    """Bridge one browser WebSocket to the Redis pub/sub channel of a chat room.

    Two cooperating tasks run for the lifetime of the connection:

    * the *consumer* reads text frames from the browser and publishes them to
      the room channel as ``"<user_id>: <text>"``;
    * the *producer* forwards every message received on the room channel to
      the browser as ``{"message": <text>}``.

    As soon as either side finishes (browser disconnect, closed channel, send
    failure) the other one is cancelled, a "left the chat" notice is
    published, and the Redis pool is closed.

    Args:
        websocket: The WebSocket to bridge. It is only read from and written
            to here; accepting/closing it is left to the caller.
        chat_id: Room identifier; the Redis channel is ``room:<chat_id>``.
        user_id: Name used to prefix this user's messages and notices.

    Raises:
        TypeError: If the subscription does not yield an ``aioredis.Channel``.
        Exception: Errors raised while publishing browser messages to Redis
            propagate to the caller. Producer (send-side) failures are logged
            and end the session instead of being raised.
    """
    import logging

    logger = logging.getLogger(__name__)
    redis_chat_name = f"room:{chat_id}"

    async def consumer_handler(ws: WebSocket, r: aioredis.Redis):
        """Handle the consuming of messages from the browser and publishing them to redis."""
        await r.publish(redis_chat_name, f"{user_id} joined the chat!")
        try:
            while True:
                message = await ws.receive_text()
                if message:
                    await r.publish(redis_chat_name, f"{user_id}: {message}")
        except WebSocketDisconnect:
            # A disconnect is the normal way for a session to end; the
            # unsubscribe / "left the chat" cleanup runs in the outer finally.
            pass

    async def producer_handler(channel: aioredis.Channel, ws: WebSocket):
        """Forward messages arriving on the subscribed channel to the browser."""
        try:
            # wait_message() resolves to False once the channel is closed, so
            # the loop terminates instead of spinning on get() returning None.
            while await channel.wait_message():
                message = await channel.get()
                if message:
                    await ws.send_json(
                        {
                            "message": message.decode("utf-8"),
                        }
                    )
        except asyncio.CancelledError:
            raise
        except Exception:
            # Best effort, as before, but no longer silent: record the failure
            # and let the session wind down.
            logger.exception("Forwarding messages for %s failed", redis_chat_name)

    redis: aioredis.Redis = await aioredis.create_redis_pool(
        REDIS_URL, db=0, password=REDIS_PASSWORD
    )
    tasks = []
    try:
        # Subscribe before the consumer announces the join so that this
        # client also receives its own "joined the chat!" notice.
        (channel,) = await redis.subscribe(redis_chat_name)
        if not isinstance(channel, aioredis.Channel):
            raise TypeError(
                f"expected aioredis.Channel, got {type(channel).__name__}"
            )

        tasks = [
            asyncio.ensure_future(producer_handler(channel, websocket)),
            asyncio.ensure_future(consumer_handler(websocket, redis)),
        ]
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            # Re-raise an unexpected consumer error (producer errors are
            # already logged and swallowed inside producer_handler).
            task.result()
    finally:
        # Stop whichever side is still running and wait for it to unwind.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        try:
            await redis.unsubscribe(redis_chat_name)
            await redis.publish(redis_chat_name, f"{user_id} left the chat!")
        except Exception:
            # Cleanup must not mask the original error or skip closing the pool.
            logger.exception("Redis cleanup for %s failed", redis_chat_name)
        finally:
            redis.close()
            await redis.wait_closed()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment