Skip to content

Instantly share code, notes, and snippets.

@ryshoooo
Last active October 22, 2023 19:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ryshoooo/387c7d6425421a698932b9979e582a56 to your computer and use it in GitHub Desktop.
Save ryshoooo/387c7d6425421a698932b9979e582a56 to your computer and use it in GitHub Desktop.
Check CA bundle file
import os
import requests
import subprocess
from cryptography import x509
from datetime import datetime
def check_permissions(path: str) -> None:
    """Print ownership, readability and mode diagnostics for ``path``.

    Ownership is reported as OK when the file belongs to the current user or
    to uid 0.  Any error raised while inspecting the file is printed instead
    of propagated, so the remaining checks of the script can still run.

    Args:
        path: Filesystem path of the CA bundle to inspect.
    """
    try:
        stat_info = os.stat(path)
        current_uid = os.getuid()
        owner_status = "OK" if stat_info.st_uid in (current_uid, 0) else "NOT OK"
        readable_status = "OK" if os.access(path, os.R_OK) else "NOT OK"
        # Report on the ``path`` argument, not on a module-level global, so
        # the function also works when called outside the script entry point.
        print(f"owner of {path} is {stat_info.st_uid}: {owner_status}")
        print(f"{path} is readable by {current_uid}: {readable_status}")
        print(f"st_mode of CA is: {oct(stat_info.st_mode)}")
    except Exception as e:
        # Deliberately broad: this is a best-effort diagnostic step.
        print(f"Error checking permissions: {e}")
def check_is_file_not_symlink(path: str) -> None:
    """Print whether ``path`` is a regular file and whether it is a symlink.

    Args:
        path: Filesystem path of the CA bundle to inspect.
    """
    file_status = "OK" if os.path.isfile(path) else "NOT OK"
    link_status = "OK" if not os.path.islink(path) else "NOT OK"
    # Report on the ``path`` argument rather than a module-level global so
    # the function does not raise NameError when used on its own.
    print(f"{path} is a file: {file_status}")
    print(f"{path} is not a link {link_status}")
def check_formatting(path: str) -> None:
    """Ask ``openssl x509`` whether ``path`` parses as PEM and print the result.

    Only the exit status of openssl is used; its output is discarded.

    Args:
        path: Filesystem path of the CA bundle to check.
    """
    # NOTE(review): `openssl x509` appears to parse only the first certificate
    # in the file, so this is a smoke test rather than a full bundle check.
    print(f"> Checking {path} with openssl to verify PEM format")
    try:
        result = subprocess.run(
            ["openssl", "x509", "-in", path, "-noout"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        # openssl is not installed: report it instead of aborting the script.
        print(f"{path} is in PEM format: NOT CHECKED (openssl executable not found)")
        return
    status = "OK" if result.returncode == 0 else "NOT OK"
    print(f"{path} is in PEM format: {status}")
def check_with_curl_ca_bundle(path: str, test_url: str) -> None:
    """Run ``curl test_url`` with ``CURL_CA_BUNDLE`` pointing at ``path``.

    The variable is set in ``os.environ`` of the current process (and is left
    set afterwards), so the curl child process inherits it.  Only curl's exit
    status is used; the response body is discarded.

    Args:
        path: Filesystem path of the CA bundle to export as CURL_CA_BUNDLE.
        test_url: URL that curl should fetch.
    """
    os.environ["CURL_CA_BUNDLE"] = path
    print(f"> Checking curl on {test_url} with CA Bundle at {os.environ['CURL_CA_BUNDLE']}")
    label = f"curl with CURL_CA_BUNDLE set to {path} on {test_url}"
    try:
        # The body is never inspected, so discard it instead of capturing and
        # decoding it (decoding arbitrary page bytes can fail).
        result = subprocess.run(
            ["curl", test_url],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        # curl is not installed: report it instead of aborting the script.
        print(f"{label}: NOT CHECKED (curl executable not found)")
        return
    print(f"{label}: {'OK' if result.returncode == 0 else 'NOT OK'}")
def check_with_ca_bundle(path: str, test_url: str) -> None:
    """Run ``curl test_url`` with ``CA_BUNDLE`` pointing at ``path``.

    The variable is set in ``os.environ`` of the current process (and is left
    set afterwards), so the curl child process inherits it.  Only curl's exit
    status is used; the response body is discarded.

    Args:
        path: Filesystem path of the CA bundle to export as CA_BUNDLE.
        test_url: URL that curl should fetch.
    """
    # NOTE(review): any CURL_CA_BUNDLE already present in os.environ is still
    # inherited by curl here, so this may not test CA_BUNDLE in isolation.
    os.environ["CA_BUNDLE"] = path
    print(f"> Checking curl on {test_url} with CA Bundle at {os.environ['CA_BUNDLE']}")
    label = f"curl with CA_BUNDLE set to {path} on {test_url}"
    try:
        # The body is never inspected, so discard it instead of capturing and
        # decoding it (decoding arbitrary page bytes can fail).
        result = subprocess.run(
            ["curl", test_url],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        # curl is not installed: report it instead of aborting the script.
        print(f"{label}: NOT CHECKED (curl executable not found)")
        return
    print(f"{label}: {'OK' if result.returncode == 0 else 'NOT OK'}")
def check_with_no_env_set(path: str, test_url: str) -> None:
    """Run ``curl test_url`` with CURL_CA_BUNDLE and CA_BUNDLE removed.

    Both variables are removed from ``os.environ`` of the current process
    before curl is started.  Only curl's exit status is used.

    Args:
        path: Unused; kept so the signature matches the other curl checks.
        test_url: URL that curl should fetch.
    """
    # pop() with a default instead of `del`: the variables may never have
    # been set, and that must not abort the check with a KeyError.
    for variable in ("CURL_CA_BUNDLE", "CA_BUNDLE"):
        os.environ.pop(variable, None)
    label = f"curl with no environment variable set on {test_url}"
    try:
        # The body is never inspected, so discard it instead of capturing and
        # decoding it (decoding arbitrary page bytes can fail).
        result = subprocess.run(
            ["curl", test_url],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        # curl is not installed: report it instead of aborting the script.
        print(f"{label}: NOT CHECKED (curl executable not found)")
        return
    print(f"{label}: {'OK' if result.returncode == 0 else 'NOT OK'}")
def request_with_builtin_ca(test_url: str, timeout: float = 30.0) -> None:
    """GET ``test_url`` with requests' default CA verification and print the result.

    The check is reported as OK only when the final response status is 200.
    Any exception (TLS failure, connection error, timeout, ...) is printed
    instead of propagated.

    Args:
        test_url: URL to request.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive host cannot hang the script indefinitely.
    """
    try:
        response = requests.get(test_url, timeout=timeout)
    except Exception as e:
        # Deliberately broad: this is a best-effort diagnostic step.
        print(f"Error with built-in CA bundle: {e}")
    else:
        status = "OK" if response.status_code == 200 else "NOT OK"
        print(f"requests {test_url} with built-in CA: {status}")
def request_with_custom_ca(test_url: str, path: str, timeout: float = 30.0) -> None:
    """GET ``test_url`` verifying TLS against the CA bundle at ``path``.

    The check is reported as OK only when the final response status is 200.
    Any exception (TLS failure, connection error, timeout, ...) is printed
    instead of propagated.

    Args:
        test_url: URL to request.
        path: Filesystem path of the CA bundle passed to requests as ``verify``.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive host cannot hang the script indefinitely.
    """
    try:
        response = requests.get(test_url, verify=path, timeout=timeout)
    except Exception as e:
        # Deliberately broad: this is a best-effort diagnostic step.
        print(f"Error with custom CA bundle: {e}")
    else:
        status = "OK" if response.status_code == 200 else "NOT OK"
        print(f"requests {test_url} with custom CA path {path}: {status}")
def examine_the_bundle_file(path: str) -> None:
    """Parse every certificate in the bundle and print the ones that fail.

    The whole file is first handed to ``x509.load_pem_x509_certificates``.
    If that fails, the content is split on the PEM BEGIN marker and every
    piece is parsed on its own so the offending certificate(s) can be printed.

    Args:
        path: Filesystem path of the CA bundle to examine.
    """
    # Read the file once, as bytes: no text decoding is needed for parsing,
    # and an unreadable file is reported instead of raising from a re-open.
    try:
        with open(path, "rb") as fp:
            raw_data = fp.read()
    except OSError as e:
        print(f"Failed to load all the certificates from the bundle: {e}")
        return

    try:
        x509.load_pem_x509_certificates(raw_data)
    except Exception as e:
        print(f"Failed to load all the certificates from the bundle: {e}")
    else:
        print("Loaded all the certificates from the bundle file: OK")
        return

    # Bulk parsing failed: find out which individual certificate is broken.
    begin_cert = b"-----BEGIN CERTIFICATE-----"
    # Element 0 is whatever precedes the first BEGIN marker, so skip it.
    for chunk in raw_data.split(begin_cert)[1:]:
        candidate = begin_cert + chunk
        try:
            x509.load_pem_x509_certificate(candidate)
        except Exception:
            print("Found a malformed certificate")
            print(candidate.decode(errors="replace"))
            print("===============================")
def _split_pem(bundle: bytes) -> list:
    """Split a PEM bundle into its individual certificate blocks.

    Args:
        bundle: Raw content of the bundle as bytes.  A ``str`` is tolerated
            and encoded first, since the markers are matched as bytes.

    Returns:
        A list of ``bytes`` objects, one per certificate, each running from
        the BEGIN marker to the END marker inclusive.  Pieces that have a
        BEGIN marker but no END marker are dropped.
    """
    header = b"-----BEGIN CERTIFICATE-----"
    footer = b"-----END CERTIFICATE-----"
    if isinstance(bundle, str):
        bundle = bundle.encode()

    certs = []
    # Element 0 of the split is the text *before* the first BEGIN marker; it
    # can never be a certificate, even if it happens to contain an END marker.
    for part in bundle.split(header)[1:]:
        body, found_footer, _ = part.partition(footer)
        if found_footer:
            certs.append(header + body + footer)
    return certs
def check_ca_bundle_expiry(path: str) -> list:
    """Return the certificates in the bundle that are not currently valid.

    A certificate is collected when the current time is before its
    ``not_valid_before`` or after its ``not_valid_after``.  Certificates that
    cannot be parsed or compared are reported on stdout and skipped.

    Args:
        path: Filesystem path of the CA bundle to check.

    Returns:
        A list of the parsed certificate objects outside their validity window.

    Raises:
        OSError: If the bundle file cannot be opened or read.
    """
    # Local import: only this function needs ``timezone``.
    from datetime import timezone

    with open(path, "rb") as f:
        data = f.read()

    # Naive UTC "now", computed once for the whole bundle.  Equivalent to the
    # deprecated datetime.utcnow().
    # NOTE(review): assumes not_valid_before/not_valid_after are naive UTC
    # datetimes - confirm against the installed cryptography version.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    outdated_certs = []
    for cert_data in _split_pem(data):
        try:
            cert = x509.load_pem_x509_certificate(cert_data)
            if now < cert.not_valid_before or now > cert.not_valid_after:
                outdated_certs.append(cert)
        except Exception as e:
            # Best-effort: keep going, but do not hide the problem.
            print(f"Skipping a certificate whose validity could not be checked: {e}")
    return outdated_certs
if __name__ == "__main__":
    # Script entry point: run every check against the given CA bundle and URL.
    import sys

    # URL used when only the bundle path is given on the command line.
    default_test_url = "https://gitlab.com"

    cli_args = sys.argv[1:]
    # Accept "<ca_bundle_path> [test_url]".  Anything else prints the usage
    # text instead of failing with an unpacking error.
    if not 1 <= len(cli_args) <= 2:
        print("Usage:\npython3 check_ca_env.py {ca_bundle_path} {test_url}")
        print(
            "E.g.:\npython3 check_ca_env.py /etc/ssl/certs/ca-certificates.crt https://gitlab.com"
        )
        sys.exit(1)

    # These two names stay module-level globals under their original names.
    ca_bundle_path = cli_args[0]
    test_url = cli_args[1] if len(cli_args) == 2 else default_test_url

    print(f"> Testing with {ca_bundle_path} and {test_url}")
    check_permissions(ca_bundle_path)
    check_is_file_not_symlink(ca_bundle_path)
    check_formatting(ca_bundle_path)
    check_with_curl_ca_bundle(ca_bundle_path, test_url)
    check_with_ca_bundle(ca_bundle_path, test_url)
    check_with_no_env_set(ca_bundle_path, test_url)
    request_with_builtin_ca(test_url)
    request_with_custom_ca(test_url, ca_bundle_path)
    examine_the_bundle_file(ca_bundle_path)

    outdated = check_ca_bundle_expiry(ca_bundle_path)
    if outdated:
        print("NOT OK: Outdated Certificates:")
        for cert in outdated:
            print(f"Subject: {cert.subject}, Expired on: {cert.not_valid_after}")
    else:
        print("No outdated certificates found.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment