Skip to content

Instantly share code, notes, and snippets.

@djlambert
Last active May 30, 2019 18:52
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save djlambert/872716049a6789ebe567557dbe1bcfbc to your computer and use it in GitHub Desktop.
Save djlambert/872716049a6789ebe567557dbe1bcfbc to your computer and use it in GitHub Desktop.
import asyncio
import json
import logging
from uuid import UUID
import aio_pika
import websockets.exceptions as ws_exc
from fastapi import APIRouter, Path
from starlette import status
from starlette.websockets import WebSocket
from conf.app import config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router on which the task-log WebSocket endpoint below is registered.
router = APIRouter()
@router.websocket('/task/{guid}/log', name='task_log')
async def log_websocket(websocket: WebSocket, guid: UUID = Path(...)):
    """Relay a task's log messages from the broker to a WebSocket client.

    Binds an exclusive, server-named queue to the task-log topic exchange
    with routing key ``'<guid>.#'`` and forwards every JSON message to the
    client. A heartbeat frame is sent every ``config.TASK_LOG_HEARTBEAT``
    seconds. Once a message whose ``'type'`` is ``'state'`` has been
    forwarded, the WebSocket is closed with a normal-closure code.

    Args:
        websocket: The WebSocket connection to stream to.
        guid: Identifier of the task whose log messages are relayed.

    Raises:
        asyncio.CancelledError: If the handler itself is cancelled. The
            background tasks and the broker connection are released first.
    """
    # Set by ``on_message`` after the final 'state' message was forwarded.
    # An explicit event replaces raising CancelledError from the consumer
    # callback, which relied on how the library runs callbacks.
    task_ended = asyncio.Event()

    async def on_message(message: aio_pika.IncomingMessage):
        """Forward one broker message to the client; flag the end of the task."""
        body = json.loads(message.body)  # decode once, reuse below
        await websocket.send_json(body)
        if body['type'] == 'state':
            task_ended.set()

    async def consume_logs():
        """Consume the task's log messages until the task has ended."""
        connection = await aio_pika.connect_robust(url=config.celery_broker_url)
        try:
            channel = await connection.channel()
            await channel.set_qos(prefetch_count=1)
            exchange = await channel.declare_exchange(
                name=config.TASK_LOG_EXCHANGE,
                type=aio_pika.ExchangeType.TOPIC,
            )
            queue = await channel.declare_queue(exclusive=True)
            await queue.bind(exchange=exchange, routing_key=f'{guid}.#')
            await queue.consume(callback=on_message, no_ack=True, exclusive=True)
            # Keep the connection open until the final message was relayed
            # (or until this task is cancelled by the handler).
            await task_ended.wait()
        finally:
            # Always release the broker connection, including on cancellation.
            await connection.close()

    async def send_heartbeat():
        """Send a heartbeat frame to the client at a fixed interval, forever."""
        while True:
            await websocket.send_json({
                'class': 'task_log',
                'type': 'heartbeat'
            })
            await asyncio.sleep(config.TASK_LOG_HEARTBEAT)

    loop = asyncio.get_running_loop()
    loop.set_debug(config.DEBUG_TASK_LOG_LOOP)
    logger.debug(f"Creating task log WebSocket for task '{guid}'")
    await websocket.accept()
    heartbeat = loop.create_task(send_heartbeat())
    logs = loop.create_task(consume_logs())
    try:
        try:
            # Stop as soon as either task finishes: ``logs`` completes when
            # the task ended, ``heartbeat`` only ever finishes by failing.
            done, _pending = await asyncio.wait(
                {heartbeat, logs}, return_when=asyncio.FIRST_COMPLETED
            )
            for finished in done:
                finished.result()  # re-raise a failure of the finished task
        finally:
            # Stop both background tasks before touching the socket again;
            # cancelling ``logs`` also closes the broker connection.
            heartbeat.cancel()
            logs.cancel()
            await asyncio.gather(heartbeat, logs, return_exceptions=True)
    except ws_exc.ConnectionClosed as exc:
        logger.debug(f"Task log WebSocket for task '{guid}' closed: {exc}")
    else:
        logger.debug(f"Task '{guid}' ended, closing WebSocket")
        await websocket.close(code=status.WS_1000_NORMAL_CLOSURE)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment