Last active
January 3, 2024 14:51
-
-
Save herberthamaral/f1ec6e2faef904ed2d244a28faa55808 to your computer and use it in GitHub Desktop.
Listen to a nats topic and write to a HTTP client listening via server-sent-events
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from typing import AsyncGenerator, Dict | |
from fastapi import FastAPI | |
from nats.aio.client import Client as NATS | |
from nats.aio.client import Msg | |
from starlette.requests import Request | |
from starlette.responses import StreamingResponse | |
app = FastAPI() | |
nats = NATS() | |
async def get_nats() -> NATS: | |
global nats | |
if not nats.is_connected: | |
await nats.connect('localhost:4222') | |
return nats | |
async def subscription_to_generator(nats: NATS, topic: str) -> AsyncGenerator[Msg, Msg]: | |
queue: asyncio.Queue[Msg] = asyncio.Queue() | |
async def subscription_callback(msg: Msg) -> None: | |
await queue.put(msg) | |
sid = await nats.subscribe(topic, cb=subscription_callback) | |
while True: | |
try: | |
item = await asyncio.wait_for(queue.get(), timeout=60) | |
queue.task_done() | |
yield item | |
except asyncio.TimeoutError: | |
await nats.unsubscribe(sid) | |
yield None | |
async def event_stream(request: Request, user_id: int) -> AsyncGenerator[str, str]: | |
messages = subscription_to_generator(await get_nats(), f'user.{user_id}') | |
async for msg in messages: | |
if msg is None or await request.is_disconnected(): | |
return | |
msg_data = msg.data.decode() | |
data_str = f'data: {msg_data}\n\n' | |
yield data_str | |
@app.get('/') | |
async def root() -> Dict[str, str]: | |
return {'message': 'Hello world'} | |
@app.get('/stream/{user_id}') | |
async def stream(user_id: int, request: Request) -> StreamingResponse: | |
return StreamingResponse(event_stream(request, user_id), media_type='text/event-stream') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment