Skip to content

Instantly share code, notes, and snippets.

@mr-rodgers
Last active January 19, 2019 17:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mr-rodgers/f56353e9c44cee25f3d5926b87e296ff to your computer and use it in GitHub Desktop.
Save mr-rodgers/f56353e9c44cee25f3d5926b87e296ff to your computer and use it in GitHub Desktop.
from io import StringIO
from os import environ, path
import aiohttp
import aiohttp.web
import asyncio
import ssl
import json
import logging
import textwrap
HOSTS_FILE = environ.get("HOSTS_FILE_PATH", "hosts")
CLUSTER_URL = "https://kubernetes.default.svc"
CLUSTER_CA = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
HOSTS_PREFIX = ""
HEALTHZ_HOST = environ.get("HEALTHZ_HOST", "0.0.0.0")
HEALTHZ_PORT = environ.get("HEALTHZ_PORT", "9090")
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %I:%M:%S%p',
level=logging.DEBUG if environ.get('DEBUG', '') else logging.INFO
)
with open("/var/run/secrets/kubernetes.io/serviceaccount/token", "r") as fh:
CLUSTER_TOKEN = fh.read().strip()
async def healthz(request):
stat = request.app["stat"]
response_data = {}
status = 200
if "ingress-watch-resp" in stat:
response = stat["ingress-watch-resp"]
response_data["ingress-watch-alive"] = not response.connection.closed
if response.connection.closed:
status = 500
else:
status = 200
response_data["ingress-watch-alive"] = "UNKNOWN"
return aiohttp.web.json_response(response_data, status=status)
async def watch_ingresses(client, stat):
url = "{0}/apis/extensions/v1beta1/ingresses?watch=1".format(CLUSTER_URL)
logging.info("Watching cluster for ingress changes")
ingresses = {}
async with client.get(url, timeout=None) as resp:
resp.raise_for_status()
stat["ingress-watch-resp"] = resp
buffer = b""
errored_lines = []
async for chunk, eod in resp.content.iter_chunks():
buffer += chunk
if eod:
lines = buffer.decode("utf-8").strip().splitlines(False)
for line in lines:
try:
if errored_lines:
logging.debug(
"Trying with {0} previous errored line(s)".format(
len(errored_lines)
)
)
text = "".join(errored_lines) + line
try:
event = json.loads(text)
except:
logging.debug(
"That didn't work. Trying line without previous lines."
)
event = json.loads(line)
errored_lines = []
else:
event = json.loads(line)
errored_lines = []
except:
errored_lines.append(line)
logging.error("Failed to parse JSON line:\n\n%s\n\n", line)
continue
else:
logging.debug(
"New event received:\n%s",
textwrap.indent(
json.dumps(event, indent=2, sort_keys=True),
' ' * 2
)
)
logging.info(
"event: %s %s %s/%s",
event["type"],
event["object"]["kind"],
event["object"]["metadata"]["namespace"],
event["object"]["metadata"]["name"]
)
if event["type"] == "ADDED" or event["type"] == "MODIFIED":
ingresses[event["object"]["metadata"]["uid"]] = event["object"]
elif event["type"] == "DELETED":
del ingresses[event["object"]["metadata"]["uid"]]
yield ingresses
buffer = b""
def build_hosts_file(objects):
ingresses = list(filter(is_ingress, objects.values()))
logging.debug(
"Preparing to write hosts file with records for {} ingress(es)".format(
len(ingresses)
)
)
hostsf = StringIO("")
hostsf.write(HOSTS_PREFIX)
ip_hostnames = {}
for ingress in ingresses:
ips = [ing["ip"] for ing in ingress["status"].get(
"loadBalancer", {}
).get("ingress", [])]
for ip in ips:
rules = ingress["spec"].get("rules", [])
if rules:
ip_hostnames.setdefault(ip, []).extend(
[rule["host"] for rule in rules]
)
for ip, hostnames in ip_hostnames.items():
hostsf.write("{0}\t{1}\n".format(ip, " ".join(hostnames)))
logging.debug("New event host file generated:\n%s", textwrap.indent(
hostsf.getvalue(),
' ' * 2
))
with open(HOSTS_FILE, "w", encoding="ascii") as f:
f.write(hostsf.getvalue())
logging.info("Wrote new hosts file to %r", HOSTS_FILE)
def is_ingress(obj):
return obj["kind"] == "Ingress"
async def main():
headers = {"Authorization": "Bearer {0}".format(CLUSTER_TOKEN)}
ssl_context = ssl.create_default_context()
if path.isfile(CLUSTER_CA):
ssl_context.load_verify_locations(CLUSTER_CA)
connector = aiohttp.TCPConnector(ssl=ssl_context)
stat = {} # so that co-routines can surface information back to the health checker
# Create the health check application
app = aiohttp.web.Application()
app["stat"] = stat
app.add_routes([aiohttp.web.get("/healthz", healthz)])
app_runner = aiohttp.web.AppRunner(app)
await app_runner.setup()
socket_listener = aiohttp.web.TCPSite(app_runner, HEALTHZ_HOST, int(HEALTHZ_PORT))
await socket_listener.start()
async with aiohttp.ClientSession(connector=connector, headers=headers) as client:
async for ingresses in watch_ingresses(client, stat):
build_hosts_file(ingresses)
await app_runner.cleanup()
if __name__ == '__main__':
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
except KeyboardInterrupt:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment