Skip to content

Instantly share code, notes, and snippets.

@fantix
Created June 17, 2021 16:03
Show Gist options
  • Save fantix/759ebf1f6fcbc3e9adfd507ffc24074b to your computer and use it in GitHub Desktop.
Save fantix/759ebf1f6fcbc3e9adfd507ffc24074b to your computer and use it in GitHub Desktop.
Local CA with cryptography
import datetime
import os
import socket
import ssl
import threading
import uuid
from cryptography import x509
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509 import oid
def _new_cert(name, issuer=None):
backend = backends.default_backend()
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=backend
)
public_key = private_key.public_key()
subject = x509.Name([x509.NameAttribute(oid.NameOID.COMMON_NAME, name)])
builder = (
x509.CertificateBuilder()
.subject_name(subject)
.public_key(public_key)
.serial_number(int(uuid.uuid4()))
)
if issuer:
issuer_cert, signing_key = issuer
builder = (
builder.issuer_name(issuer_cert.subject)
.not_valid_before(issuer_cert.not_valid_before)
.not_valid_after(issuer_cert.not_valid_after)
)
else:
signing_key = private_key
builder = (
builder.issuer_name(subject)
.add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True,
)
.not_valid_before(
datetime.datetime.today() - datetime.timedelta(days=1)
)
.not_valid_after(
datetime.datetime.today() + datetime.timedelta(weeks=1000)
)
)
certificate = builder.sign(
private_key=signing_key,
algorithm=hashes.SHA256(),
backend=backend,
)
return certificate, private_key
def _write_cert(path, cert_key_pair):
certificate, private_key = cert_key_pair
with open(path, "wb") as f:
f.write(
private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
)
f.write(
certificate.public_bytes(
encoding=serialization.Encoding.PEM,
)
)
def new_ca(path, name):
cert_key_pair = _new_cert(name)
_write_cert(path, cert_key_pair)
return cert_key_pair
def load_ca(path):
with open(path, "rb") as f:
data = f.read()
private_key = serialization.load_pem_private_key(data, None)
certificate = x509.load_pem_x509_certificate(data)
return certificate, private_key
def new_cert(path, name, ca_cert_key_pair):
cert_key_pair = _new_cert(name, issuer=ca_cert_key_pair)
_write_cert(path, cert_key_pair)
def main():
if not os.path.exists("server.pem") or not os.path.exists("ca.pem"):
try:
ca_cert_key_pair = load_ca("ca.pem")
except FileNotFoundError:
ca_cert_key_pair = new_ca("ca.pem", "Development CA")
new_cert("server.pem", "Development Server", ca_cert_key_pair)
s1, s2 = socket.socketpair()
def server():
ctx = ssl.SSLContext()
ctx.load_cert_chain("server.pem")
ss1 = ctx.wrap_socket(s1, server_side=True)
print(ss1.recv(1024))
ss1.send(b"World")
t = threading.Thread(target=server)
t.start()
client_ctx = ssl.SSLContext()
client_ctx.load_verify_locations("ca.pem")
client_ctx.verify_mode = ssl.CERT_REQUIRED
client_ctx.check_hostname = False
ss2 = client_ctx.wrap_socket(s2)
ss2.send(b"Hello")
print(ss2.recv(1024))
t.join()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment