-
-
Save mr-rodgers/f56353e9c44cee25f3d5926b87e296ff to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from io import StringIO | |
from os import environ, path | |
import aiohttp | |
import aiohttp.web | |
import asyncio | |
import ssl | |
import json | |
import logging | |
import textwrap | |
HOSTS_FILE = environ.get("HOSTS_FILE_PATH", "hosts") | |
CLUSTER_URL = "https://kubernetes.default.svc" | |
CLUSTER_CA = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" | |
HOSTS_PREFIX = "" | |
HEALTHZ_HOST = environ.get("HEALTHZ_HOST", "0.0.0.0") | |
HEALTHZ_PORT = environ.get("HEALTHZ_PORT", "9090") | |
logging.basicConfig( | |
format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %I:%M:%S%p', | |
level=logging.DEBUG if environ.get('DEBUG', '') else logging.INFO | |
) | |
with open("/var/run/secrets/kubernetes.io/serviceaccount/token", "r") as fh: | |
CLUSTER_TOKEN = fh.read().strip() | |
async def healthz(request): | |
stat = request.app["stat"] | |
response_data = {} | |
status = 200 | |
if "ingress-watch-resp" in stat: | |
response = stat["ingress-watch-resp"] | |
response_data["ingress-watch-alive"] = not response.connection.closed | |
if response.connection.closed: | |
status = 500 | |
else: | |
status = 200 | |
response_data["ingress-watch-alive"] = "UNKNOWN" | |
return aiohttp.web.json_response(response_data, status=status) | |
async def watch_ingresses(client, stat): | |
url = "{0}/apis/extensions/v1beta1/ingresses?watch=1".format(CLUSTER_URL) | |
logging.info("Watching cluster for ingress changes") | |
ingresses = {} | |
async with client.get(url, timeout=None) as resp: | |
resp.raise_for_status() | |
stat["ingress-watch-resp"] = resp | |
buffer = b"" | |
errored_lines = [] | |
async for chunk, eod in resp.content.iter_chunks(): | |
buffer += chunk | |
if eod: | |
lines = buffer.decode("utf-8").strip().splitlines(False) | |
for line in lines: | |
try: | |
if errored_lines: | |
logging.debug( | |
"Trying with {0} previous errored line(s)".format( | |
len(errored_lines) | |
) | |
) | |
text = "".join(errored_lines) + line | |
try: | |
event = json.loads(text) | |
except: | |
logging.debug( | |
"That didn't work. Trying line without previous lines." | |
) | |
event = json.loads(line) | |
errored_lines = [] | |
else: | |
event = json.loads(line) | |
errored_lines = [] | |
except: | |
errored_lines.append(line) | |
logging.error("Failed to parse JSON line:\n\n%s\n\n", line) | |
continue | |
else: | |
logging.debug( | |
"New event received:\n%s", | |
textwrap.indent( | |
json.dumps(event, indent=2, sort_keys=True), | |
' ' * 2 | |
) | |
) | |
logging.info( | |
"event: %s %s %s/%s", | |
event["type"], | |
event["object"]["kind"], | |
event["object"]["metadata"]["namespace"], | |
event["object"]["metadata"]["name"] | |
) | |
if event["type"] == "ADDED" or event["type"] == "MODIFIED": | |
ingresses[event["object"]["metadata"]["uid"]] = event["object"] | |
elif event["type"] == "DELETED": | |
del ingresses[event["object"]["metadata"]["uid"]] | |
yield ingresses | |
buffer = b"" | |
def build_hosts_file(objects): | |
ingresses = list(filter(is_ingress, objects.values())) | |
logging.debug( | |
"Preparing to write hosts file with records for {} ingress(es)".format( | |
len(ingresses) | |
) | |
) | |
hostsf = StringIO("") | |
hostsf.write(HOSTS_PREFIX) | |
ip_hostnames = {} | |
for ingress in ingresses: | |
ips = [ing["ip"] for ing in ingress["status"].get( | |
"loadBalancer", {} | |
).get("ingress", [])] | |
for ip in ips: | |
rules = ingress["spec"].get("rules", []) | |
if rules: | |
ip_hostnames.setdefault(ip, []).extend( | |
[rule["host"] for rule in rules] | |
) | |
for ip, hostnames in ip_hostnames.items(): | |
hostsf.write("{0}\t{1}\n".format(ip, " ".join(hostnames))) | |
logging.debug("New event host file generated:\n%s", textwrap.indent( | |
hostsf.getvalue(), | |
' ' * 2 | |
)) | |
with open(HOSTS_FILE, "w", encoding="ascii") as f: | |
f.write(hostsf.getvalue()) | |
logging.info("Wrote new hosts file to %r", HOSTS_FILE) | |
def is_ingress(obj): | |
return obj["kind"] == "Ingress" | |
async def main(): | |
headers = {"Authorization": "Bearer {0}".format(CLUSTER_TOKEN)} | |
ssl_context = ssl.create_default_context() | |
if path.isfile(CLUSTER_CA): | |
ssl_context.load_verify_locations(CLUSTER_CA) | |
connector = aiohttp.TCPConnector(ssl=ssl_context) | |
stat = {} # so that co-routines can surface information back to the health checker | |
# Create the health check application | |
app = aiohttp.web.Application() | |
app["stat"] = stat | |
app.add_routes([aiohttp.web.get("/healthz", healthz)]) | |
app_runner = aiohttp.web.AppRunner(app) | |
await app_runner.setup() | |
socket_listener = aiohttp.web.TCPSite(app_runner, HEALTHZ_HOST, int(HEALTHZ_PORT)) | |
await socket_listener.start() | |
async with aiohttp.ClientSession(connector=connector, headers=headers) as client: | |
async for ingresses in watch_ingresses(client, stat): | |
build_hosts_file(ingresses) | |
await app_runner.cleanup() | |
if __name__ == '__main__': | |
try: | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(main()) | |
except KeyboardInterrupt: | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment