Last active
May 23, 2023 11:10
-
-
Save IMMORTALxJO/a2743a6602a53c02d86ab7280dee06a2 to your computer and use it in GitHub Desktop.
Check nginx configuration has valid certificates
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import crossplane | |
import os | |
from cryptography import x509 | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives.serialization import load_pem_private_key | |
from cryptography.x509.oid import NameOID | |
from functools import lru_cache | |
# Process exit status; parse_crossplane() and main() set it to 1 when a check fails.
EXIT_CODE = 0
## Collect the certificate's common name and subject alternative names
@lru_cache(maxsize=None)
def cert_collect_sans(cert_path):
    """Return the names found in the PEM certificate at ``cert_path``.

    The result is a list holding every subject common name first, followed
    by the value of every entry of the SubjectAlternativeName extension
    (when the certificate has one).

    The list is memoized per ``cert_path`` by ``lru_cache``, so all callers
    receive the same list object and must not mutate it.

    Errors raised while opening the file or parsing the PEM data are not
    caught here and propagate to the caller.
    """
    # The PEM loader takes bytes, so read the file in binary mode; the
    # context manager guarantees the handle is closed.
    with open(cert_path, "rb") as cert_file:
        cert_decoded = x509.load_pem_x509_certificate(cert_file.read(), default_backend())

    # Query the whole subject rather than only its first RDN: the common
    # name is frequently preceded by C=/O=/OU= attributes, and some
    # certificates carry no common name at all.
    names = [
        attribute.value
        for attribute in cert_decoded.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    ]

    for ext in cert_decoded.extensions:
        if isinstance(ext.value, x509.extensions.SubjectAlternativeName):
            names.extend(entry.value for entry in ext.value)
    return names
## Check certificate has domain in SANs or common name | |
@lru_cache(maxsize=None)
def verify_cert_contains_domain(cert_path, domain):
    """Return True when the certificate at ``cert_path`` covers ``domain``.

    A certificate covers ``domain`` when its common name or one of its
    subject alternative names is either ``domain`` itself or a wildcard for
    the parent of ``domain`` (``*.example.com`` for ``www.example.com``).

    A wildcard only stands in for the left-most label, so ``*.example.com``
    is NOT accepted as covering the bare ``example.com``.

    The comparison is case-insensitive because DNS names are.
    """
    # str() keeps the comparison well-defined for SAN entries whose value
    # is not a plain string.
    names = {str(name).lower() for name in cert_collect_sans(cert_path)}
    domain = domain.lower()

    if domain in names:
        return True

    # Replace the left-most label with "*"; a single-label name has no
    # parent and therefore no wildcard form.
    _, _, parent = domain.partition(".")
    return bool(parent) and "*." + parent in names
## Check certificate and key are compatible | |
@lru_cache(maxsize=None)
def verify_cert_and_key(cert_path, key_path):
    """Return True when the private key at ``key_path`` matches the certificate at ``cert_path``.

    Both files must be PEM encoded and the private key must be unencrypted
    (it is loaded with ``password=None``).

    The two public keys are compared through their DER-encoded
    SubjectPublicKeyInfo, which works for any key type and covers every
    public parameter, not just an RSA modulus.

    Errors raised while opening or parsing either file are not caught here
    and propagate to the caller. Results are memoized per path pair.
    """
    # Imported locally so the block is self-contained; the file already
    # depends on the cryptography package.
    from cryptography.hazmat.primitives import serialization

    def spki_der(public_key):
        # Canonical byte representation of a public key.
        return public_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

    # Binary mode feeds the loaders the bytes they expect, and the context
    # managers guarantee both handles are closed.
    with open(cert_path, "rb") as cert_file:
        cert_decoded = x509.load_pem_x509_certificate(cert_file.read(), default_backend())
    with open(key_path, "rb") as key_file:
        key_decoded = load_pem_private_key(
            key_file.read(), password=None, backend=default_backend())

    return spki_der(cert_decoded.public_key()) == spki_der(key_decoded.public_key())
## Run all tests against certificate | |
def check_cert(domain, cert_path, key_path, source_file):
    """Run every certificate check for a single server name.

    The key/certificate match is checked first, then whether the
    certificate covers ``domain``. The first failing check prints an
    ``[ERROR]`` line naming the domain, both paths and the nginx config
    file, and the function returns False. Returns True when both pass.
    """
    details = (domain, cert_path, key_path, source_file)

    if not verify_cert_and_key(cert_path, key_path):
        print("[ERROR] Cert validation failed , domain=%s crt=%s key=%s file=%s" % details)
        return False

    if verify_cert_contains_domain(cert_path, domain):
        return True

    print("[ERROR] SNI validation failed , domain=%s crt=%s key=%s file=%s" % details)
    return False
# Collect all servers with certificates | |
# [ | |
# { | |
# "file": <path_to_nginx_config_file>, | |
# "server_name": [ | |
# <server_name>, | |
# ... | |
# ], | |
# "ssl_certificate": <path_to_ssl_certificate>, | |
# "ssl_certificate_key": <path_to_ssl_certificate_key> | |
# }, ... | |
# ] | |
def parse_crossplane(config_path="/etc/nginx/nginx.conf"):
    """Collect every nginx ``server`` block that declares an ``ssl_certificate``.

    ``config_path`` is parsed with crossplane, and each parsed file in the
    result is searched for ``server`` blocks at any nesting depth, so
    servers written directly inside ``http { ... }`` are found as well as
    those at the top level of a parsed file.

    Returns a list of dicts with the keys ``file``, ``server_name`` (list
    of all ``server_name`` arguments), ``ssl_certificate`` and
    ``ssl_certificate_key`` (first argument of the respective directive).

    Server blocks without ``ssl_certificate`` are skipped. A block with a
    certificate but no ``ssl_certificate_key`` is reported, sets the global
    ``EXIT_CODE`` to 1 and is left out of the result.
    """
    global EXIT_CODE

    def find_servers(directives):
        # Yield "server" block directives found anywhere below `directives`.
        for entry in directives:
            nested = entry.get("block")
            if nested is None:
                # Not a block directive. This also skips the blockless
                # "server <address>;" members of an upstream.
                continue
            if entry["directive"] == "server":
                yield entry
            else:
                yield from find_servers(nested)

    def collect_args(block, name):
        # Flatten the arguments of every `name` directive directly inside `block`.
        return [arg for entry in block if entry["directive"] == name for arg in entry["args"]]

    servers = []
    for config in crossplane.parse(config_path)["config"]:
        for server in find_servers(config["parsed"]):
            block = server["block"]

            ssl_certificate = collect_args(block, "ssl_certificate")
            if not ssl_certificate:
                continue

            ssl_certificate_key = collect_args(block, "ssl_certificate_key")
            if not ssl_certificate_key:
                print("[ERROR] ssl_certificate without key defined in %s" % config["file"])
                EXIT_CODE = 1
                continue

            servers.append({
                "file": config["file"],
                "server_name": collect_args(block, "server_name"),
                "ssl_certificate": ssl_certificate[0],
                "ssl_certificate_key": ssl_certificate_key[0],
            })
    return servers
def main(config_path="/etc/nginx/nginx.conf"):
    """Check every server name of every TLS-enabled server block.

    Each server returned by ``parse_crossplane(config_path)`` has all of
    its names except ``_`` passed to ``check_cert``. The global
    ``EXIT_CODE`` is set to 1 as soon as one check fails; the remaining
    names are still checked.
    """
    global EXIT_CODE
    for server in parse_crossplane(config_path):
        cert_path = server["ssl_certificate"]
        key_path = server["ssl_certificate_key"]
        source_file = server["file"]
        for server_name in server["server_name"]:
            # The name "_" is never checked against the certificate.
            if server_name != "_" and not check_cert(server_name, cert_path, key_path, source_file):
                EXIT_CODE = 1
main()
# Exit through SystemExit instead of os._exit(): os._exit() terminates the
# process without flushing stdio buffers, so the [ERROR] lines printed above
# could be lost whenever stdout is a pipe or a file.
raise SystemExit(EXIT_CODE)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment