Skip to content

Instantly share code, notes, and snippets.

@wshayes
Forked from djlambert/log.py
Last active June 29, 2023 06:50
Show Gist options
  • Save wshayes/5115ff4aeb99a1e726edbebf0de2d2fd to your computer and use it in GitHub Desktop.
Save wshayes/5115ff4aeb99a1e726edbebf0de2d2fd to your computer and use it in GitHub Desktop.
[FastAPI Websocket Example] Example of using websockets with FastAPI #fastapi #websockets
import asyncio
import json
from uuid import UUID
import aio_pika
import websockets.exceptions as ws_exc
from fastapi import APIRouter
from starlette.websockets import WebSocket
from conf.app import config
# Router on which the task-log websocket endpoint below is registered.
router = APIRouter()
# Name of the topic exchange from which task log messages are consumed.
task_log_exchange_name = 'task_log'
@router.websocket_route('/task/log/{guid}')
async def log_websocket(websocket: WebSocket):
    """Stream the log messages of one task to a websocket client.

    An exclusive, server-named queue is bound to the ``task_log`` topic
    exchange with the routing key ``<guid>.#`` and every message received on
    it is forwarded to the websocket as JSON. While the stream is open a
    heartbeat message is sent every ``config.TASK_LOG_HEARTBEAT`` seconds.
    A message whose ``type`` is ``'state'`` cancels the heartbeat, which lets
    the handler finish and close the websocket.

    Args:
        websocket: The incoming websocket connection; the task identifier is
            read from its ``guid`` path parameter.

    Raises:
        ValueError: If the ``guid`` path parameter is not a valid UUID.
    """
    guid = UUID(websocket.path_params['guid'])

    # Shared with the nested coroutines so that the cleanup code in the
    # ``finally`` clause can reach whatever was actually created.
    connection = None
    heartbeat = None
    logs = None

    async def on_message(message: aio_pika.IncomingMessage):
        """Forward one broker message to the client; stop on a state message."""
        # Decode once and reuse the result for both the send and the check.
        body = json.loads(message.body)
        await websocket.send_json(body)
        if body['type'] == 'state':
            heartbeat.cancel()

    async def consume_logs():
        """Connect to the broker and start consuming this task's messages."""
        nonlocal connection
        connection = await aio_pika.connect_robust(url=config.celery_broker_url)
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=1)
        exchange = await channel.declare_exchange(
            name=task_log_exchange_name,
            type=aio_pika.ExchangeType.TOPIC,
        )
        queue = await channel.declare_queue(exclusive=True)
        await queue.bind(exchange=exchange, routing_key=f'{guid}.#')
        # ``consume`` only registers the callback and returns, so this task
        # completes while messages keep arriving through ``on_message``.
        await queue.consume(callback=on_message, no_ack=True, exclusive=True)

    async def send_heartbeat():
        """Send a heartbeat message periodically until cancelled."""
        while True:
            await websocket.send_json({
                'class': 'task_log',
                'type': 'heartbeat',
            })
            await asyncio.sleep(config.TASK_LOG_HEARTBEAT)

    loop = asyncio.get_running_loop()
    try:
        await websocket.accept()
        heartbeat = loop.create_task(send_heartbeat())
        logs = loop.create_task(consume_logs())
        # Waits for both tasks: ``logs`` ends once the consumer is registered,
        # ``heartbeat`` ends when ``on_message`` cancels it (or it fails).
        # NOTE: ``asyncio.wait`` does not re-raise exceptions stored in the
        # tasks; a failed heartbeat simply ends the wait.
        await asyncio.wait([heartbeat, logs])
        await websocket.close()
    except ws_exc.ConnectionClosed:
        print('>>>>>>> WEBSOCKET CLOSED')
    finally:
        # Do not leave background tasks running once the handler exits.
        for task in (heartbeat, logs):
            if task is not None and not task.done():
                task.cancel()
        # Release the broker connection (and with it the exclusive queue);
        # previously it was left open after every websocket session.
        if connection is not None:
            await connection.close()

    print('>>>>>>> DONE')
@doglex
Copy link

doglex commented Jun 29, 2023

Solved. The problem was that I used "time.sleep" inside an async function.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment