Skip to content

Instantly share code, notes, and snippets.

@w4
Last active March 10, 2022 02:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save w4/c6a595bb157c797a0dfaa053cedf4e11 to your computer and use it in GitHub Desktop.
Save w4/c6a595bb157c797a0dfaa053cedf4e11 to your computer and use it in GitHub Desktop.
Request Let's Encrypt certificates through vault-acme for a list of domains, write each one as a combined PEM file, and generate a HAProxy crt-list that references them (plus a NixOS container that runs the script on a daily timer).
import requests
import json
import logging
import os
# Horizontal rule logged after each domain has been processed.
separator = '-------------------------------------------------------'

# The format refers to `levelname_brackets`, which is not a built-in
# LogRecord attribute: it is attached to every record by `record_factory`.
logging.basicConfig(
    level=logging.INFO,
    format='%(levelname_brackets)s %(message)s',
)

# Keep the factory that was active before ours so `record_factory` can
# delegate record construction to it.
old_factory = logging.getLogRecordFactory()
def record_factory(*args, **kwargs):
    """Build a log record and attach a coloured, bracketed level name.

    All arguments are forwarded unchanged to the previously installed
    record factory. The returned record gains a `levelname_brackets`
    attribute of the form ``[<colour><level padded to 7 chars><reset>]``,
    which the logging format string refers to.
    """
    rec = old_factory(*args, **kwargs)
    colour = logging_colour(rec.levelname)
    # Pad to a fixed width so messages line up regardless of level.
    padded_level = rec.levelname.ljust(7)
    rec.levelname_brackets = "[{}{}\033[0m]".format(colour, padded_level)
    return rec
# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)

# From here on every LogRecord is built by `record_factory`, which supplies
# the `levelname_brackets` attribute the configured format string needs.
logging.setLogRecordFactory(record_factory)
# Domains to request certificates for. Each request also asks for the
# wildcard `*.<domain>` as an alternative name.
domains = ["example.com"]

# Directory that receives the per-domain `.pem` files, `crt-list.txt` and
# the `leases` state file.
root = "/var/lib/acme"

# Vault token sent as `X-Vault-Token`. Prefer supplying it through the
# VAULT_TOKEN environment variable so the secret does not have to live in
# the script source; the literal below is only a fallback placeholder.
token = os.environ.get("VAULT_TOKEN") or "s.ABCDEF12345"
def get_leases():
    """Load the domain -> Vault lease id mapping saved by a previous run.

    Reads `<root>/leases` as JSON.

    Returns:
        The stored mapping, or an empty dict when the file is missing,
        unreadable, not valid JSON, or not a JSON object. Loading is
        best-effort: a problem here only means certificates are requested
        again, so it is logged rather than raised.
    """
    path = '{}/leases'.format(root)
    try:
        with open(path) as file:
            leases = json.load(file)
    except FileNotFoundError:
        # Expected on the very first run.
        logger.info("Leases file does not yet exist")
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and decoding errors; report
        # the real cause instead of claiming the file is absent.
        logger.warning("Could not read leases file {}: {}".format(path, e))
        return {}

    if not isinstance(leases, dict):
        # Callers index the result by domain, so anything else is unusable.
        logger.warning("Ignoring leases file {}: expected a JSON object".format(path))
        return {}
    return leases
def _log_vault_errors(log, errors):
    """Log each line of each Vault error message as an indented bullet via `log`."""
    for error in errors:
        for line in error.strip().split('\n'):
            log(" - {}".format(line.strip()))


def _lease_ttl(lease_id):
    """Return the TTL in seconds Vault reports for `lease_id`, or None if it has none."""
    response = requests.post(
        "http://vault.home/v1/sys/leases/lookup",
        data=json.dumps({'lease_id': lease_id}),
        headers={'X-Vault-Token': token},
    )
    lease = response.json()
    if 'errors' in lease:
        logger.warning('Error looking up lease:')
        _log_vault_errors(logger.warning, lease['errors'])
    return (lease.get('data') or {}).get('ttl')


def _request_certificate(domain):
    """Ask vault-acme for a certificate covering `domain` and `*.domain`; return the decoded JSON reply."""
    response = requests.put(
        'http://vault.home/v1/acme/certs/{}'.format(domain),
        data=json.dumps({
            'common_name': domain,
            'alternative_names': '*.{}'.format(domain),
        }),
        headers={'X-Vault-Token': token},
    )
    return response.json()


def _write_pem(path, cert, private_key):
    """Write certificate followed by private key to `path`, restricted to mode 0640 before any key material is written."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o640)
    with os.fdopen(fd, 'w') as file:
        # The creation mode is ignored for a pre-existing file, so enforce
        # it explicitly before the private key hits the disk.
        os.chmod(path, 0o640)
        file.write(cert)
        file.write(private_key)


def main():
    """Renew certificates for every configured domain and regenerate the HAProxy crt-list.

    For each entry in `domains`:
      * if a lease from a previous run is known and Vault reports more than
        86400 seconds of TTL left, the domain is skipped;
      * otherwise a certificate is requested from vault-acme and written,
        together with its private key, to `<root>/<domain>.pem`, and the
        returned lease id is remembered.

    Afterwards `<root>/crt-list.txt` (one PEM path per line) and
    `<root>/leases` (JSON) are rewritten.

    A failure for one domain is logged and does not stop the remaining
    domains from being processed.

    Raises:
        SystemExit: always; the exit code is 0 on success and 1 if any
            domain failed.
    """
    status = 0
    leases = get_leases()

    for domain in domains:
        logger.info("Generating keys for {}".format(domain))

        if domain in leases:
            try:
                ttl = _lease_ttl(leases[domain])
            except (requests.RequestException, ValueError) as e:
                # A failed lookup is not fatal: fall through and try to
                # issue a fresh certificate instead.
                logger.warning('Error looking up lease:')
                logger.warning(" - {}".format(e))
                ttl = None

            if ttl is not None and ttl > 86400:
                logger.info("↳ Skipped, as the certificate doesn't expire for {} seconds".format(ttl))
                continue

        try:
            r = _request_certificate(domain)
        except (requests.RequestException, ValueError) as e:
            # Connection problems or a non-JSON reply only fail this domain.
            logger.error("↳ Failed to generate keys:")
            logger.error(" - {}".format(e))
            status = 1
            logger.info(separator)
            continue

        if 'errors' in r:
            logger.error("↳ Failed to generate keys:")
            _log_vault_errors(logger.error, r['errors'])
            status = 1
            logger.info(separator)
            continue

        data = r.get('data') or {}
        if 'cert' not in data or 'private_key' not in data:
            logger.error(" - Certificates missing: {}".format(r))
            logger.info(separator)
            status = 1
            continue

        logger.info("↳ Writing to {}.pem".format(domain))
        try:
            _write_pem('{}/{}.pem'.format(root, domain), data['cert'], data['private_key'])
        except OSError as e:
            logger.error(" - {}".format(e))
            status = 1
            logger.info(separator)
            continue

        # Remember the lease so the next run can check the remaining TTL.
        # (The response is deliberately not printed: it contains the key.)
        lease_id = r.get('lease_id')
        if lease_id:
            leases[domain] = lease_id

        logger.info(separator)

    logger.info("Writing {}/crt-list.txt".format(root))
    with open('{}/crt-list.txt'.format(root), 'w') as file:
        for domain in domains:
            pem_path = '{}/{}.pem'.format(root, domain)
            # Only list certificates that actually exist on disk, so a
            # domain that has never been issued does not leave a dangling
            # path in the list.
            if not os.path.exists(pem_path):
                logger.warning("Not listing {}: {} does not exist".format(domain, pem_path))
                continue
            file.write(pem_path)
            file.write('\n')

    logger.info("Writing {}/leases".format(root))
    with open('{}/leases'.format(root), 'w') as file:
        json.dump(leases, file)

    # Equivalent to exit(status) without depending on the site module.
    raise SystemExit(status)
def logging_colour(level):
    """Return the ANSI colour escape sequence for a logging level name.

    Args:
        level: Level name as found in `LogRecord.levelname`
            (e.g. ``"INFO"``).

    Returns:
        The escape sequence that switches the terminal to the level's
        colour, or an empty string for level names without a mapping, so
        that formatting a record never fails on an unexpected level.
    """
    colours = {
        "DEBUG": "\033[0;34m",
        "INFO": "\033[0;32m",
        "WARNING": "\033[0;33m",
        "ERROR": "\033[0;31m",
        "CRITICAL": "\033[1;31m",
    }
    return colours.get(level, "")
# Run the renewal only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
# NixOS module declaring a `certbot` container that runs
# vault-crtlist-generator.py as a oneshot systemd service, triggered by a
# daily timer.
{ config, pkgs, lib, ... }:
{
containers.certbot = {
autoStart = true;
ephemeral = true;
# Host directory /data/acme is mounted read-write at /var/lib/acme inside
# the container; this is the directory the service is allowed to write to.
bindMounts = {
"/var/lib/acme" = {
hostPath = "/data/acme";
isReadOnly = false;
};
};
# NOTE(review): presumably this is what provides the `mv-vlan101`
# interface configured below -- confirm.
macvlans = [ "vlan101" ];
config = { config, pkgs, ... }:
let
# Both values are only used to compute the timer's AccuracySec below.
numCerts = 1;
_24hSecs = 60 * 60 * 24;
in {
# Static addressing for the container's interface.
networking = {
interfaces.mv-vlan101.ipv4.addresses = [ { address = "10.0.64.102"; prefixLength = 24; } ];
defaultGateway = "10.0.64.2";
nameservers = [ "10.0.64.1" ];
};
# Dedicated system user and group the service runs as.
users.users.acme = {
home = "/var/lib/acme";
group = "acme";
isSystemUser = true;
};
users.groups.acme = {};
systemd.timers."vault-crtlist-generator" = {
description = "Renew ACME certificates from Vault";
wantedBy = [ "timers.target" ];
timerConfig = {
OnCalendar = "daily";
Unit = "vault-crtlist-generator.service";
Persistent = "yes";
# With numCerts = 1 this evaluates to "86400s".
AccuracySec = "${toString (_24hSecs / numCerts)}s";
RandomizedDelaySec = "24h";
};
};
systemd.services."vault-crtlist-generator" = {
description = "Renew ACME certificates from Vault";
after = [ "network.target" "network-online.target" "nss-lookup.target" ];
wants = [ "network-online.target" ];
wantedBy = [ "default.target" ];
serviceConfig = {
Type = "oneshot";
User = "acme";
Group = "acme";
# NOTE(review): written as a bare Nix integer, not a string; Nix likely
# drops the leading zeros (22) and systemd presumably reads the result as
# octal -- confirm, or quote it as "0022".
UMask = 0022;
# NOTE(review): no StateDirectory is set in this block, so this setting
# may have no effect -- confirm. Also a bare integer; see UMask above.
StateDirectoryMode = 750;
# Only /var/lib/acme is listed as writable for the service.
ProtectSystem = "strict";
ReadWritePaths = [
"/var/lib/acme"
];
PrivateTmp = true;
WorkingDirectory = "/tmp";
# Run the script with a Python 3 interpreter that has `requests` available.
ExecStart = let
python = pkgs.python3.withPackages (ps: with ps; [ requests ]);
in
"${python.interpreter} ${./vault-crtlist-generator.py}";
};
};
};
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment