-
-
Save wshayes/5115ff4aeb99a1e726edbebf0de2d2fd to your computer and use it in GitHub Desktop.
[FastAPI Websocket Example] Example of using websockets with FastAPI #fastapi #websockets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
from uuid import UUID | |
import aio_pika | |
import websockets.exceptions as ws_exc | |
from fastapi import APIRouter | |
from starlette.websockets import WebSocket | |
from conf.app import config | |
router = APIRouter() | |
task_log_exchange_name = 'task_log' | |
@router.websocket_route('/task/log/{guid}') | |
async def log_websocket(websocket: WebSocket): | |
guid = UUID(websocket.path_params['guid']) | |
async def on_message(message: aio_pika.IncomingMessage): | |
body = json.loads(message.body) | |
await websocket.send_json(json.loads(message.body)) | |
if body['type'] == 'state': | |
heartbeat.cancel() | |
async def consume_logs(): | |
connection = await aio_pika.connect_robust(url=config.celery_broker_url) | |
channel = await connection.channel() | |
await channel.set_qos(prefetch_count=1) | |
exchange = await channel.declare_exchange(name=task_log_exchange_name, type=aio_pika.ExchangeType.TOPIC) | |
queue = await channel.declare_queue(exclusive=True) | |
await queue.bind(exchange=exchange, routing_key=f'{guid}.#') | |
await queue.consume(callback=on_message, no_ack=True, exclusive=True) | |
async def send_heartbeat(): | |
while True: | |
await websocket.send_json({ | |
'class': 'task_log', | |
'type': 'heartbeat' | |
}) | |
await asyncio.sleep(config.TASK_LOG_HEARTBEAT) | |
loop = asyncio.get_running_loop() | |
try: | |
await websocket.accept() | |
heartbeat = loop.create_task(send_heartbeat()) | |
logs = loop.create_task(consume_logs()) | |
await asyncio.wait([heartbeat, logs]) | |
await websocket.close() | |
except ws_exc.ConnectionClosed: | |
print('>>>>>>> WEBSOCKET CLOSED') | |
print('>>>>>>> DONE') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
solved. the problem is that I use "time.sleep" inside a ssync function