Skip to content

Instantly share code, notes, and snippets.

@arizvisa
Last active February 9, 2024 19:16
Show Gist options
  • Save arizvisa/6e0c3cb1051587ca366e7a7b321b91ea to your computer and use it in GitHub Desktop.
Save arizvisa/6e0c3cb1051587ca366e7a7b321b91ea to your computer and use it in GitHub Desktop.
shirtz
# gen_key returns the private key object and its PEM encoding as bytes
# load_key reads a key from some bytes (decrypting it if a password is provided, otherwise asking the user for one), returning the private key object and its PEM encoding as bytes
# gen_certificate takes your private key object and some keywords, returning a cert and pubkey
# load_certificate takes the private key object and the cert as PEM bytes, returning a cert and pubkey
# setup_ssl is just example code showing how to use those things
def gen_key(e=65537, bits=1024):
    """Generate a new RSA private key.

    Args:
        e: public exponent handed to the key generator.
        bits: size of the modulus in bits.

    Returns:
        A ``(key, pem)`` tuple: the private key object and its unencrypted
        PEM serialization (traditional OpenSSL format) as bytes.
    """
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    # NOTE(review): the 1024-bit default is kept for compatibility with
    # existing callers; consider passing a larger size explicitly.
    plural = '' if bits == 1 else 's'
    print("generating an RSA key of {:d}-bit{:s} using e={:d}.".format(bits, plural, e))

    key = rsa.generate_private_key(
        public_exponent=e,
        key_size=bits,
        backend=default_backend(),
    )
    pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    return key, pem
def load_key(content, password=None):
    """Load a PEM private key, prompting for a password when one is needed.

    Args:
        content: the PEM-encoded private key as bytes.
        password: optional password (bytes or text) used to decrypt the key.

    Returns:
        A ``(key, pem)`` tuple: the private key object and its unencrypted
        PEM serialization as bytes.  When the key cannot be decoded, or the
        user gives up at the password prompt, a temporary key produced by
        ``gen_key()`` is returned instead.
    """
    import getpass
    import cryptography.hazmat.primitives.serialization as chps
    import cryptography.hazmat.backends as chb

    print("loading RSA key from {:d} bytes worth of file.".format(len(content)))

    while True:
        # The loader only accepts the password as bytes; handing it text
        # raises TypeError, which used to re-trigger the prompt forever.
        if password is not None and not isinstance(password, (bytes, bytearray)):
            password = password.encode('utf-8')

        try:
            key = chps.load_pem_private_key(data=content, password=password, backend=chb.default_backend())
        except ValueError:
            print('critical: error while decoding key, generating a temporary one instead.\n')
            return gen_key()
        except TypeError:
            # TypeError is raised either when the key is encrypted and no
            # password was given, or when a password was given for a key
            # that is not encrypted.  In the latter case retry without one
            # rather than asking the user for a password that cannot help.
            if password is not None:
                password = None
                continue
        else:
            pem = key.private_bytes(encoding=chps.Encoding.PEM, format=chps.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=chps.NoEncryption())
            return key, pem

        try:
            # getpass keeps the password from being echoed to the terminal.
            password = getpass.getpass("key is encrypted, please type in your password (ctrl+c to give up): ")
        except (KeyboardInterrupt, EOFError):
            print("warning: user aborted key decryption, generating a temporary one instead.\n")
            return gen_key()
def gen_certificate(private, **params):
    """Build and sign an X509 certificate for the given private key.

    Args:
        private: the private key object used to sign the certificate; its
            public key is embedded in the certificate.
        **params: values applied to the certificate builder by calling the
            builder method of the same name (``serial_number``,
            ``not_valid_before``, ``not_valid_after``, ``issuer_name``, ...).
            Digit-only strings are converted to integers.  The extra key
            ``hashAlgorithm`` names the digest used for signing and defaults
            to ``sha256``.  ``subject_name`` is always replaced by a common
            name taken from the ``HOSTNAME`` environment variable.

    Returns:
        A ``(certificate, public_key)`` tuple.

    Raises:
        AttributeError: if applying a parameter fails and no parameter can
            be identified as the unknown one.
        ValueError, TypeError: if signing fails even with the default hash
            algorithm.
    """
    import datetime
    import ipaddress as inet
    import cryptography.x509 as X
    import cryptography.hazmat.backends as chb
    import cryptography.hazmat.primitives.hashes as hashes

    default_algorithm = 'sha256'

    # Fill in whatever the caller did not specify.  The datetime module is
    # imported locally so the timedelta below does not depend on a global.
    now = datetime.datetime.utcnow()
    params.setdefault('serial_number', 1024)
    params.setdefault('not_valid_before', now)
    params.setdefault('not_valid_after', params['not_valid_before'] + datetime.timedelta(days=42))
    params.setdefault('hashAlgorithm', default_algorithm)

    # An empty HOSTNAME is treated the same as a missing one.
    hostname = os.environ.get('HOSTNAME') or 'localhost'
    cn = X.Name([X.NameAttribute(X.oid.NameOID.COMMON_NAME, hostname)])
    params['subject_name'] = cn
    params.setdefault('issuer_name', cn)

    # `host` is read from module scope.  It is always listed as a DNS name,
    # and additionally as an IP address only when it parses as one; a plain
    # hostname used to abort here with ValueError.
    alts = [X.DNSName(host)]
    try:
        address = inet.ip_address(host)
    except ValueError:
        pass
    else:
        alts.append(X.IPAddress(address))
    an = X.SubjectAlternativeName(alts)
    bc = X.BasicConstraints(ca=True, path_length=0)

    # Collect every concrete hash algorithm class, keyed by its name.
    algorithms = {}
    for attribute in dir(hashes):
        candidate = getattr(hashes, attribute)
        if not isinstance(candidate, type):
            continue
        if issubclass(candidate, hashes.HashAlgorithm) and candidate is not hashes.HashAlgorithm:
            algorithms[candidate.name] = candidate

    suggestion = params.pop('hashAlgorithm')
    if suggestion in algorithms:
        hashAlgorithm = algorithms[suggestion]
    else:
        print("critical: suggested hash algorithm ({:s}) was not found in the available algoritms ({:s}).".format(suggestion, ', '.join(sorted(algorithms))))
        # Prefer the function's own default before the legacy digests, then
        # settle for anything that is available.
        preferences = [default_algorithm, 'sha1', 'md5'] + sorted(algorithms)
        hashAlgorithm = algorithms[next(name for name in preferences if name in algorithms)]
        print("warning: ended up falling back to an alternative one ({:s}).\n".format(hashAlgorithm.name))

    # The issuer may be given as plain text, in which case it becomes a CN.
    if isinstance(params['issuer_name'], six.string_types):
        params['issuer_name'] = X.Name([X.NameAttribute(X.oid.NameOID.COMMON_NAME, params['issuer_name'])])

    print("generating {:s}certificate issued by {:s} for {:s} ({:s}).".format('self-signed ' if params['issuer_name'] == params['subject_name'] else '', params['issuer_name'].rfc4514_string(), params['subject_name'].rfc4514_string(), ', '.join(map("{!s}".format, an))))

    # Apply each parameter by calling the builder method named after it.
    x509 = X.CertificateBuilder()
    try:
        for attribute, value in params.items():
            if isinstance(value, six.string_types) and value.isdigit():
                value = int(value)
            x509 = getattr(x509, attribute)(value)
    except AttributeError:
        available = {attribute for attribute in dir(X.CertificateBuilder) if not attribute.startswith('_')} | {'hashAlgorithm'}
        misses = set(params) - available
        if not misses:
            # Nothing can be removed, so retrying would recurse forever.
            raise
        print("critical: unable to generate certificate due to the explicitly given parameters ({:s}) not being within the ones available ({:s}).".format(', '.join(sorted(misses)), ', '.join(sorted(available))))
        print('trying again without the invalid parameters.\n')
        for attribute in misses:
            params.pop(attribute)
        params['hashAlgorithm'] = hashAlgorithm.name
        return gen_certificate(private, **params)

    print('adding necessary extensions to certificate and signing it.')
    extended = x509.add_extension(bc, False).add_extension(an, False).public_key(private.public_key())
    try:
        certificate = extended.sign(private_key=private, algorithm=hashAlgorithm(), backend=chb.default_backend())
    except (ValueError, TypeError):
        print("critical: error signing certificate likely due to the hashAlgorithm ({:s}) not being viable.".format(hashAlgorithm.name))
        if hashAlgorithm.name == default_algorithm:
            # The retry below would select this same algorithm again, so
            # give up instead of recursing without end.
            raise
        print('trying again using a default algorithm.\n')
        # 'hashAlgorithm' was popped above, so the retry uses the default.
        return gen_certificate(private, **params)

    assert isinstance(certificate, X.Certificate)
    return certificate, certificate.public_key()
def load_certificate(private, content):
    """Load a PEM certificate and confirm it belongs to the private key.

    Args:
        private: the private key object the certificate must correspond to.
        content: the PEM-encoded X509 certificate as bytes.

    Returns:
        A ``(certificate, public_key)`` tuple.  When the certificate cannot
        be decoded, or its public key differs from the one derived from
        ``private``, the result of ``gen_certificate(private)`` is returned
        instead.
    """
    import cryptography.x509 as X
    import cryptography.hazmat.primitives.serialization as chps

    print("reading an X509 certificate from {:d} bytes worth of PEM.".format(len(content)))
    try:
        certificate = X.load_pem_x509_certificate(data=content)
    except ValueError:
        print("critical: error while decoding certificate, generating one instead.\n")
        return gen_certificate(private)

    print('verifying the private key matches the following public key from the certificate.\n')
    public = certificate.public_key()
    print(public.public_bytes(encoding=chps.Encoding.PEM, format=chps.PublicFormat.SubjectPublicKeyInfo).decode(sys.getdefaultencoding()))

    def encoded(key):
        # Canonical DER form of a public key, so two keys compare by bytes.
        return key.public_bytes(encoding=chps.Encoding.DER, format=chps.PublicFormat.SubjectPublicKeyInfo)

    # Compare the public keys directly.  Checking the certificate's signature
    # against the private key would only succeed for self-signed
    # certificates, and would wrongly reject one issued by a separate
    # authority for this very key.
    if encoded(private.public_key()) != encoded(public):
        print("critical: the certificate's public key does not match the private key, generating a new certificate instead.\n")
        return gen_certificate(private)

    print('which definitely seems to be the case.')
    return certificate, public
def hardlink(src, dst):
    """Create a hard link at ``dst`` that refers to the file at ``src``.

    An existing regular file at ``dst`` is removed first.  Anything else
    found at ``dst`` (a directory, a dangling symlink, ...) is left alone
    and a message is printed instead.  When ``dst`` already refers to the
    same file as ``src`` nothing is done.

    Args:
        src: path of the existing file.
        dst: path at which the link should be created.

    Returns:
        None.

    Raises:
        OSError: if ``src`` is missing or the link cannot be created.
    """
    # Unlinking dst when it is the very same file as src (for instance when
    # both paths are identical) would delete the data that is about to be
    # linked.  Doing this check first also reports a missing src before
    # anything at dst has been removed.
    if os.path.exists(dst) and os.path.samefile(src, dst):
        return

    if os.path.isfile(dst):
        print("warning: removing file at the specified target path ({:s}).".format(dst))
        os.unlink(dst)
    elif os.path.lexists(dst):
        # lexists also catches dangling symlinks, which exists() reports as
        # absent even though they still occupy the target path.
        print("critical: refusing to overwrite target path ({:s}) due to target not being a file.".format(dst))
        return
    return os.link(src, dst)
def setup_ssl(socket, arguments):
    """Wrap a listening socket with TLS using a loaded or generated identity.

    Args:
        socket: the server socket to wrap.
        arguments: an object providing the attributes ``keyfile``,
            ``keysize``, ``parameters``, ``certificate``, ``keypath`` and
            ``certificatepath``.

    Returns:
        The TLS-wrapped server socket, or ``socket`` unchanged when the
        required libraries cannot be imported.
    """
    try:
        import ssl
        import cryptography
        import cryptography.hazmat.primitives.serialization as chps
    except ImportError:
        print('warning: ignoring request for SSL support due to an error importing the necessary libraries.')
        return socket
    import os
    import tempfile

    # Obtain the private key, either from the requested file or a new one.
    if arguments.keyfile:
        with open(arguments.keyfile, 'rb') as infile:
            content = infile.read()
        key, keydata = load_key(content)
    else:
        key, keydata = gen_key(bits=arguments.keysize)
    print("using the following {:d}-bit key.\n".format(key.key_size))
    print(keydata.decode(sys.getdefaultencoding()))

    # Obtain the certificate the same way.
    parameters = dict(arguments.parameters or [])
    if arguments.certificate:
        if parameters:
            print('warning: ignoring the provided certificate parameters due to being asked to load certificate from file.\n')
        with open(arguments.certificate, 'rb') as infile:
            content = infile.read()
        cert, _ = load_certificate(key, content)
    else:
        cert, _ = gen_certificate(key, **parameters)

    sig = bytearray(cert.fingerprint(algorithm=cert.signature_hash_algorithm))
    certdata = cert.public_bytes(encoding=chps.Encoding.PEM)
    print("\nusing certificate with a {:d}-bit {:s} ({:s})\n{:s}\n".format(8 * len(sig), cert.signature_hash_algorithm.name, cert.signature_algorithm_oid.dotted_string, ':'.join(map("{:02x}".format, sig))))
    print(certdata.decode(sys.getdefaultencoding()))

    # The ssl module only loads keys and certificates from files, so both
    # are staged in temporary files.  On WIN32 they are created with
    # delete=False because they are closed before being opened by name.
    with tempfile.NamedTemporaryFile(prefix='poc', delete=not WIN32) as keyfile, tempfile.NamedTemporaryFile(prefix='poc', delete=not WIN32) as certfile:
        try:
            keyfile.write(keydata)
            # Writes are buffered; flush so the data is really in the file
            # before it is linked elsewhere or read back by name.
            keyfile.flush()
            if arguments.keypath:
                hardlink(keyfile.name, arguments.keypath)
                print("wrote key data to {:s}.".format(arguments.keypath))

            certfile.write(certdata)
            certfile.flush()
            if arguments.certificatepath:
                hardlink(certfile.name, arguments.certificatepath)
                print("wrote certificate data to {:s}.".format(arguments.certificatepath))

            if WIN32:
                keyfile.close()
                certfile.close()

            # ssl.wrap_socket() no longer exists in Python 3.12, so build a
            # server-side context and wrap the socket through it.
            context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            context.load_cert_chain(certfile=certfile.name, keyfile=keyfile.name)
            wrapped_socket = context.wrap_socket(socket, server_side=True)
        finally:
            if WIN32:
                # With delete=False nothing else removes these files, which
                # would leave the private key behind in the temp directory.
                for temporary in (keyfile, certfile):
                    temporary.close()
                    try:
                        os.unlink(temporary.name)
                    except OSError:
                        pass
    return wrapped_socket
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment