Skip to content

Instantly share code, notes, and snippets.

@cnelson
Created May 10, 2017 01:20
Show Gist options
  • Save cnelson/f6f46aee04e84e5b9cc2b0fef62dfdfa to your computer and use it in GitHub Desktop.
Save cnelson/f6f46aee04e84e5b9cc2b0fef62dfdfa to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import datetime
import json
import os
import subprocess
import yaml
import marshmallow as ma
import requests
from OpenSSL import crypto
class Config(ma.Schema):
"""Configuratin Options:
SLACK_URL: An incoming webhook url
SLACK_CHANNEL: The channel to send the report to
SLACK_USERNAME: The username to post as (optional, defaults to `cert-expire-check`)
SLACK_ICON_EMOJI: The emoji to use as an icon (option, defaults to `:python:`)
DAYS_WARN: If a cert expires in <= this number of days a warning will be raised
(optional, defaults to 30)
DAYS_ERROR: If a cert expires in <= this number of days an error will be raised
(optional, defaults to 7)
"""
slack_url = ma.fields.Str(load_from='SLACK_URL', required=True)
slack_channel = ma.fields.Str(load_from='SLACK_CHANNEL', required=True)
slack_username = ma.fields.Str(load_from='SLACK_USERNAME', missing='cert-expire-check')
slack_icon_emoji = ma.fields.Str(load_from='SLACK_USERNAME', missing=':python:')
days_warn = ma.fields.Int(load_from='DAYS_WARN', missing=30)
days_error = ma.fields.Int(load_from='DAYS_ERROR', missing=7)
def bosh_cli(*args, _bosh_cli="bosh-cli"):
"""Run a command with the bosh v2 cli
Args:
*args(str): The arguments to bash to bosh; '--json' will be prepended to this list
_bosh_cli: The path to the bosh v2 cli (optional)
Returns:
dict: The json output of the commaned parsed by json.loads()
Raises:
RuntimeError: There was a problem running the bosh command
ValueError: There was a problem parsing the bosh output
"""
returncode = 0
command = [_bosh_cli, '--json'] + list(args)
try:
output = subprocess.check_output(command, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
output = exc.output
returncode = exc.returncode
try:
response = json.loads(output.decode('utf-8'))
except json.decoder.JSONDecodeError as exc:
raise ValueError("Could not parse output from `{0}`: {1}; Expected JSON, got: {2}".format(
command,
exc,
output
))
if returncode > 0:
raise RuntimeError("Error when running {0}: {1}".format(
command,
"\n".join(response['Lines'])
))
return response
def get_bosh_deployments():
"""Returns a list of deployments on the bosh director
Yields:
str: The names of a deployment
Raises:
See bosh_cli
"""
response = bosh_cli('deployments')
for table in response['Tables']:
for row in table['Rows']:
yield row['name']
def get_bosh_manifest(deployment):
"""Returns the manifest for a given deployment
Args:
deployment(str): The name of the deployment
Returns:
dict: The manifest parsed by yaml.load()
Raises:
See bosh_cli
"""
response = bosh_cli('-d', deployment, 'manifest')
return yaml.load(response['Blocks'][0])
def dict_generator(indict, pre=None):
"""Flatten a dict into a list of properties
Based on http://stackoverflow.com/questions/12507206/python-recommended-way-to-walk-complex-dictionary-structures-imported-from-json
"""
pre = pre[:] if pre else []
if isinstance(indict, dict):
for key, value in indict.items():
if isinstance(value, dict):
for d in dict_generator(value, pre + [key]):
yield d
elif isinstance(value, list) or isinstance(value, tuple):
for v in value:
for d in dict_generator(v, pre + [key]):
yield d
else:
yield pre + [key, value]
else:
yield pre + [indict]
def find_certificates(manifest):
"""Return any PEM encoded certificates in a manifest
Args:
manifest(dict): A bosh manifest loaded by yaml.load()
Yields:
tuple: (path.to.property, certificate)
"""
for item in dict_generator(manifest):
value = item.pop()
key = ".".join(item)
if isinstance(value, str) and value.strip().startswith('-----BEGIN CERTIFICATE-----'):
cert = crypto.load_certificate(crypto.FILETYPE_PEM, value)
not_after = datetime.datetime.strptime(
cert.get_notAfter().decode('utf-8'),
'%Y%m%d%H%M%SZ'
)
yield (key, not_after)
def make_attachment(deployment, prop, expires, color):
"""Make a slack attachment for a cert warning/error
Args:
deployment(str): The name of the deployment containing `prop`
prop(str): The property containing the certificate
expires(int): The number of days until the cert expires (negative numbers if already expired)
color: The color to use for the slack attachment
Returns:
dict: The formatted slack attachment
"""
if expires < 0:
status = "Expired!"
elif expires == 0:
status = "Expires today!"
elif expires == 1:
status = "Expires tomorrow!"
elif expires > 1:
status = "Expires in {0} days.".format(expires)
attachment = {
"color": color,
"mrkdwn_in": ["text"],
"text": "*{deployment}* `{property}`\n{status}".format(
deployment=deployment,
property=prop,
status=status
)
}
return attachment
if __name__ == "__main__":
# load the config from the environment
config = Config(strict=True).load(os.environ).data
attachments = []
# find certs in all deployments on the director
for name in get_bosh_deployments():
for cert in find_certificates(get_bosh_manifest(name)):
expires = (cert[1]-datetime.datetime.utcnow()).days
# if it's a problem, stash it
if expires <= config['days_error']:
attachments.append(make_attachment(name, cert[0], expires, 'danger'))
elif expires <= config['days_warn']:
attachments.append(make_attachment(name, cert[0], expires, 'warning'))
# if we have something to say, say it
if attachments:
requests.post(
config['slack_url'],
json={
'username': config['slack_username'],
'channel': config['slack_channel'],
'icon_emoji': config['slack_icon_emoji'],
'text': 'Certificate report:',
'attachments': attachments
},
).raise_for_status()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment