Skip to content

Instantly share code, notes, and snippets.

@franzos
Last active June 5, 2025 16:02
Show Gist options
  • Save franzos/192aa0e59c7e48f4fa412fcb576515f0 to your computer and use it in GitHub Desktop.
Save franzos/192aa0e59c7e48f4fa412fcb576515f0 to your computer and use it in GitHub Desktop.
Manage SSL Certificates on Docker Host with HAProxy
#!/usr/bin/env python3
"""
Certificate Management Script for HAProxy
Shared on: https://gist.github.com/franzos/192aa0e59c7e48f4fa412fcb576515f0
Expected Directory Structure:
- Base directory (default: /srv/haproxy)
- ssl/ # Where HAProxy expects .pem certs (managed by this script)
- haproxy.cfg # HAProxy configuration file (domains are extracted from here)
- ignore-domains.txt # (Optional) List of domains to ignore, one per line
ignore-domains.txt:
- Place in the base directory (default: /srv/haproxy)
- Each line: a domain to ignore (e.g., test.example.com)
- Blank lines and lines starting with # are ignored
CLI Parameters:
--dry-run Show actions without making changes
--base-dir Set base directory (default: /srv/haproxy)
--debug Enable debug logging
Dependencies: (pip3 install) pyopenssl validators
"""
import argparse
import re
import os
import subprocess
import shutil
import logging
from datetime import datetime
import OpenSSL.crypto as crypto
import validators
# Configure the root logger once for the whole script: INFO and above,
# each record prefixed with a timestamp and its level name.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by every function and method in this file.
logger = logging.getLogger(__name__)
class CertManager:
    """Maintain the per-domain PEM bundles that HAProxy loads from ``ssl/``.

    Domains are extracted from ``haproxy.cfg`` under ``base_dir``. For each
    domain the caller can combine an existing Let's Encrypt certificate and
    key into ``<base_dir>/ssl/<domain>.pem``, renew or request a certificate
    through certbot running inside a Docker container, or create a
    self-signed placeholder.

    Methods that change files skip their changes when ``args.dry_run`` is
    set; the certbot ``--dry-run`` probes are still executed in that mode.
    """

    # Permission bits applied by this script (values unchanged from the
    # original hard-coded literals).
    # NOTE(review): CERT_FILE_MODE leaves the bundled private key
    # world-readable. Kept for backward compatibility; tighten it if the
    # HAProxy process can still read the files with a stricter mode.
    SSL_DIR_CREATE_MODE = 0o750
    SSL_DIR_MODE = 0o755
    CERT_FILE_MODE = 0o644

    # Substrings that mark a successful certbot ``--dry-run``.
    # NOTE(review): ``certbot renew --dry-run`` appears to report
    # "... simulated renewals succeeded" rather than "The dry run was
    # successful" (which ``certonly --dry-run`` prints), so both are
    # accepted. Confirm against the certbot version in the container.
    DRY_RUN_SUCCESS_MARKERS = (
        "The dry run was successful",
        "renewals succeeded",
    )

    # ``req.ssl_sni [-i] <host>``: the optional ``-i`` flag is skipped so it
    # is not mistaken for the host name.
    _SNI_RE = re.compile(r"req\.ssl_sni\s+(?:-i\s+)?([^\s\}]+)")
    # ``hdr(host) -i <host>``
    _HDR_HOST_RE = re.compile(r"hdr\(host\)\s*-i\s+([^\s]+)")
    # ``bind ... crt /some/dir/<host>.pem``
    _BIND_CRT_RE = re.compile(r"bind\s+[^#\n]*crt\s+[^#\n]*\/([^\/\s]+\.pem)")

    def __init__(
        self,
        args,
        base_dir="/srv/haproxy",
        letsencrypt_live_dir="/srv/letsencrypt/data/live",
        certbot_container="letsencrypt",
    ):
        """Store the CLI arguments and derive all paths from ``base_dir``.

        Args:
            args: Parsed CLI namespace; only ``args.dry_run`` is read.
            base_dir: Directory holding ``haproxy.cfg``, ``ssl/`` and the
                optional ``ignore-domains.txt``.
            letsencrypt_live_dir: Host directory containing one
                sub-directory per domain with ``fullchain.pem`` and
                ``privkey.pem``.
            certbot_container: Name of the Docker container in which
                ``certbot`` is executed via ``docker exec``.
        """
        self.args = args
        self.base_dir = base_dir
        self.ssl_dir = os.path.join(base_dir, "ssl")
        self.haproxy_cfg = os.path.join(base_dir, "haproxy.cfg")
        self.ignore_file = os.path.join(base_dir, "ignore-domains.txt")
        self.letsencrypt_live_dir = letsencrypt_live_dir
        self.certbot_container = certbot_container

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _domain_from_pem_name(filename):
        """Return ``filename`` without a trailing ``.pem`` suffix.

        Only the suffix is removed, so a name such as
        ``x.pemberton.org.pem`` keeps its inner ``.pem`` characters.
        """
        name = filename.strip()
        if name.endswith(".pem"):
            return name[: -len(".pem")]
        return name

    @staticmethod
    def _clean_host_token(token):
        """Strip quotes/spaces from a config token and drop any ``:port``."""
        return token.strip("\" '").split(":")[0]

    @classmethod
    def _reports_dry_run_success(cls, output):
        """Return True when certbot output contains a dry-run success marker."""
        return any(marker in output for marker in cls.DRY_RUN_SUCCESS_MARKERS)

    @staticmethod
    def _describe_failure(error):
        """Format an exception, appending captured stderr when available."""
        stderr = getattr(error, "stderr", None)
        if stderr:
            return f"{error}: {stderr.strip()}"
        return str(error)

    def _certbot_cmd(self, *certbot_args):
        """Build the ``docker exec`` argument list for a certbot invocation."""
        return [
            "docker",
            "exec",
            "-i",
            self.certbot_container,
            "certbot",
            *certbot_args,
        ]

    def _install_bundle(self, fullchain, privkey, haproxy_cert):
        """Write ``fullchain + privkey`` to ``haproxy_cert`` atomically.

        Both inputs are read before anything is written, the bundle is
        written to a temporary sibling file and moved into place with
        ``os.replace``, so the live file is never left truncated. When the
        existing bundle already has the same content nothing is written and
        the previous ``.bak`` file is left untouched.

        Returns:
            ``(changed, backup_path)``: ``changed`` is False when the bundle
            was already up to date; ``backup_path`` is the ``.bak`` file
            created for the replaced bundle, or None when none was made.

        Raises:
            OSError: If reading the inputs or writing the bundle fails.
        """
        with open(fullchain, "rb") as fc, open(privkey, "rb") as pk:
            bundle = fc.read() + pk.read()

        backup_path = None
        if os.path.exists(haproxy_cert):
            with open(haproxy_cert, "rb") as current:
                if current.read() == bundle:
                    return False, None
            backup_path = f"{haproxy_cert}.bak"
            shutil.copy2(haproxy_cert, backup_path)

        tmp_path = f"{haproxy_cert}.tmp"
        try:
            with open(tmp_path, "wb") as out:
                out.write(bundle)
            os.chmod(tmp_path, self.CERT_FILE_MODE)
            os.replace(tmp_path, haproxy_cert)
        except OSError:
            # Do not leave a partial temporary file behind.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise
        return True, backup_path

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def ensure_ssl_dir(self):
        """Create the SSL directory (skipped in dry-run) and return its path."""
        if not self.args.dry_run:
            os.makedirs(self.ssl_dir, mode=self.SSL_DIR_CREATE_MODE, exist_ok=True)
        logger.info("SSL directory ensured: %s", self.ssl_dir)
        return self.ssl_dir

    def get_ignored_domains(self):
        """Return the set of domains listed in ``ignore-domains.txt``.

        Blank lines and lines starting with ``#`` are skipped. A missing or
        unreadable file yields an empty set (unreadable files are logged).
        """
        if not os.path.exists(self.ignore_file):
            return set()
        try:
            with open(self.ignore_file, "r", encoding="utf8") as f:
                entries = (line.strip() for line in f)
                return {
                    entry
                    for entry in entries
                    if entry and not entry.startswith("#")
                }
        except IOError as e:
            logger.warning("Could not read ignore file: %s", e)
            return set()

    def is_valid_domain(self, domain):
        """Return True if ``domain`` is well-formed and has no leading dot."""
        # validators returns a falsy result object on failure; normalise to
        # a real bool so callers always get True/False.
        return bool(validators.domain(domain)) and not domain.startswith(".")

    def extract_domains(self):
        """Extract the set of valid, non-ignored domains from ``haproxy.cfg``.

        Three sources are scanned: ``req.ssl_sni`` matches, ``hdr(host) -i``
        matches, and ``*.pem`` file names referenced by ``bind ... crt``.

        Raises:
            FileNotFoundError: If the HAProxy config file does not exist.
            RuntimeError: If the config file cannot be read.
        """
        if not os.path.exists(self.haproxy_cfg):
            raise FileNotFoundError(f"HAProxy config not found: {self.haproxy_cfg}")

        ignored_domains = self.get_ignored_domains()
        try:
            with open(self.haproxy_cfg, "r", encoding="utf8") as f:
                content = f.read()
        except IOError as e:
            raise RuntimeError(f"Failed to read HAProxy config: {e}") from e

        candidates = []
        for pattern in (self._SNI_RE, self._HDR_HOST_RE):
            candidates.extend(
                self._clean_host_token(match.group(1))
                for match in pattern.finditer(content)
            )
        candidates.extend(
            self._domain_from_pem_name(match.group(1))
            for match in self._BIND_CRT_RE.finditer(content)
        )

        domains = {
            candidate
            for candidate in candidates
            if candidate not in ignored_domains and self.is_valid_domain(candidate)
        }
        logger.info("Found domains: %s", sorted(domains))
        if ignored_domains:
            logger.info("Ignored domains: %s", sorted(ignored_domains))
        return domains

    def check_cert_expiry(self, cert_path, min_days=10):
        """Return True if the certificate is valid for at least ``min_days``.

        Any error while reading or parsing the certificate is logged and
        reported as False, which callers treat as "needs renewal".
        """
        # Local import: the file-level import only brings in ``datetime``.
        from datetime import timezone

        try:
            with open(cert_path, "rb") as f:
                cert_data = f.read()
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_data)
            # The parsed format ends in a literal "Z" (UTC), so compare
            # against the current UTC time rather than local time.
            expiry_date = datetime.strptime(
                cert.get_notAfter().decode("ascii"), "%Y%m%d%H%M%SZ"
            ).replace(tzinfo=timezone.utc)
            days_remaining = (expiry_date - datetime.now(timezone.utc)).days
            if days_remaining < min_days:
                logger.warning("Certificate expires in %s days", days_remaining)
            else:
                logger.info("Certificate valid for %s days", days_remaining)
            return days_remaining >= min_days
        except Exception as e:  # deliberate: any failure means "not valid"
            logger.error("Error checking certificate expiry: %s", e)
            return False

    def renew_cert(self, domain):
        """Renew the Let's Encrypt certificate named ``domain``.

        A certbot ``--dry-run`` is executed first; the real renewal only
        runs when that succeeds and ``args.dry_run`` is not set.

        Returns:
            True when the dry-run succeeded (and, outside dry-run mode, the
            renewal command completed); False otherwise.
        """
        try:
            result = subprocess.run(
                self._certbot_cmd("renew", "--dry-run", "--cert-name", domain),
                check=True,
                capture_output=True,
                text=True,
            )
            output = (result.stdout or "") + (result.stderr or "")
            if not self._reports_dry_run_success(output):
                logger.error("Dry-run renewal failed for %s", domain)
                return False

            logger.info("Dry-run renewal successful for %s", domain)
            if not self.args.dry_run:
                subprocess.run(
                    self._certbot_cmd(
                        "renew",
                        "--cert-name",
                        domain,
                        "--no-random-sleep-on-renew",
                    ),
                    check=True,
                )
                logger.info("Certificate renewed for %s", domain)
            return True
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing ``docker`` executable.
            logger.error(
                "Failed to renew certificate for %s: %s",
                domain,
                self._describe_failure(e),
            )
            return False

    def try_get_letsencrypt_cert(self, domain):
        """Request a new Let's Encrypt certificate for ``domain``.

        A certbot ``certonly --standalone --dry-run`` is executed first; the
        real request only runs when that succeeds and ``args.dry_run`` is
        not set.

        Returns:
            True on success (or when a dry-run would succeed), else False.
        """
        logger.info("Attempting to get certificate for %s", domain)
        try:
            result = subprocess.run(
                self._certbot_cmd(
                    "certonly", "--standalone", "--dry-run", "-d", domain
                ),
                check=True,
                capture_output=True,
                text=True,
            )
            output = (result.stdout or "") + (result.stderr or "")
            if not self._reports_dry_run_success(output):
                logger.error("Dry-run acquisition failed for %s", domain)
                return False

            if self.args.dry_run:
                logger.info("[DRY-RUN] Would obtain new certificate for %s", domain)
                return True

            logger.info("Attempting actual certificate acquisition for %s", domain)
            subprocess.run(
                self._certbot_cmd("certonly", "--standalone", "-d", domain),
                check=True,
            )
            return True
        except (subprocess.CalledProcessError, OSError) as e:
            logger.error(
                "Failed to get Let's Encrypt certificate for %s: %s",
                domain,
                self._describe_failure(e),
            )
            return False

    def concat_letsencrypt_cert(self, domain):
        """Combine the Let's Encrypt chain and key into HAProxy's PEM bundle.

        The certificate is renewed first when ``check_cert_expiry`` reports
        it as expiring. In dry-run mode the intended actions are only
        logged.

        Returns:
            False when the Let's Encrypt files are missing, renewal failed,
            or an I/O error occurred; True otherwise.
        """
        live_dir = os.path.join(self.letsencrypt_live_dir, domain)
        haproxy_cert = os.path.join(self.ssl_dir, f"{domain}.pem")
        fullchain = os.path.join(live_dir, "fullchain.pem")
        privkey = os.path.join(live_dir, "privkey.pem")

        if not (os.path.exists(fullchain) and os.path.exists(privkey)):
            return False

        try:
            if not self.check_cert_expiry(fullchain):
                if self.args.dry_run:
                    logger.info("[DRY-RUN] Would renew certificate for %s", domain)
                    return True
                if not self.renew_cert(domain):
                    return False

            if self.args.dry_run:
                logger.info(
                    "[DRY-RUN] Would concatenate certificate files for %s", domain
                )
                return True

            changed, backup_path = self._install_bundle(
                fullchain, privkey, haproxy_cert
            )
            if backup_path:
                logger.info("Created backup: %s", backup_path)
            self.fix_permissions()
            if changed:
                logger.info("Certificate updated for %s", domain)
            else:
                logger.info("Certificate already up to date for %s", domain)
            return True
        except IOError as e:
            logger.error("Error concatenating certificate files for %s: %s", domain, e)
            return False

    def create_placeholder_cert(self, domain):
        """Create a self-signed placeholder bundle if none exists yet.

        Does nothing when ``<ssl_dir>/<domain>.pem`` already exists; in
        dry-run mode the action is only logged.

        Raises:
            subprocess.CalledProcessError: If ``openssl`` exits non-zero.
        """
        cert_path = os.path.join(self.ssl_dir, f"{domain}.pem")
        if os.path.exists(cert_path):
            return

        if self.args.dry_run:
            logger.info(
                "[DRY-RUN] Would create placeholder certificate for %s", domain
            )
            return

        # Key and certificate are written to the same file so the result is
        # a single PEM bundle.
        cmd = [
            "openssl",
            "req",
            "-x509",
            "-newkey",
            "rsa:4096",
            "-keyout",
            cert_path,
            "-out",
            cert_path,
            "-days",
            "365",
            "-nodes",
            "-subj",
            f"/CN={domain}",
        ]
        try:
            subprocess.run(cmd, check=True)
            os.chmod(cert_path, self.CERT_FILE_MODE)
            self.fix_permissions()
            logger.info("Created placeholder certificate for %s", domain)
        except subprocess.CalledProcessError as e:
            logger.error("Error creating certificate for %s: %s", domain, e)
            raise

    def fix_permissions(self):
        """Apply the class permission modes to the SSL directory tree.

        Directories get ``SSL_DIR_MODE`` and files ``CERT_FILE_MODE``; in
        dry-run mode the action is only logged.
        """
        if self.args.dry_run:
            logger.info("[DRY-RUN] Would fix permissions for %s", self.ssl_dir)
            return

        os.chmod(self.ssl_dir, self.SSL_DIR_MODE)
        for root, dirs, files in os.walk(self.ssl_dir):
            for d in dirs:
                os.chmod(os.path.join(root, d), self.SSL_DIR_MODE)
            for f in files:
                os.chmod(os.path.join(root, f), self.CERT_FILE_MODE)
        logger.info("Fixed permissions for all certificates")
def main(argv=None):
    """Run certificate management for every domain in the HAProxy config.

    For each extracted domain, in order of preference: reuse the existing
    Let's Encrypt certificate, obtain a new one, or fall back to a
    self-signed placeholder. Permissions are fixed once at the end.

    Args:
        argv: Optional list of command-line arguments; when None, argparse
            reads them from ``sys.argv``.

    Returns:
        0 on success, 1 if any exception aborted the run.
    """
    parser = argparse.ArgumentParser(description="Manage SSL certificates for HAProxy")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without making actual changes",
    )
    parser.add_argument(
        "--base-dir",
        default="/srv/haproxy",
        help="Base directory for HAProxy configuration",
    )
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    args = parser.parse_args(argv)

    if args.debug:
        logger.setLevel(logging.DEBUG)
    if args.dry_run:
        logger.info("Running in DRY-RUN mode - no changes will be made")

    try:
        cert_manager = CertManager(args, args.base_dir)
        cert_manager.ensure_ssl_dir()
        domains = cert_manager.extract_domains()

        for domain in sorted(domains):
            logger.info("Processing domain: %s", domain)

            if cert_manager.concat_letsencrypt_cert(domain):
                logger.info("Used existing Let's Encrypt certificate for %s", domain)
                continue

            if cert_manager.try_get_letsencrypt_cert(
                domain
            ) and cert_manager.concat_letsencrypt_cert(domain):
                logger.info(
                    "Successfully obtained new Let's Encrypt certificate for %s",
                    domain,
                )
                continue

            # create_placeholder_cert is a no-op when the bundle already
            # exists or in dry-run mode and logs its own result, so this
            # message must not claim that a file was created.
            cert_manager.create_placeholder_cert(domain)
            logger.info("Falling back to placeholder certificate for %s", domain)

        cert_manager.fix_permissions()
        logger.info("Certificate management completed successfully")
        return 0
    except Exception as e:  # top-level boundary: report and exit non-zero
        logger.error("Fatal error: %s", e)
        # The traceback is only emitted when --debug enabled DEBUG logging.
        logger.debug("Traceback for fatal error", exc_info=True)
        return 1
# Script entry point: propagate main()'s return value as the process exit
# status. SystemExit is raised directly because the bare ``exit`` helper is
# installed by the ``site`` module and is not guaranteed to be available.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment