Skip to content

Instantly share code, notes, and snippets.

@ruslux
Last active September 1, 2023 05:21
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ruslux/d3fc3007b4431844fdff6a652ca896b1 to your computer and use it in GitHub Desktop.
Save ruslux/d3fc3007b4431844fdff6a652ca896b1 to your computer and use it in GitHub Desktop.
aioredis fastapi websocket pubsub
import uvicorn
from aioredis import create_pool
from fastapi import FastAPI
from starlette.websockets import WebSocket
# ASGI application object; the websocket route is registered on it below.
app = FastAPI()
# Redis connection settings that get_pool() passes to create_pool().
REDIS_URL = 'redis://redis:6379'
REDIS_DB = 0
REDIS_PASSWORD = None
# Shared connection pool; stays None until get_pool() creates it on first use.
POOL = None
async def get_pool():
    """Return the shared Redis connection pool, creating it on first use.

    The pool is cached in the module-level ``POOL`` so that every caller
    reuses the same set of connections.

    Returns:
        The pool object produced by ``create_pool``.
    """
    global POOL
    if POOL is None:
        # ``create_pool`` suspends this coroutine, so several first callers
        # can all pass the check above before any of them assigns POOL.
        new_pool = await create_pool(
            REDIS_URL,
            db=REDIS_DB,
            password=REDIS_PASSWORD,
            minsize=5,
            maxsize=1000,
        )
        if POOL is None:
            POOL = new_pool
        else:
            # Another caller finished first; shut the redundant pool down
            # instead of leaking its connections.
            new_pool.close()
            await new_pool.wait_closed()
    return POOL
async def handler(websocket: WebSocket):
    """Relay messages published on ``user:<user_id>`` to a websocket client.

    Accepts the connection, sends a welcome JSON message, subscribes a pooled
    Redis connection to the channel named after the ``user_id`` path
    parameter, and forwards every non-empty message to the client with
    ``send_bytes`` until ``wait_message()`` returns a falsy value or an
    exception is raised. On the way out the channel is unsubscribed and the
    websocket is closed.

    Args:
        websocket: The incoming websocket; its ``path_params`` must contain
            ``user_id``.
    """
    await websocket.accept()
    await websocket.send_json({"message": "Welcome to server!"})
    user_id = websocket.path_params["user_id"]
    # Built once: the same name is used to subscribe, look up and unsubscribe.
    channel_name = f'user:{user_id}'
    pool = await get_pool()
    with await pool as connection:
        try:
            await connection.execute_pubsub('subscribe', channel_name)
            # Prints the channels registered on this particular connection.
            print("Active users:", connection.pubsub_channels.keys())
            channel = connection.pubsub_channels[channel_name]
            # NOTE(review): nothing here reads from the websocket, so a client
            # disconnect is presumably only noticed when a later send fails —
            # confirm whether an explicit receive/disconnect watcher is needed.
            while await channel.wait_message():
                message = await channel.get()
                if message:
                    await websocket.send_bytes(message)
        except Exception as e:
            print("Closed in handler: ", e)
        finally:
            # Each cleanup step is isolated so that a failure in one (e.g. the
            # Redis connection is already gone, or the client has already
            # dropped the socket) neither skips the other nor escapes the
            # handler as an unhandled error.
            try:
                await connection.execute_pubsub('unsubscribe', channel_name)
            except Exception as e:
                print("Unsubscribe failed in handler: ", e)
            try:
                await websocket.close()
            except Exception as e:
                print("Websocket close failed in handler: ", e)
# Serve the pub/sub relay at /ws/<user_id>/; the path parameter is read by
# handler() to choose the Redis channel.
app.add_websocket_route('/ws/{user_id}/', handler)

if __name__ == "__main__":
    # Start the server only when this file is run as a script, so importing
    # the module (tests, or `uvicorn module:app`) does not block on a second
    # server bound to port 8080.
    uvicorn.run(app, host='0.0.0.0', port=8080)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment