Skip to content

Instantly share code, notes, and snippets.

@costa
Created June 22, 2024 13:16
Show Gist options
  • Save costa/d3844d1758214b65daa16f43980b31b2 to your computer and use it in GitHub Desktop.
Save costa/d3844d1758214b65daa16f43980b31b2 to your computer and use it in GitHub Desktop.
Shim component for NATS Streaming (XpoLog input)
# NOTE: this is the self-server component config; NATS itself is a service of
# the (core) "reality" component and is not started from this file.
version: '2.1'

services:
  stan-xpolog:
    build: .
    # Besides its own default network the shim joins two external networks;
    # presumably these are where the `nats` and `xpolog` hosts named in the
    # environment below are resolvable -- confirm against those components.
    networks:
      - default
      - reality_default
      - xpolog_default
    environment:
      - PORT=80
      - NATS_URI=nats://nats:4222
      - STAN_CLUSTER=reality
      - STAN_CHAN=this
      - XPOLOG_URI=http://xpolog:30303/logeye/api/logger.jsp
    # Quoted so the port stays a string regardless of the YAML parser in use.
    expose:
      - '80'
    restart: on-failure

# Networks created by other compose projects; they must exist before `up`.
networks:
  reality_default:
    external: true
  xpolog_default:
    external: true
# NOTE the 'stan-xpolog' volume (allocated by the self server) is basically wasted
FROM python:3.9
WORKDIR /stan-xpolog
# Copy the requirements alone first so the dependency layer stays cached until
# requirements.txt itself changes. COPY (not ADD): no URL fetching or archive
# extraction is wanted here.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV PORT=80
# NOTE works with a single process only (the XpoLog token URL lives in a
# per-process global in server.py)... good enough!
# Shell form is needed so that $PORT is expanded; `exec` makes the server
# replace the shell, so it receives SIGTERM from `docker stop` directly.
CMD exec fastapi run server.py --port $PORT --workers 1
import os
import asyncio
import requests
from typing import Annotated
from fastapi import Body, FastAPI, status, responses
import nats.aio.client
import stan.aio.client
# Name used both as the STAN client id and as the durable subscription name.
COMP_NAME = 'stan-xpolog'

# Required settings, read in this order; a missing variable raises KeyError at
# import time, so a misconfigured container fails immediately.
NATS_URI, STAN_CLUSTER, STAN_CHAN, XPOLOG_URI = (
    os.environ[name]
    for name in ('NATS_URI', 'STAN_CLUSTER', 'STAN_CHAN', 'XPOLOG_URI')
)

# Headers sent with every message forwarded to XpoLog.
JSON_HEADERS = {'Content-Type': 'application/json'}

app = FastAPI()
# Full XpoLog URL including the token; stays None (forwarding disabled) until a
# token is supplied through the HTTP endpoint.
app.xpolog_http_url = None
@app.post("/")
@app.put("/")
@app.patch("/")
async def set_token(token: Annotated[str, Body(embed=True)]):
    """Store the XpoLog token that enables message forwarding.

    The token is expected in a JSON body of the form ``{"token": "..."}``
    (``Body(embed=True)``). It is kept in process memory only, as the full
    target URL on ``app.xpolog_http_url``, so it is lost on restart and is
    not shared between worker processes.

    Returns:
        An empty ``204 No Content`` response.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import urlencode

    # Percent-encode the token: characters such as '&', '#', '+' or spaces
    # would otherwise corrupt the query string sent to XpoLog.
    app.xpolog_http_url = XPOLOG_URI + '?' + urlencode({'token': token})
    return responses.Response(status_code=status.HTTP_204_NO_CONTENT)
async def mq_reactor(loop):
    """Connect to NATS Streaming and forward channel messages to XpoLog.

    Opens a NATS connection to ``NATS_URI``, layers a STAN client for
    ``STAN_CLUSTER`` on top of it, and creates a durable subscription on
    ``STAN_CHAN``. Returns once the subscription is established; messages
    are then handled by the nested callback.

    Args:
        loop: the event loop handed to the NATS client (``io_loop``).

    Raises:
        Whatever the NATS/STAN clients raise if connecting or subscribing
        fails; errors while forwarding a single message are only logged.
    """
    diz_nats = nats.aio.client.Client()
    await diz_nats.connect(NATS_URI, max_reconnect_attempts=-1, io_loop=loop)
    diz_stan = stan.aio.client.Client()
    await diz_stan.connect(STAN_CLUSTER, COMP_NAME, nats=diz_nats)

    async def cb(msg):
        """POST one message payload to XpoLog; never raises."""
        try:
            # Snapshot the URL so the request and the log line agree even if
            # the token is replaced while the request is in flight.
            url = app.xpolog_http_url
            if not url:
                # No token configured yet: the message is dropped on purpose.
                return

            def post():
                # A timeout keeps a hung XpoLog from stalling the subscription.
                return requests.post(url, headers=JSON_HEADERS, data=msg.data, timeout=10)

            # `requests` is blocking; run it in the default thread pool so the
            # event loop keeps serving HTTP and NATS keep-alives meanwhile.
            response = await asyncio.get_running_loop().run_in_executor(None, post)
            if response.status_code != 200:
                # Log the raw body: error responses are not guaranteed to be JSON.
                print("ERROR", response.status_code, response.text, "HTTPing", msg.data, "to", url)
        except Exception as e:
            print("ERROR", e)  # NOTE and soldier on...

    await diz_stan.subscribe(STAN_CHAN, durable_name=COMP_NAME, cb=cb)
@app.on_event('startup')
async def startup():
    """Start the message-queue reactor when the application starts.

    The reactor runs as its own task on the running loop and is awaited, so
    application startup completes only after `mq_reactor` has returned, and
    any exception it raises propagates out of the startup hook.
    """
    running_loop = asyncio.get_running_loop()
    reactor_task = running_loop.create_task(mq_reactor(running_loop))
    await reactor_task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment