Skip to content

Instantly share code, notes, and snippets.

@naufalafif
Created February 25, 2022 05:31
Show Gist options
  • Save naufalafif/2d9944434a0fce183a1adab538ae124a to your computer and use it in GitHub Desktop.
Save naufalafif/2d9944434a0fce183a1adab538ae124a to your computer and use it in GitHub Desktop.
Fastapi WebsocketEndpoint
from fastapi import APIRouter, WebSocketDisconnect, WebSocket
router = APIRouter()
@router.websocket("/")
async def lab_socket(websocket: WebSocket):
async def on_connect():
# websocket connect event
await websocket.accept()
async def on_receive(data):
# websocket receive event
await websocket.send_text(f"Message text was: {data}")
async def on_disconnect():
# websocket diconnect event
pass
try:
await on_connect()
while True:
data = await websocket.receive_text()
await on_receive(data)
except WebSocketDisconnect:
await on_disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment