Skip to content

Instantly share code, notes, and snippets.

@tyrells
Last active September 21, 2020 17:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tyrells/4667515e4d54ab1c01a48aa323a30ce5 to your computer and use it in GitHub Desktop.
Save tyrells/4667515e4d54ab1c01a48aa323a30ce5 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import sys
import os.path
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import hashlib
import random
from datetime import timedelta
from cryptography.hazmat.primitives.asymmetric import rsa, dsa, ec, ed25519, ed448
# Mozilla CA certificate list in PEM format
# downloaded from https://curl.haxx.se/docs/caextract.html
CA_CERTS = "cacert.pem"
# Boolean that instructs the application to include CA certificates
INCLUDE_CA_CERTS = True
# Output file paths
CA_CERT_FNAME = "ca.pem"
SERVER_CERT_FNAME = "server.pem"
SERVER_KEY_FNAME = "server.key"
# used internally
FINGERPRINT_ALGORITHM = hashlib.sha1
def is_binary_file(fname):
"""Check if a file contains non-ascii characters"""
# define range of ascii characters
textchars = bytearray({7,8,9,10,12,13,27} | set(range(0x20, 0x100)) - {0x7f})
with open(fname, 'rb') as f:
bytes = f.read(1024)
return bool(bytes.translate(None, textchars))
def process_der_cert(fname):
"""Process files containing single certificates in binary DER format"""
with open(fname, 'rb') as f:
cert_bytes = f.read()
cert = x509.load_der_x509_certificate(cert_bytes, default_backend())
yield cert
def process_pem_cert(fname):
"""Process files containing multiple certificates in ascii PEM format"""
with open(fname, 'r') as f:
cert_bytes = bytearray()
for line in f:
if "END CERTIFICATE" in line:
cert_bytes.extend(line.encode())
cert = x509.load_pem_x509_certificate(cert_bytes, default_backend())
cert_bytes = bytearray()
yield cert
else:
cert_bytes.extend(line.encode())
def process_certificates(fname):
"""Process either PEM or DER formatted certificate files"""
ext = os.path.splitext(fname)[-1].lower()
try:
if ext == '.pem' or not is_binary_file(fname):
for c in process_pem_cert(fname):
yield(c)
print("INFO: Processing file: {}".format(fname), file=sys.stderr)
elif ext == '.der' or is_binary_file(fname):
for c in process_der_cert(fname):
yield(c)
print("INFO: Processing file: {}".format(fname), file=sys.stderr)
except ValueError as e:
# ValueError execptions are related to files that cannot
# be processed. Ignore these files and continue.
if "Unable to load certificate" in str(e):
print("WARNING: Wrong format, ignoring file: {}".format(fname), file=sys.stderr)
return
raise e
def get_cert_issuer(cert, cert_store, ca_store):
"""Find the issuer of a certificate, and return if the certificate was
self-signed and if it was found in the trusted CA store"""
is_self_signed = False
is_ca_signed = False
issuer_cert = None
if cert.issuer == cert.subject:
is_self_signed = True
issuer_cert = cert
for c in cert_store.values():
if cert.issuer == c.subject:
issuer_cert = c
for c in ca_store.values():
if cert.issuer == c.subject:
issuer_cert = c
is_ca_signed = True
return (is_self_signed, is_ca_signed, issuer_cert)
def generate_key_from_cert(cert):
"""Generate new private/public key pair using the same parameters
included in the certificate"""
public_key = cert.public_key()
if isinstance(public_key, rsa.RSAPublicKey):
key = rsa.generate_private_key(
public_exponent=public_key.public_numbers().e,
key_size=public_key.key_size,
backend=default_backend()
)
elif isinstance(public_key, dsa.DSAPublicKey):
key = dsa.generate_private_key(
key_size=public_key.key_size,
backend=default_backend()
)
elif isinstance(public_key, ec.EllipticCurvePublicKey):
key = ec.generate_private_key(
curve=public_key.curve,
backend=default_backend()
)
elif isinstance(public_key, ed25519.Ed25519PublicKey):
key = ed25519.Ed25519PrivateKey.generate()
elif isinstance(public_key, ed448.Ed448PublicKey):
key = ed448.Ed448PrivateKey.generate()
return key
def generate_key_from_signer(cert):
"""Generate new private/public key pair using the same parameters
known about the certififcate issuer, guess what we don't know."""
# if certificate is self-signed, generate key from certificate
# directly, without trying to guess issuer parameters
if cert.issuer == cert.subject:
return generate_key_from_cert(cert)
# FIX: imports a private dictionary from library to do lookup
# from signature algorithm to hashing and encrypt functions
from cryptography.x509.oid import _SIG_OIDS_TO_HASH, _OID_NAMES
signature_hash = _SIG_OIDS_TO_HASH[cert.signature_algorithm_oid]
signature_algorithm_name = _OID_NAMES[cert.signature_algorithm_oid]
if signature_algorithm_name.endswith("RSAEncryption"):
# exponent chosen according to best practices
# key length based on actual results
key = rsa.generate_private_key(
public_exponent=65537,
key_size=len(cert.signature) * 8,
backend=default_backend()
)
elif signature_algorithm_name.startswith("RSASSA"):
# use RSA instead with default parameters
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
elif signature_algorithm_name.startswith("ecdsa-"):
# heuristic algorithm, because couldn't find any info on
# signature lengths
if len(cert.signature) < 90:
curve = ec.SECP256R1
else:
curve = ec.SECP384R1
key = ec.generate_private_key(
curve=curve,
backend=default_backend()
)
elif signature_algorithm_name.startswith("dsa-"):
# chosen based on best practice, as couldn't find any examples
# because DSA is not used in practice
key = dsa.generate_private_key(
key_size=2048,
backend=default_backend()
)
return "DSA"
elif signature_algorithm_name == "ed25519":
key = ed25519.Ed25519PrivateKey.generate()
elif signature_algorithm_name == "ed448":
key = ed448.Ed448PrivateKey.generate()
return key
def clone_certificate(cert, key_store):
"""Clone an existing certificate, and generate a new private/public
key pair for the new certificate"""
key = generate_key_from_cert(cert)
fingerprint = FINGERPRINT_ALGORITHM(cert.subject.rfc4514_string().encode()).hexdigest()
key_store[fingerprint] = key
new_cert_builder = (
x509.CertificateBuilder()
.issuer_name(cert.issuer)
.subject_name(cert.subject)
.public_key(key.public_key())
.serial_number(cert.serial_number)
.not_valid_before(cert.not_valid_before)
.not_valid_after(cert.not_valid_after)
)
## Fix: add some filtering here later for 'safe' extensions and enable again
#for ext in cert.extensions:
# new_cert_builder.add_extension(ext.value, ext.critical)
issuer_fingerprint = FINGERPRINT_ALGORITHM(cert.issuer.rfc4514_string().encode()).hexdigest()
return key, new_cert_builder.sign(key_store[issuer_fingerprint], cert.signature_hash_algorithm, default_backend())
def generate_issuer(cert):
"""Generate a fake issuer certificate cloning as many of the issuers
details that are known"""
key = generate_key_from_signer(cert)
serial_number = random.getrandbits(64)
try:
ext = cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
# FIX: we're just taking the first name returned, may want to change this to generate multiple certificates
issuer_name = ext.value.authority_cert_issuer[0].value if ext.value.authority_cert_issuer is not None else cert.issuer
serial_number = ext.value.authority_cert_serial_number if ext.value.authority_cert_serial_number is not None else random.getrandbits(64)
except x509.ExtensionNotFound:
# no AuthorityKeyIdentifier extension present
pass
issuer_cert = (
x509.CertificateBuilder()
.issuer_name(issuer_name)
.subject_name(cert.issuer)
.public_key(key.public_key())
.serial_number(serial_number)
.not_valid_before(cert.not_valid_before - timedelta(days=5*365))
.not_valid_after(cert.not_valid_after + timedelta(days=5*365))
)
try:
ext = cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
key_identifier = ext.value.key_identifier
ski = x509.SubjectKeyIdentifier(key_identifier)
issuer_cert.add_extension(ski, False)
except x509.ExtensionNotFound:
# no AuthorityKeyIdentifier extension present, not including key identifier
pass
return key, issuer_cert.sign(key, cert.signature_hash_algorithm, default_backend())
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: {} [input file/dir]".format(sys.argv[0]), file=sys.stderr)
sys.exit(1)
# allow processing multiple files/directories
in_files = []
for arg in sys.argv[1:]:
# process single file
if os.path.isfile(arg):
in_files.append(arg)
# recursively process directory
elif os.path.isdir(arg):
for subdir, dirs, files in os.walk(arg):
for f in files:
fpath = os.path.join(subdir, f)
in_files.append(fpath)
else:
print("ERROR: {} does not exist".format(arg), file=sys.stderr)
sys.exit(2)
ca_store = {}
# process trusted root CA certificates
if INCLUDE_CA_CERTS:
print("INFO: Loading CA ceritificates", file=sys.stderr)
for c in process_certificates(CA_CERTS):
fingerprint = FINGERPRINT_ALGORITHM(c.subject.rfc4514_string().encode()).hexdigest()
if fingerprint in ca_store:
print("WARNING: Duplicate CA ceritificate detected: {}".format(c.subject), file=sys.stderr)
else:
ca_store[fingerprint] = c
cert_store = {}
print("INFO: Loading ceritificates", file=sys.stderr)
for fname in in_files:
for c in process_certificates(fname):
fingerprint = FINGERPRINT_ALGORITHM(c.subject.rfc4514_string().encode()).hexdigest()
if fingerprint in cert_store:
print("WARNING: Duplicate ceritificate detected: {}".format(c.subject), file=sys.stderr)
else:
cert_store[fingerprint] = c
print("\nLoaded certificates:\nHash\t\t\t\t\t\tName\n----------------------------------------------------------------------------", file=sys.stderr)
for k, v in cert_store.items():
print("{}\t{}".format(k, v.subject.rfc4514_string()), file=sys.stderr)
print("----------------------------------------------------------------------------", file=sys.stderr)
cert_choice = input('\nEnter your certificate choice [hash]: ')
output_cert_chain = []
output_cert_chain.append(cert_store[cert_choice])
chain_complete = False
new_cert_added = True
while not chain_complete and new_cert_added:
c = output_cert_chain[-1]
is_self_signed, is_ca_signed, issuer_cert = get_cert_issuer(c, cert_store, ca_store)
if is_self_signed:
print("INFO: Self-Signed Certificate Chain created successfully", file=sys.stderr)
break
elif is_ca_signed:
output_cert_chain.append(issuer_cert)
print("INFO: Trusted Certificate Chain created successfully (might be able to register with same CA to bypass authentication)", file=sys.stderr)
break
elif issuer_cert is not None:
output_cert_chain.append(issuer_cert)
else:
print("WARNING: Generating missing certificate: {}".format(c.issuer.rfc4514_string()), file=sys.stderr)
_, new_cert = generate_issuer(c)
output_cert_chain.append(new_cert)
key_store = {}
old_ca_cert = output_cert_chain.pop()
print("\nINFO: Cloning CA certificate: {}".format(old_ca_cert.subject.rfc4514_string()), file=sys.stderr)
ca_key, new_ca_cert = clone_certificate(old_ca_cert, key_store)
print("INFO: Saving CA certificate to {}\n".format(CA_CERT_FNAME), file=sys.stderr)
with open(CA_CERT_FNAME, "wb") as f:
f.write(new_ca_cert.public_bytes(encoding=serialization.Encoding.PEM))
server_keys = []
server_certs = []
for cert in reversed(output_cert_chain):
print("INFO: Cloning server certificate chain: {}".format(cert.subject.rfc4514_string()), file=sys.stderr)
new_key, new_cert = clone_certificate(cert, key_store)
server_certs.append(new_cert)
server_keys.append(new_key)
print("INFO: Saving server certificate chain to {}".format(SERVER_CERT_FNAME), file=sys.stderr)
with open(SERVER_CERT_FNAME, "wb") as f:
for cert in reversed(server_certs):
f.write(cert.public_bytes(encoding=serialization.Encoding.PEM))
print("INFO: Saving server private key to {}".format(SERVER_KEY_FNAME), file=sys.stderr)
with open(SERVER_KEY_FNAME, "wb") as f:
f.write(server_keys[-1].private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment