-
-
Save gbrova/2a178552306c1ead3dfff9c21c2df8e8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import codecs
import socket
import ssl
import sys
import threading
import time
# Loopback address the demo server binds to and the client connects to.
host = "127.0.0.1"
# TCP port shared by the demo server and client.
port = 56789
def handle_connection(conn):
    """Echo everything received on *conn* back to the peer, upper-cased.

    Args:
        conn: A connected socket-like object providing ``recv`` and
            ``sendall`` (plain or TLS-wrapped).

    Raises:
        UnicodeDecodeError: If the peer sends bytes that are not valid UTF-8.
    """
    # recv() can cut the byte stream in the middle of a multi-byte UTF-8
    # sequence.  An incremental decoder keeps the partial character buffered
    # until the rest arrives instead of failing on the chunk boundary.
    decoder = codecs.getincrementaldecoder("utf-8")()
    while True:
        data = conn.recv(1024)
        if not data:
            # An empty read means the peer disconnected.  Bytes of an
            # unfinished character still held by the decoder are dropped:
            # there is nobody left to send them to.
            break
        text = decoder.decode(data)
        if text:
            # Nothing is sent while only a partial character is buffered.
            conn.sendall(text.upper().encode("utf-8"))
def get_certfile_keyfile_for_server_name(server_name):
    """Return the ``(certfile, keyfile)`` paths to serve for *server_name*.

    Only one certificate exists for now, so the same pair is returned for
    every name; a per-host lookup would go here.
    """
    print("Loading certificate for server:", server_name)
    certfile = "certificate.crt"
    keyfile = "privateKey.key"
    return certfile, keyfile
def my_sni_callback(sock, server_name, ssl_context):
    """Switch the handshake to the certificate for the requested server name.

    Intended to be installed as ``SSLContext.sni_callback``.

    Args:
        sock: The TLS socket performing the handshake; its ``context``
            attribute is replaced to change the certificate being served.
        server_name: Host name requested by the client, or ``None`` when the
            client sent no server name.
        ssl_context: The context the handshake started with (unused).

    Returns:
        None, which lets the TLS negotiation continue.
    """
    if server_name is None:
        # No SNI from the client: keep the context the socket was wrapped
        # with, so the default certificate is the one that gets served.
        return None
    certfile, keyfile = get_certfile_keyfile_for_server_name(server_name)
    sock.context = make_server_ssl_context(certfile, keyfile)
    return None
def make_server_ssl_context(certfile, keyfile):
    """Build a server-side SSL context that presents the given certificate.

    Args:
        certfile: Path to the certificate (chain) file.
        keyfile: Path to the matching private key file.

    Returns:
        An ``ssl.SSLContext`` created for ``ssl.Purpose.CLIENT_AUTH`` with
        the certificate chain loaded.
    """
    server_ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
    server_ctx.load_cert_chain(certfile, keyfile)
    return server_ctx
def handle_tls_connection(conn):
    """Serve one accepted TCP connection over TLS.

    A context holding the default certificate is created, the SNI callback
    is installed on it, and the wrapped connection is handed to
    ``handle_connection``.

    Args:
        conn: The connected TCP socket returned by ``accept()``.
    """
    default_certfile, default_keyfile = "certificate2.crt", "privateKey2.key"
    # Guard the raw socket so it is closed if building the context fails
    # (e.g. a missing certificate file).  Once wrap_socket() has taken over
    # the descriptor, closing ``conn`` again on exit is harmless.
    with conn:
        context = make_server_ssl_context(default_certfile, default_keyfile)
        context.sni_callback = my_sni_callback
        with context.wrap_socket(conn, server_side=True) as s_conn:
            handle_connection(s_conn)
def server_loop():
    """Listen on ``(host, port)`` and serve a single TLS connection.

    Despite the name, exactly one connection is accepted; the function
    returns after ``handle_tls_connection`` finishes with it.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Allow rebinding right after a restart, while connections from the
        # previous run may still linger in TIME_WAIT.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.listen()
        conn, _addr = sock.accept()  # the peer address is not needed
        handle_tls_connection(conn)
def run_client():
    """Connect to the demo server over TLS, send one message, print the reply.

    The server certificate is verified against ``certificate.crt`` and must
    match the host name ``george.awesome.local``, which is also the name
    sent to the server via SNI.
    """
    client_ctx = ssl.create_default_context()
    client_ctx.load_verify_locations("certificate.crt")
    client_ctx.check_hostname = True
    client_ctx.verify_mode = ssl.CERT_REQUIRED

    server_hostname = "george.awesome.local"
    message = "convert to upper"
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as raw_sock:
        # The socket is wrapped before connecting, so the TLS handshake
        # runs as part of connect().
        with client_ctx.wrap_socket(raw_sock, server_hostname=server_hostname) as tls_sock:
            tls_sock.connect((host, port))
            tls_sock.sendall(message.encode("utf-8"))
            reply = tls_sock.recv(1024)
            print("Response:", reply.decode("utf-8"))
# Entry point: "client" or "server" as the first argument selects a role;
# with no (or any other) argument a quick end-to-end self-test is run.
if __name__ == "__main__":
    mode = sys.argv[1] if len(sys.argv) > 1 else None
    if mode == "client":
        print("Running as client")
        run_client()
    elif mode == "server":
        print("Running as server")
        server_loop()
    else:
        # Just run a quick test and exit.
        # The server thread is a daemon so that a failing client does not
        # leave the process hanging on a thread still blocked in accept().
        worker = threading.Thread(target=server_loop, name="server", daemon=True)
        worker.start()
        time.sleep(0.3)  # Wait for the server to be ready
        run_client()
        # Let the server finish with the closed connection before exiting.
        worker.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment