Last active
June 5, 2025 16:02
-
-
Save franzos/192aa0e59c7e48f4fa412fcb576515f0 to your computer and use it in GitHub Desktop.
Manage SSL Certificates on Docker Host with HAProxy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Certificate Management Script for HAProxy | |
Shared on: https://gist.github.com/franzos/192aa0e59c7e48f4fa412fcb576515f0 | |
Expected Directory Structure: | |
- Base directory (default: /srv/haproxy) | |
- ssl/ # Where HAProxy expects .pem certs (managed by this script) | |
- haproxy.cfg # HAProxy configuration file (domains are extracted from here) | |
- ignore-domains.txt # (Optional) List of domains to ignore, one per line | |
ignore-domains.txt: | |
- Place in the base directory (default: /srv/haproxy) | |
- Each line: a domain to ignore (e.g., test.example.com) | |
- Blank lines and lines starting with # are ignored | |
CLI Parameters: | |
--dry-run Show actions without making changes | |
--base-dir Set base directory (default: /srv/haproxy) | |
--debug Enable debug logging | |
Dependencies: (pip3 install) pyopenssl validators | |
""" | |
import argparse | |
import re | |
import os | |
import subprocess | |
import shutil | |
import logging | |
from datetime import datetime | |
import OpenSSL.crypto as crypto | |
import validators | |
logging.basicConfig( | |
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" | |
) | |
logger = logging.getLogger(__name__) | |
class CertManager: | |
def __init__(self, args, base_dir="/srv/haproxy"): | |
self.args = args | |
self.base_dir = base_dir | |
self.ssl_dir = os.path.join(base_dir, "ssl") | |
self.haproxy_cfg = os.path.join(base_dir, "haproxy.cfg") | |
self.ignore_file = os.path.join(base_dir, "ignore-domains.txt") | |
def ensure_ssl_dir(self): | |
"""Ensure SSL directory exists with correct permissions.""" | |
if not self.args.dry_run: | |
os.makedirs(self.ssl_dir, mode=0o750, exist_ok=True) | |
logger.info("SSL directory ensured: %s", self.ssl_dir) | |
return self.ssl_dir | |
def get_ignored_domains(self): | |
"""Get list of domains to ignore from configuration file.""" | |
if not os.path.exists(self.ignore_file): | |
return set() | |
try: | |
with open(self.ignore_file, "r", encoding="utf8") as f: | |
return {d.strip() for d in f.readlines() if d.strip()} | |
except IOError as e: | |
logger.warning("Could not read ignore file: %s", e) | |
return set() | |
def is_valid_domain(self, domain): | |
"""Validate domain format.""" | |
return validators.domain(domain) and not domain.startswith(".") | |
def extract_domains(self): | |
"""Extract domains from HAProxy configuration.""" | |
if not os.path.exists(self.haproxy_cfg): | |
raise FileNotFoundError(f"HAProxy config not found: {self.haproxy_cfg}") | |
domains = set() | |
ignored_domains = self.get_ignored_domains() | |
try: | |
with open(self.haproxy_cfg, "r", encoding="utf8") as f: | |
content = f.read() | |
# Find domains in SNI-based routing (req.ssl_sni) | |
sni_patterns = re.finditer(r"req\.ssl_sni\s+([^\s\}]+)", content) | |
for match in sni_patterns: | |
domain = match.group(1).strip("\" '") | |
domain = domain.split(":")[0] | |
if domain not in ignored_domains and self.is_valid_domain(domain): | |
domains.add(domain) | |
# Find domains in hdr(host) statements | |
hdr_patterns = re.finditer(r"hdr\(host\)\s*-i\s+([^\s]+)", content) | |
for match in hdr_patterns: | |
domain = match.group(1).strip("\" '") | |
domain = domain.split(":")[0] # Remove port if present | |
if domain not in ignored_domains and self.is_valid_domain(domain): | |
domains.add(domain) | |
# Find domains in bind statements | |
bind_patterns = re.finditer( | |
r"bind\s+[^#\n]*crt\s+[^#\n]*\/([^\/\s]+\.pem)", content | |
) | |
for match in bind_patterns: | |
domain = match.group(1).strip().replace(".pem", "") | |
if domain not in ignored_domains and self.is_valid_domain(domain): | |
domains.add(domain) | |
logger.info("Found domains: %s", sorted(domains)) | |
if ignored_domains: | |
logger.info("Ignored domains: %s", sorted(ignored_domains)) | |
return domains | |
except IOError as e: | |
raise RuntimeError(f"Failed to read HAProxy config: {e}") | |
def check_cert_expiry(self, cert_path, min_days=10): | |
"""Check if certificate is approaching expiration.""" | |
try: | |
with open(cert_path, "rb") as f: | |
cert_data = f.read() | |
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_data) | |
expiry_date = datetime.strptime( | |
cert.get_notAfter().decode("ascii"), "%Y%m%d%H%M%SZ" | |
) | |
days_remaining = (expiry_date - datetime.now()).days | |
if days_remaining < min_days: | |
logger.warning("Certificate expires in %s days", days_remaining) | |
else: | |
logger.info("Certificate valid for %s days", days_remaining) | |
return days_remaining >= min_days | |
except Exception as e: | |
logger.error("Error checking certificate expiry: %s", e) | |
return False | |
def renew_cert(self, domain): | |
"""Attempt to renew a Let's Encrypt certificate.""" | |
try: | |
# Try renewal with dry-run first | |
dry_run_cmd = [ | |
"docker", | |
"exec", | |
"-i", | |
"letsencrypt", | |
"certbot", | |
"renew", | |
"--dry-run", | |
"--cert-name", | |
domain, | |
# TODO: Test | |
# "--no-random-sleep-on-renew" | |
] | |
result = subprocess.run( | |
dry_run_cmd, check=True, capture_output=True, text=True | |
) | |
if "The dry run was successful" in result.stdout: | |
logger.info("Dry-run renewal successful for %s", domain) | |
if not self.args.dry_run: | |
renew_cmd = [ | |
"docker", | |
"exec", | |
"-i", | |
"letsencrypt", | |
"certbot", | |
"renew", | |
"--cert-name", | |
domain, | |
"--no-random-sleep-on-renew", | |
] | |
subprocess.run(renew_cmd, check=True) | |
logger.info("Certificate renewed for %s", domain) | |
return True | |
else: | |
logger.error("Dry-run renewal failed for %s", domain) | |
return False | |
except subprocess.CalledProcessError as e: | |
logger.error("Failed to renew certificate for %s: %s", domain, e) | |
return False | |
def try_get_letsencrypt_cert(self, domain): | |
"""Attempt to obtain a new Let's Encrypt certificate.""" | |
logger.info("Attempting to get certificate for %s", domain) | |
try: | |
dry_run_cmd = [ | |
"docker", | |
"exec", | |
"-i", | |
"letsencrypt", | |
"certbot", | |
"certonly", | |
"--standalone", | |
"--dry-run", | |
"-d", | |
domain, | |
] | |
result = subprocess.run( | |
dry_run_cmd, check=True, capture_output=True, text=True | |
) | |
if "The dry run was successful" in result.stdout: | |
if self.args.dry_run: | |
logger.info("[DRY-RUN] Would obtain new certificate for %s", domain) | |
return True | |
logger.info("Attempting actual certificate acquisition for %s", domain) | |
cert_cmd = [ | |
"docker", | |
"exec", | |
"-i", | |
"letsencrypt", | |
"certbot", | |
"certonly", | |
"--standalone", | |
"-d", | |
domain, | |
] | |
subprocess.run(cert_cmd, check=True) | |
return True | |
return False | |
except subprocess.CalledProcessError as e: | |
logger.error( | |
"Failed to get Let's Encrypt certificate for %s: %s", domain, e | |
) | |
return False | |
def concat_letsencrypt_cert(self, domain): | |
"""Combine certificate and key files for HAProxy.""" | |
live_dir = f"/srv/letsencrypt/data/live/{domain}" | |
haproxy_cert = os.path.join(self.ssl_dir, f"{domain}.pem") | |
fullchain = os.path.join(live_dir, "fullchain.pem") | |
privkey = os.path.join(live_dir, "privkey.pem") | |
if not (os.path.exists(fullchain) and os.path.exists(privkey)): | |
return False | |
try: | |
needs_renewal = not self.check_cert_expiry(fullchain) | |
if needs_renewal: | |
if self.args.dry_run: | |
logger.info("[DRY-RUN] Would renew certificate for %s", domain) | |
return True | |
if not self.renew_cert(domain): | |
return False | |
if self.args.dry_run: | |
logger.info( | |
"[DRY-RUN] Would concatenate certificate files for %s", domain | |
) | |
return True | |
# Create backup before modifying | |
if os.path.exists(haproxy_cert): | |
backup_path = f"{haproxy_cert}.bak" | |
shutil.copy2(haproxy_cert, backup_path) | |
logger.info("Created backup: %s", backup_path) | |
# Secure file concatenation | |
with open(fullchain, "rb") as fc, open(privkey, "rb") as pk, open( | |
haproxy_cert, "wb" | |
) as out: | |
out.write(fc.read()) | |
out.write(pk.read()) | |
os.chmod(haproxy_cert, 0o644) | |
self.fix_permissions() | |
logger.info("Certificate updated for %s", domain) | |
return True | |
except IOError as e: | |
logger.error("Error concatenating certificate files for %s: %s", domain, e) | |
return False | |
def create_placeholder_cert(self, domain): | |
"""Create a self-signed certificate as placeholder.""" | |
cert_path = os.path.join(self.ssl_dir, f"{domain}.pem") | |
if not os.path.exists(cert_path): | |
try: | |
if self.args.dry_run: | |
logger.info( | |
"[DRY-RUN] Would create placeholder certificate for %s", domain | |
) | |
return | |
cmd = [ | |
"openssl", | |
"req", | |
"-x509", | |
"-newkey", | |
"rsa:4096", | |
"-keyout", | |
cert_path, | |
"-out", | |
cert_path, | |
"-days", | |
"365", | |
"-nodes", | |
"-subj", | |
f"/CN={domain}", | |
] | |
subprocess.run(cmd, check=True) | |
os.chmod(cert_path, 0o644) | |
self.fix_permissions() | |
logger.info("Created placeholder certificate for %s", domain) | |
except subprocess.CalledProcessError as e: | |
logger.error("Error creating certificate for %s: %s", domain, e) | |
raise | |
def fix_permissions(self): | |
"""Fix permissions on SSL directory and files.""" | |
if self.args.dry_run: | |
logger.info("[DRY-RUN] Would fix permissions for %s", self.ssl_dir) | |
return | |
os.chmod(self.ssl_dir, 0o755) | |
for root, dirs, files in os.walk(self.ssl_dir): | |
for d in dirs: | |
os.chmod(os.path.join(root, d), 0o755) | |
for f in files: | |
os.chmod(os.path.join(root, f), 0o644) | |
logger.info("Fixed permissions for all certificates") | |
def main(): | |
parser = argparse.ArgumentParser(description="Manage SSL certificates for HAProxy") | |
parser.add_argument( | |
"--dry-run", | |
action="store_true", | |
help="Show what would be done without making actual changes", | |
) | |
parser.add_argument( | |
"--base-dir", | |
default="/srv/haproxy", | |
help="Base directory for HAProxy configuration", | |
) | |
parser.add_argument("--debug", action="store_true", help="Enable debug logging") | |
args = parser.parse_args() | |
if args.debug: | |
logger.setLevel(logging.DEBUG) | |
if args.dry_run: | |
logger.info("Running in DRY-RUN mode - no changes will be made") | |
try: | |
cert_manager = CertManager(args, args.base_dir) | |
cert_manager.ensure_ssl_dir() | |
domains = cert_manager.extract_domains() | |
for domain in sorted(domains): | |
logger.info("Processing domain: %s", domain) | |
if cert_manager.concat_letsencrypt_cert(domain): | |
logger.info("Used existing Let's Encrypt certificate for %s", domain) | |
else: | |
if cert_manager.try_get_letsencrypt_cert( | |
domain | |
) and cert_manager.concat_letsencrypt_cert(domain): | |
logger.info( | |
"Successfully obtained new Let's Encrypt certificate for %s", | |
domain, | |
) | |
else: | |
cert_manager.create_placeholder_cert(domain) | |
logger.info("Created placeholder certificate for %s", domain) | |
cert_manager.fix_permissions() | |
logger.info("Certificate management completed successfully") | |
return 0 | |
except Exception as e: | |
logger.error("Fatal error: %s", e) | |
return 1 | |
if __name__ == "__main__": | |
exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment