Skip to content

Instantly share code, notes, and snippets.

@jeffreytolar
Created May 29, 2024 19:10
Show Gist options
  • Save jeffreytolar/ea05b3092df12dc6e5b518e58e6821ad to your computer and use it in GitHub Desktop.
Save jeffreytolar/ea05b3092df12dc6e5b518e58e6821ad to your computer and use it in GitHub Desktop.
from concurrent.futures import ThreadPoolExecutor, wait
import http.server
import random
import socketserver
import ssl
import time
from OpenSSL import crypto
def generate_certificate(subj, key_file, cert_file):
    """Create a self-signed RSA certificate and write it and its key as PEM.

    Args:
        subj: Common Name used for the certificate subject (and, because the
            certificate is self-signed, its issuer).
        key_file: Path the PEM-encoded private key is written to.
        cert_file: Path the PEM-encoded certificate is written to.

    The certificate is valid from "now" for 24 hours. When ``subj`` is
    ``"localhost"`` a subjectAltName covering ``localhost`` and ``127.0.0.1``
    is added.
    """
    # Create a key pair
    key = crypto.PKey()
    key.generate_key(crypto.TYPE_RSA, 2048)

    cert = crypto.X509()
    # Version numbers are zero-based, so 2 means X.509 v3. A fresh X509 object
    # defaults to v1, which is not allowed to carry extensions such as the
    # subjectAltName added below.
    cert.set_version(2)
    cert.get_subject().CN = subj
    cert.set_serial_number(random.randrange(10, 10000))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(24 * 60 * 60)
    # Self-signed: the issuer is the subject itself.
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(key)
    if subj == "localhost":
        cert.add_extensions([
            crypto.X509Extension(b"subjectAltName", False, b"DNS:localhost, IP:127.0.0.1"),
        ])
    # Sign last, so the signature covers every field set above.
    cert.sign(key, 'sha256')

    with open(cert_file, "wb") as fh:
        fh.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    with open(key_file, "wb") as fh:
        fh.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
class MyHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """Handler that echoes the client certificate's CN and the request path."""

    def do_GET(self):
        """Reply 200 with ``CN: <commonName> / URL: <path>`` as the body.

        The commonName is read from the peer certificate of the connection;
        ``Unknown`` is reported when no certificate or no commonName is
        available.
        """
        # Build the whole body before committing the status line, so a problem
        # reading the certificate cannot leave a half-written 200 response.
        # getpeercert() may return None or an empty dict when there is no
        # validated peer certificate.
        cert = self.connection.getpeercert() or {}
        # 'subject' is a sequence of RDNs, each a sequence of (name, value) pairs.
        subject = {
            key: value
            for rdn in cert.get('subject', ())
            for key, value in rdn
        }
        subject_name = subject.get('commonName', 'Unknown')
        body = f'CN: {subject_name} / URL: {self.path}'.encode()

        self.send_response(200)
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Discard the log message instead of printing it."""
        pass
class HTTPSServer(socketserver.TCPServer):
    """TCP server whose listening socket is TLS-wrapped and requires a client cert.

    The TLS context is kept on ``self.ssl_context``.
    """

    # Allow an immediate re-run to bind the port even while connections from
    # the previous run are still lingering in TIME_WAIT.
    allow_reuse_address = True

    def __init__(self, server_address, RequestHandlerClass, *,
                 certfile="server.crt", keyfile="server.key",
                 client_ca_files=("client1.crt", "client2.crt", "client3.crt")):
        """Configure TLS, then bind, listen and wrap the listening socket.

        Args:
            server_address: ``(host, port)`` tuple to bind to.
            RequestHandlerClass: Handler class instantiated per request.
            certfile: PEM file holding the server certificate.
            keyfile: PEM file holding the server private key.
            client_ca_files: PEM files trusted when verifying client
                certificates.
        """
        # Build the context before binding, so a missing or invalid
        # certificate file fails without leaving a bound socket behind.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        # Pin the connection to TLS 1.2 without the deprecated
        # ssl.PROTOCOL_TLSv1_2 constant.
        context.minimum_version = ssl.TLSVersion.TLSv1_2
        context.maximum_version = ssl.TLSVersion.TLSv1_2
        context.load_cert_chain(certfile, keyfile)
        context.set_ciphers("@SECLEVEL=1:ALL")
        # Refuse any client that does not present a trusted certificate.
        context.verify_mode = ssl.CERT_REQUIRED
        for ca_file in client_ca_files:
            context.load_verify_locations(ca_file)
        self.ssl_context = context

        super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
        self.socket = context.wrap_socket(self.socket, server_side=True)
# Write a fresh self-signed server certificate (CN "localhost") and three
# self-signed client certificates, each with its key, to relative paths.
generate_certificate("localhost", "server.key", "server.crt")
generate_certificate("client1", "client1.key", "client1.crt")
generate_certificate("client2", "client2.key", "client2.crt")
generate_certificate("client3", "client3.key", "client3.crt")
# HACK: add our self-signed cert to the default context by overriding certifi's
# private cached bundle path; passing `verify=server.crt` instead would use a
# dedicated SSLContext.
# NOTE(review): this presumably has to run before `import requests` below, and
# server.crt must already exist at that point -- keep the statement order.
import certifi.core
certifi.core._CACERT_PATH = "server.crt"
import requests
# Set by run_server() to the running server's shutdown method; None until then.
stop_server = None
def run_server():
    """Serve HTTPS on localhost:8443 until the server is shut down.

    The server's ``shutdown`` method is stored in the module-level
    ``stop_server`` so another thread can stop the serving loop.
    """
    global stop_server

    address = ('localhost', 8443)
    with HTTPSServer(address, MyHTTPRequestHandler) as server:
        print("Server started on port 8443...")
        # Publish the shutdown hook before blocking in the serving loop.
        stop_server = server.shutdown
        server.serve_forever()
def run_client(name):
    """Request ``/<name>`` with ``<name>``'s client certificate and report the result.

    Prints `` OK`` when the server echoes back this client's own CN and
    path, and ``FAIL`` when the body differs or the request raises.

    Args:
        name: Client identity; ``<name>.crt`` / ``<name>.key`` are used as the
            client certificate and ``/<name>`` as the request path.
    """
    # Closing the session releases its pooled connections when we are done.
    with requests.Session() as session:
        session.cert = (f"{name}.crt", f"{name}.key")
        time.sleep(0.5)  # let the server start up
        try:
            # Without a timeout a stalled handshake would block this worker,
            # and therefore the caller waiting on it, forever.
            resp = session.request("GET", f"https://127.0.0.1:8443/{name}", timeout=10)
        except Exception as e:
            # Reporting boundary: any failure is a test failure for this client.
            print(f"FAIL {name}: {e}")
        else:
            expected = f"CN: {name} / URL: /{name}"
            if resp.text != expected:
                print(f"FAIL {name}: {resp.text}")
            else:
                print(f" OK {name}: {resp.text}")
# Run the server and the three clients concurrently on one thread pool.
ex = ThreadPoolExecutor()
# Keep the server's future so a startup failure is not silently discarded.
server_future = ex.submit(run_server)
client_futures = [
    ex.submit(run_client, client_name)
    for client_name in ("client1", "client2", "client3")
]
wait(client_futures)
if stop_server:
    stop_server()
# Report a server-side exception (e.g. the port was already in use). Only
# block on the future when it is known to finish: either shutdown was just
# requested, or run_server() already ended on its own.
if stop_server or server_future.done():
    server_error = server_future.exception()
    if server_error is not None:
        print(f"FAIL server: {server_error}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment