Skip to content

Instantly share code, notes, and snippets.

@gbrova
Created October 4, 2020 16:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gbrova/2a178552306c1ead3dfff9c21c2df8e8 to your computer and use it in GitHub Desktop.
Save gbrova/2a178552306c1ead3dfff9c21c2df8e8 to your computer and use it in GitHub Desktop.
import socket
import ssl
import sys
import threading
import time
# Loopback address and TCP port shared by the demo server and the demo client.
host, port = "127.0.0.1", 56789
def handle_connection(conn):
    """Echo everything received on ``conn`` back to the peer in upper case.

    Reads up to 1024 bytes at a time until ``recv`` returns an empty bytes
    object, decodes the data as UTF-8, upper-cases it and sends the result
    back UTF-8 encoded.

    Args:
        conn: A connected socket-like object providing ``recv(size)`` and
            ``sendall(data)``.

    Raises:
        UnicodeDecodeError: If the peer sends bytes that are not valid UTF-8.
    """
    import codecs  # local import so this block does not rely on file-level imports

    # A multi-byte UTF-8 character can be split across two recv() calls.
    # Decoding each chunk on its own would raise on the first half; the
    # incremental decoder buffers the partial bytes until the rest arrives.
    decoder = codecs.getincrementaldecoder("utf-8")()
    while True:
        data = conn.recv(1024)
        if not data:
            # Empty response means disconnected (TCP messages have at least one byte).
            # Any incomplete trailing byte sequence still buffered is dropped.
            break
        text = decoder.decode(data)
        if text:
            # Nothing to send when the chunk held only part of a character.
            conn.sendall(text.upper().encode("utf-8"))
def get_certfile_keyfile_for_server_name(server_name):
    """Return the ``(certfile, keyfile)`` pair to use for ``server_name``.

    Only one certificate exists for now, so every name maps to the same
    pair; a real implementation would pick the matching certificate here.
    The requested name is printed for visibility.
    """
    print("Loading certificate for server:", server_name)
    cert_path = "certificate.crt"
    key_path = "privateKey.key"
    return cert_path, key_path
def my_sni_callback(sock, server_name, ssl_context):
    """Switch the handshake to the certificate for the SNI-requested name.

    Meant to be assigned to ``SSLContext.sni_callback``. It looks up the
    certificate/key pair for ``server_name`` and replaces ``sock.context``
    with a fresh server context that presents that certificate.

    Args:
        sock: The TLS socket being negotiated; its ``context`` attribute is
            replaced when a server name was requested.
        server_name: Host name requested by the client, or ``None`` when the
            client sent no server name indication.
        ssl_context: The context the connection was created with (unused).

    Returns:
        None, which tells the ssl module to continue the handshake.
    """
    if server_name is None:
        # No SNI in the Client Hello: there is no name to select a
        # certificate for, so keep the default context already on the socket.
        return None
    certfile, keyfile = get_certfile_keyfile_for_server_name(server_name)
    sock.context = make_server_ssl_context(certfile, keyfile)
    return None
def make_server_ssl_context(certfile, keyfile):
    """Build a server-side SSL context presenting the given certificate.

    Args:
        certfile: Path to the certificate (chain) file.
        keyfile: Path to the matching private key file.

    Returns:
        An ``ssl.SSLContext`` created for ``ssl.Purpose.CLIENT_AUTH`` with
        the certificate chain loaded.
    """
    server_ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
    server_ctx.load_cert_chain(certfile, keyfile)
    return server_ctx
def handle_tls_connection(conn):
    """Wrap an accepted TCP connection in TLS and serve it.

    A context holding the default certificate is created first and
    ``my_sni_callback`` is installed on it. The TLS-wrapped socket is then
    passed to ``handle_connection`` and closed when that returns.

    Args:
        conn: The plain socket returned by ``accept()``.
    """
    fallback_ctx = make_server_ssl_context("certificate2.crt", "privateKey2.key")
    fallback_ctx.sni_callback = my_sni_callback
    tls_conn = fallback_ctx.wrap_socket(conn, server_side=True)
    with tls_conn:
        handle_connection(tls_conn)
def server_loop():
    """Listen on ``(host, port)`` and serve a single TLS connection.

    Despite the name, exactly one connection is accepted; the listening
    socket is closed once that connection has been handled.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with listener:
        listener.bind((host, port))
        listener.listen()
        client_conn, _peer_addr = listener.accept()
        handle_tls_connection(client_conn)
def run_client():
    """Connect to the demo server over TLS and print its upper-cased reply.

    The client trusts ``certificate.crt``, requires a verified certificate
    whose host name matches ``george.awesome.local``, sends one short
    message and prints the first response of up to 1024 bytes.
    """
    tls_ctx = ssl.create_default_context()
    tls_ctx.load_verify_locations("certificate.crt")
    tls_ctx.check_hostname = True
    tls_ctx.verify_mode = ssl.CERT_REQUIRED

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as raw_sock, \
            tls_ctx.wrap_socket(raw_sock, server_hostname="george.awesome.local") as tls_sock:
        # The socket is wrapped first; the TLS handshake runs as part of connect().
        tls_sock.connect((host, port))
        request = "convert to upper".encode("utf-8")
        tls_sock.sendall(request)
        reply = tls_sock.recv(1024)
        print("Response:", reply.decode("utf-8"))
if __name__ == "__main__":
    # The first command-line argument selects the role; anything else
    # (or no argument at all) runs the built-in self-test.
    mode = sys.argv[1] if len(sys.argv) > 1 else None
    if mode == "client":
        print("Running as client")
        run_client()
    elif mode == "server":
        print("Running as server")
        server_loop()
    else:
        # Just run a quick test and exit: serve one connection in a
        # background thread and talk to it from this thread.
        server_thread = threading.Thread(target=server_loop, name="server")
        server_thread.start()
        time.sleep(0.3)  # Wait for the server to be ready
        run_client()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment