Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
HAProxy stats CSV to JSON
#!/usr/bin/env python3
from csv import DictReader
from sanic import Sanic
from sanic.response import json
import aiohttp
import uvloop
# Create an event loop manually so that we can use it for both sanic & aiohttp
loop = uvloop.new_event_loop()
app = Sanic(__name__)
def parse(csvfile):
reader = DictReader(csvfile.lstrip("# ").split('\n'), delimiter=',')
services = dict(frontend=[], backend=[])
listeners = []
# frontend and backend services
for row in reader:
svname = row["svname"].lower()
if svname == "frontend" or svname == "backend":
services[svname].append(row)
else:
listeners.append(row)
# add listener names to corresponding backends
for listener in listeners:
for backend in services["backend"]:
if "listeners" not in backend:
backend["listeners"] = []
if backend["iid"] == listener["iid"]:
backend["listeners"].append(listener)
return services
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
@app.route("/api/v1/stats")
async def stats_handler(req):
async with aiohttp.ClientSession(loop=loop) as session:
response = await fetch(session, "{url}/haproxy_stats?stats;csv;norefresh".format(url="http://10.64.10.184:9000"))
response = parse(response)
if req.query_string:
svname = "frontend" if "frontend" in req.query_string else "backend"
pxname = req.args.get(svname, "none").split('_')[0]
# filter services by pxname
predicate = lambda x: x["pxname"].split('_')[0] == pxname
response = {svname: [x for x in response.get(svname, []) if predicate(x)]}
return json(response)
if __name__ == "__main__":
app.run(port=8000, loop=loop)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment