Skip to content

Instantly share code, notes, and snippets.

@saghul
Created January 22, 2013 23:24
Show Gist options
  • Save saghul/4599801 to your computer and use it in GitHub Desktop.
Save saghul/4599801 to your computer and use it in GitHub Desktop.
pyuv + OpenSSL example
-----BEGIN CERTIFICATE-----
MIIBrzCCARgCAQAwDQYJKoZIhvcNAQEEBQAwIDEeMBwGA1UEAxMVQ2VydGlmaWNh
dGUgQXV0aG9yaXR5MB4XDTEzMDEyMjAwNTkzMVoXDTE4MDEyMTAwNTkzMVowIDEe
MBwGA1UEAxMVQ2VydGlmaWNhdGUgQXV0aG9yaXR5MIGfMA0GCSqGSIb3DQEBAQUA
A4GNADCBiQKBgQDK3qyBBmFrIE/1+sRndzKZYz1vjeA7uDGpyRIeKufl6MyKBPLu
9Irhmy9kveRcpaSouyXgMnYtFqmMdNqxmBukF15o0HXcEAfHavtZ4N62CxQgUy25
nW0pBB3Rohxjz7ugpYOr8sOu7zrc3VpTN733LlOh/RPNTbKFWBoy0XW/vQIDAQAB
MA0GCSqGSIb3DQEBBAUAA4GBAH1ibeupY9p+KRKxEa9IYg4UUndxlnpr/xnxuy4o
MJmSfLdvXZHsnV+93I/fbZIZHDJgd/VXBDUXF3wqdO9JHwk4g9VOO7LMIFZOqnpi
4ua8ctg+GCQiEdHUEQ/grWPgS7dW+FTDUXS8S44hskVpTmZOJFuJhTegsLYp0nfq
X+FD
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIBpzCCARACAQEwDQYJKoZIhvcNAQEEBQAwIDEeMBwGA1UEAxMVQ2VydGlmaWNh
dGUgQXV0aG9yaXR5MB4XDTEzMDEyMjAwNTkzMVoXDTE4MDEyMTAwNTkzMVowGDEW
MBQGA1UEAxMNU2ltcGxlIENsaWVudDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkC
gYEAwci98l6/QjbU+9YB8NQSn0FEmj+C7slcEuSOFrb4zOGHHneZq8gH7VlwuQvj
MzQHb+0v+wM3YObii3O0YXR6PAbqAvzdmpYgPlUqXJNiEBlvKT5BOZHr9jriy+EP
9yD6X+dv6nPSf6jtuFwgO+fs6ztRisf3VnyIns5ib10AvmECAwEAATANBgkqhkiG
9w0BAQQFAAOBgQA2S8HK9lSvdOSkyZ9zSHCi7ZNzb6ieT17YJwiWr4WCqJGmLm6t
tdEQwbb+NRhr5WwMtkQoYJg8bv1vuF6BEMAVgZXvavVWaRUJmodDOwYmPAeRMLqF
cG193wpchToWIvyPedB1Xq/vJI6LUVOvGAfUPmW2r++2bncBKAw493eBqw==
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQDByL3yXr9CNtT71gHw1BKfQUSaP4LuyVwS5I4WtvjM4Yced5mr
yAftWXC5C+MzNAdv7S/7Azdg5uKLc7RhdHo8BuoC/N2aliA+VSpck2IQGW8pPkE5
kev2OuLL4Q/3IPpf52/qc9J/qO24XCA75+zrO1GKx/dWfIiezmJvXQC+YQIDAQAB
AoGAYEpUgDuuQ8OlP2IO4tEuU64F3bOTZv3tX4HsTMMsi/nAv1XkqSQjNEBOL9UF
V2sSCv7L6ammeeMgTPT4e7h6B71QSiRiS98lQn2vSJx1og1lncMDrIx5NjzduJtK
pJEpWlfEF6RbqEqbJEcvreB4DXVQ6a3Uws8154jHosDXVKUCQQD2w/MJ2/EyDLqZ
J+Wko7b+5YE5dEuSMsT09xP+PAZs5KEIkfwAhgyHEW2psxONXB8+PFytFJlAtctn
aYCvZ2brAkEAyQk5TT3kZseJ+AIRtrsGz/gOyoek/aRe1AMmGZhMZeXn/dgwhzFm
7/5W7SpmWpj9IEY/761VQPmRWttgvYp04wJAGdhMDCxNBsDuijvzgVrkP64p6qqT
f6xxlHaMUYRX5+/KLeucSTHA/iSFJ9Dpq1SKsSoBSt9tbamctCgIolZiIQJATIQm
OzADbtsjuDGRbGti/GT9vDhEpAWb0jYgmj1NVrtawVM3pT04YL/9deddbb4tGcuj
KiZe/IwAtwQonfvE4QJAKACxWfxZ/iM69ZvDLrbemF/F2t2h/5AV/t3ScjTmdkmG
RBy6KVyMp0W/cVlmmSLgbKgIqZzKn9QdJYSedQ3Z7A==
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIIBpzCCARACAQEwDQYJKoZIhvcNAQEEBQAwIDEeMBwGA1UEAxMVQ2VydGlmaWNh
dGUgQXV0aG9yaXR5MB4XDTEzMDEyMjAwNTkzMVoXDTE4MDEyMTAwNTkzMVowGDEW
MBQGA1UEAxMNU2ltcGxlIFNlcnZlcjCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkC
gYEAodkur1u0xYaJBVsxQSUxHOASfzOJpA69k5XLkyD6kF729hMQ2jwf7Lx47YGb
8qloE9xGbQB8KxXX5BL5K/VxuDafDb51gCNBeuf7B3GMSduU3GJal0wiePlOWIgx
kUkVZnuyeOYMSHbmV5udxXVXcaEGDk1UZ6LEQq0us5m6zCcCAwEAATANBgkqhkiG
9w0BAQQFAAOBgQAc83cXcCa9srzRyG3wGCFx8xNwQZl4d8AUDzpTrwufMWIqoRfU
DIXYVfNqsBqfMSeOAjoX4HUtud4bHmPT2gi1yJEood4J2DeOMYvRYCQYS7hyaluI
EyqZhoXClZtu2T1SdIAltxVBPqSCQQIjTD+KUEafeiQl6Cl722p6GIOGHQ==
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIICXgIBAAKBgQCh2S6vW7TFhokFWzFBJTEc4BJ/M4mkDr2TlcuTIPqQXvb2ExDa
PB/svHjtgZvyqWgT3EZtAHwrFdfkEvkr9XG4Np8NvnWAI0F65/sHcYxJ25TcYlqX
TCJ4+U5YiDGRSRVme7J45gxIduZXm53FdVdxoQYOTVRnosRCrS6zmbrMJwIDAQAB
AoGATasFhk2B8JBhTNq4RkTszqiQ983prXsNarel29MlqwaHiQsZOUFFKLxBY+ig
x9CYC3/XpBNpgtuWoPKh9IBys2rSx/4tE7lGm9UMzZ1uFBnNN9yT9hwRaHK8+xmA
kjrftH5z8xj6Dtu6tovVMiwgtRQP46TtG8elCrcGHYpTDhECQQDNwKi+45yBx9TD
DM/AACL7bv9bVfZ2rRS8Wuq4K8LWMp3ISkevFY6S58CVn+2gZOlL2cJ+C1QNkNiY
BOqrMUFlAkEAyV+yyq8xk1ueuTOtlwvjFmGpcDg6yq9sxHwl6sG+JBNqu4K/jszb
FjzK98Pe4vDcuJHV7+/Gp5L1KRlnHn4kmwJBAJPqvKW3Jo3apqeu7y/+KSgPbT8x
dqVs2upqhjHvK/wnmW0jkZNacQxF1hr7Ra84vMvN+lf5Nu0lw8DOUBLQr00CQQCe
T2+9zBFLaaHUs33q21uBwvFz2aDOqy71ISyl6/5RWjp0g4uY9g/e4ZgnRIM7ImRD
bdMkt/oSz4OQ9fmNjVm1AkEArB0bO/Yq/g7BBrVYkaCEDN5wiOW27j59hXX8MqKM
rgxMpUNctbuXSUdy6SK/XsicTgncGzNNZ/ztFsEUHZLdcw==
-----END RSA PRIVATE KEY-----
from __future__ import print_function
import OpenSSL
import pyuv
import uvtls
import signal
import sys
def shutdown_cb(handle, error):
    """Close the TLS handle once its shutdown has completed.

    :param handle: the TLS handle whose shutdown finished.
    :param error: error reported for the shutdown, or None (unused here).
    """
    # The handle may already be closed (for example by the read-error path),
    # so only close it when it is still open.
    if not handle.closed:
        handle.close()


def read_cb(handle, data, error):
    """Echo received data back to the peer; shut down when it sends 'exit'.

    :param handle: the TLS handle the data arrived on.
    :param data: decrypted application bytes (meaningful when *error* is None).
    :param error: read error code, or None on success.
    """
    if error is not None:
        print("Read error: {}".format(pyuv.errno.strerror(error)))
        if not handle.closed:
            handle.close()
        return
    handle.write(data)
    print("Received data: {}".format(data))
    if data.strip() == b'exit':
        # Pass shutdown_cb so the handle is actually closed when the
        # shutdown completes; without it the callback above is dead code.
        handle.shutdown(shutdown_cb)
def connect_cb(handle, error):
    """Report the outcome of the connection attempt and start reading.

    :param handle: handle passed by the connect machinery (unused; the
        module-level ``tls_h`` is used instead).
    :param error: connection error code, or None on success.
    """
    if error is None:
        print("Connected to {}".format(tls_h.getpeername()))
        tls_h.start_read(read_cb)
    else:
        print("Connection error: {}".format(pyuv.errno.strerror(error)))
def signal_cb(handle, signum):
    """Close the signal watcher and the TLS handle when the signal fires.

    :param handle: the signal handle that fired (unused).
    :param signum: number of the received signal (unused).
    """
    # Same order as before: signal watcher first, then the TLS handle.
    for watcher in (signal_h, tls_h):
        watcher.close()
def _read_pem(path):
    """Return the text of the PEM file at *path*, closing the file afterwards."""
    with open(path, 'r') as pem_file:
        return pem_file.read()


# The script needs the server host and port on the command line.
if len(sys.argv) != 3:
    sys.exit("usage: {} <host> <port>".format(sys.argv[0]))

loop = pyuv.Loop.default_loop()

# Load the CA certificate plus the client certificate and private key.
ca = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, _read_pem('ca.pem'))
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, _read_pem('client.cert'))
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, _read_pem('client.pkey'))

tls_h = uvtls.TLS(loop, cert=cert, key=key, ca_list=[ca])
tls_h.connect((sys.argv[1], int(sys.argv[2])), connect_cb)

# The SIGINT watcher is unref'd so it does not keep the loop alive by itself.
signal_h = pyuv.Signal(loop)
signal_h.unref()
signal_h.start(signal_cb, signal.SIGINT)

loop.run()
from __future__ import print_function
import OpenSSL
import pyuv
import uvtls
import signal
import sys
def _discard_connection(handle):
    """Close *handle* if it is still open and stop tracking it."""
    # The handle may already be closed if the remote didn't shut down TLS
    # properly and cut the TCP connection instead.
    if not handle.closed:
        handle.close()
    if handle in connections:
        connections.remove(handle)


def shutdown_cb(handle, error):
    """Tear the connection down once its shutdown has completed.

    :param handle: the TLS handle whose shutdown finished.
    :param error: error reported for the shutdown, or None (unused here).
    """
    _discard_connection(handle)


def read_cb(handle, data, error):
    """Echo received data back to the peer; shut down when it sends 'exit'.

    :param handle: the TLS handle the data arrived on.
    :param data: decrypted application bytes (meaningful when *error* is None).
    :param error: read error code, or None on success.
    """
    if error is not None:
        print("Read error: {}".format(pyuv.errno.strerror(error)))
        # Also drop the handle from ``connections``; otherwise the dead
        # connection stays tracked and is closed a second time on SIGINT.
        _discard_connection(handle)
        return
    handle.write(data)
    print("Received data: {}".format(data))
    if data.strip() == b'exit':
        handle.shutdown(shutdown_cb)
def connection_cb(handle, error):
    """Accept a new incoming connection and start echoing on it.

    :param handle: handle passed by the listen machinery (unused; the
        module-level ``server`` is used instead).
    :param error: connection error code, or None on success.
    """
    if error is None:
        client = uvtls.TLS(loop)
        server.accept(client)
        print("New connection from {}".format(client.getpeername()))
        client.start_read(read_cb)
        # Track the connection so it can be closed on SIGINT.
        connections.append(client)
    else:
        print("Connection error: {}".format(pyuv.errno.strerror(error)))
def signal_cb(handle, signum):
    """Close the signal watcher, every open connection and the listener.

    :param handle: the signal handle that fired (unused).
    :param signum: number of the received signal (unused).
    """
    signal_h.close()
    # A tracked connection may already have been closed (e.g. after a read
    # error), so skip those instead of closing them a second time.
    for conn in connections:
        if not conn.closed:
            conn.close()
    server.close()
def _read_pem(path):
    """Return the text of the PEM file at *path*, closing the file afterwards."""
    with open(path, 'r') as pem_file:
        return pem_file.read()


# The script needs the port to listen on as its only argument.
if len(sys.argv) != 2:
    sys.exit("usage: {} <port>".format(sys.argv[0]))

# TLS handles of the currently tracked client connections.
connections = []
loop = pyuv.Loop.default_loop()

# Load the CA certificate plus the server certificate and private key.
ca = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, _read_pem('CA.cert'))
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, _read_pem('server.cert'))
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, _read_pem('server.pkey'))

server = uvtls.TLS(loop, cert=cert, key=key, ca_list=[ca])
server.bind(('127.0.0.1', int(sys.argv[1])))
server.listen(connection_cb)
print("Listening on {}".format(server.getsockname()))

# The SIGINT watcher is unref'd so it does not keep the loop alive by itself.
signal_h = pyuv.Signal(loop)
signal_h.unref()
signal_h.start(signal_cb, signal.SIGINT)

loop.run()
from __future__ import print_function
import functools
import OpenSSL
import pyuv
from OpenSSL.SSL import WantReadError, ZeroReturnError, Error, VERIFY_NONE
_BIO_READ_SIZE = 2**15
def _true(*args):
return True
class TLS(object):
    """TLS stream handle built on top of a ``pyuv.TCP`` handle.

    The object owns a ``pyuv.TCP`` handle for transport and an
    ``OpenSSL.SSL.Connection`` created without a socket, so TLS records are
    moved between the two by hand: bytes read from TCP are fed to OpenSSL
    with ``bio_write`` and bytes produced by OpenSSL are drained with
    ``bio_read`` and written to TCP.

    User callbacks for connect, listen, read and shutdown receive this TLS
    object as their handle argument.
    """

    def __init__(self, loop, key=None, cert=None, verify_mode=VERIFY_NONE,
                 verify_callback=_true, version=OpenSSL.SSL.TLSv1_METHOD,
                 ca_list=None):
        """Create the TCP handle and the OpenSSL context and connection.

        :param loop: pyuv loop the underlying TCP handle is attached to.
        :param key: private key handed to ``Context.use_privatekey``, or None.
        :param cert: certificate handed to ``Context.use_certificate``, or None.
        :param verify_mode: mode passed to ``Context.set_verify``.
        :param verify_callback: callable passed to ``Context.set_verify``.
        :param version: method constant used to build the ``Context``.
        :param ca_list: iterable of CA certificates added to the context's
            certificate store, or None.
        :raises ValueError: if *verify_callback* is not callable.

        NOTE(review): the defaults (``VERIFY_NONE`` plus a callback that
        always returns True, and ``TLSv1_METHOD``) look unsuitable for
        production use; callers that need peer authentication should pass
        stricter values.
        """
        # Validate first so a bad argument does not leave a TCP handle behind.
        if not callable(verify_callback):
            raise ValueError('verify_callback needs to be callable')
        self.loop = loop
        self._tcp = pyuv.TCP(loop)
        ctx = OpenSSL.SSL.Context(version)
        if key is not None:
            ctx.use_privatekey(key)
        if cert is not None:
            ctx.use_certificate(cert)
        if key is not None and cert is not None:
            ctx.check_privatekey()
        if ca_list is not None:
            store = ctx.get_cert_store()
            for ca in ca_list:
                store.add_cert(ca)
        ctx.set_verify(verify_mode, verify_callback)
        # ``None`` instead of a socket: data is exchanged through bio_read /
        # bio_write rather than by OpenSSL doing the I/O itself.
        self._connection = OpenSSL.SSL.Connection(ctx, None)
        # Set once TLS can no longer be used (remote shutdown or OpenSSL error).
        self._lost_tls_connection = False
        # Set once application bytes have been sent or received.
        self._handshake_done = False
        # True while a write is parked waiting for more incoming TLS data.
        self._write_blocked_on_read = False
        # Application data that could not be sent yet, in submission order.
        self._send_buffer = []
        # User callback given to shutdown(); also marks shutdown as requested.
        self._saved_shutdown_cb = None

    def bind(self, address):
        """Bind the underlying TCP handle to *address*."""
        self._tcp.bind(address)

    def listen(self, callback, backlog=128):
        """Start listening; *callback* is called as ``callback(tls, error)``."""
        self._tcp.listen(functools.partial(self._on_connection_cb, callback), backlog)

    def accept(self, client):
        """Accept a pending connection into *client* and start its handshake.

        :param client: a TLS instance; its TCP handle receives the accepted
            connection and its OpenSSL connection is replaced by a new one
            that shares this listener's context.
        """
        self._connection.set_accept_state()
        self._tcp.accept(client._tcp)
        client._connection = OpenSSL.SSL.Connection(self._connection.get_context(), None)
        client._connection.set_accept_state()
        client._do_handshake()

    def connect(self, address, callback):
        """Connect to *address*; *callback* is called as ``callback(tls, error)``."""
        self._connection.set_connect_state()
        self._tcp.connect(address, functools.partial(self._on_connect_cb, callback))

    def shutdown(self, callback=None):
        """Start the TLS shutdown; may only be called once with a callback.

        :param callback: optional ``callback(tls, error)`` invoked when the
            underlying TCP shutdown completes.
        """
        assert self._saved_shutdown_cb is None, "shutdown was already called"
        self._saved_shutdown_cb = callback
        self._shutdown_tls()

    def write(self, data, callback=None):
        """Encrypt *data* and queue the resulting bytes on the TCP handle.

        Data that cannot be sent because OpenSSL first needs more incoming
        bytes is buffered and retried from the read callback.

        :param data: application bytes to send.
        :param callback: must be None; write callbacks are not supported.
        """
        assert callback is None, "callback is not supported"
        if self._lost_tls_connection:
            return
        to_send = data
        while to_send:
            try:
                sent = self._connection.send(to_send)
            except WantReadError:
                self._write_blocked_on_read = True
                self._send_buffer.append(to_send)
                break
            except Error:
                # TLS is unusable from here on: remember that so later
                # writes are dropped, and tear down the transport.
                self._lost_tls_connection = True
                if not self._tcp.closed:
                    self._tcp.close()
                break
            else:
                # If we sent some bytes, the handshake must be done. Keep
                # track of this to control error reporting behavior.
                self._handshake_done = True
                self._flush_send_bio()
                to_send = to_send[sent:]

    def writelines(self, seq, callback=None):
        """Not implemented."""
        raise NotImplementedError

    def start_read(self, callback):
        """Start reading; *callback* is called as ``callback(tls, data, error)``."""
        self._tcp.start_read(functools.partial(self._on_read_cb, callback))

    def stop_read(self):
        """Stop reading from the underlying TCP handle."""
        self._tcp.stop_read()

    def close(self, callback=None):
        """Close the underlying TCP handle.

        NOTE(review): *callback* is handed to the TCP handle unchanged, so it
        presumably receives the raw TCP handle rather than this object --
        confirm against the pyuv documentation.
        """
        self._tcp.close(callback)

    def ref(self):
        """Reference the underlying TCP handle."""
        self._tcp.ref()

    def unref(self):
        """Unreference the underlying TCP handle."""
        self._tcp.unref()

    def getsockname(self):
        """Return the local address of the underlying TCP handle."""
        return self._tcp.getsockname()

    def getpeername(self):
        """Return the peer address of the underlying TCP handle."""
        return self._tcp.getpeername()

    def nodelay(self, enabled):
        """Forward to ``nodelay`` on the underlying TCP handle."""
        self._tcp.nodelay(enabled)

    def keepalive(self, enabled, delay):
        """Forward to ``keepalive`` on the underlying TCP handle."""
        self._tcp.keepalive(enabled, delay)

    def simultaneous_accepts(self, enabled):
        """Forward to ``simultaneous_accepts`` on the underlying TCP handle."""
        self._tcp.simultaneous_accepts(enabled)

    @property
    def readable(self):
        """``readable`` attribute of the underlying TCP handle."""
        return self._tcp.readable

    @property
    def writable(self):
        """``writable`` attribute of the underlying TCP handle."""
        return self._tcp.writable

    @property
    def active(self):
        """``active`` attribute of the underlying TCP handle."""
        return self._tcp.active

    @property
    def closed(self):
        """``closed`` attribute of the underlying TCP handle."""
        return self._tcp.closed

    # Private

    def _do_handshake(self):
        """Kick off the TLS handshake and send whatever bytes it produced."""
        try:
            self._connection.do_handshake()
        except WantReadError:
            # There is no data to complete the handshake, but there may be some
            # handshake bytes to be sent
            self._flush_send_bio()

    def _on_connection_cb(self, user_callback, handle, error):
        """Forward a listen notification, passing this TLS object as handle."""
        user_callback(self, error)

    def _on_connect_cb(self, user_callback, handle, error):
        """Start the handshake on success, then call the user's callback.

        The user callback receives this TLS object (not the raw TCP handle),
        matching what the read and shutdown callbacks receive.
        """
        if error is None:
            # Start the SSL handshake
            self._do_handshake()
        # Call the user supplied callback
        user_callback(self, error)

    def _on_read_cb(self, user_callback, handle, data, error):
        """Feed bytes read from TCP into OpenSSL and deliver decrypted data."""
        if error is not None:
            user_callback(self, data, error)
            return
        self._connection.bio_write(data)
        if self._write_blocked_on_read:
            # New TLS data arrived, so retry the writes that were parked.
            self._write_blocked_on_read = False
            buf, self._send_buffer = self._send_buffer, []
            for item in buf:
                self.write(item)
        self._flush_receive_bio(user_callback)

    def _shutdown_tls(self):
        """Advance the TLS shutdown and, once it reports success, shut down TCP."""
        success = self._connection.shutdown()
        self._flush_send_bio()
        if success:
            self._tcp.shutdown(self._shutdown_cb)

    def _shutdown_cb(self, handle, error):
        """Relay TCP shutdown completion to the callback given to shutdown()."""
        if self._saved_shutdown_cb is not None:
            self._saved_shutdown_cb(self, error)

    def _flush_send_bio(self, callback=None):
        """Drain every byte OpenSSL has pending and write it to TCP.

        :param callback: optional write callback passed on to the TCP handle;
            it is only used when there is something to write.
        """
        chunks = []
        # A single bio_read returns at most _BIO_READ_SIZE bytes, so keep
        # reading until OpenSSL reports that nothing is left; otherwise the
        # remainder would sit in the BIO until some later flush.
        while True:
            try:
                chunk = self._connection.bio_read(_BIO_READ_SIZE)
            except WantReadError:
                break
            if not chunk:
                break
            chunks.append(chunk)
        # The transport may have been closed in the meantime (by an error
        # path or by a user callback); there is nowhere to send the bytes then.
        if chunks and not self._tcp.closed:
            self._tcp.write(b''.join(chunks), callback)

    def _flush_receive_bio(self, callback):
        """Deliver all decrypted data currently available to *callback*.

        :param callback: user read callback, called as
            ``callback(tls, data, None)`` for every chunk of application data.
        """
        # Keep trying this until an error indicates we should stop or we
        # close the connection. Looping is necessary to make sure we
        # process all of the data which was put into the receive BIO, as
        # there is no guarantee that a single recv call will do it all.
        while not self._lost_tls_connection:
            try:
                data = self._connection.recv(_BIO_READ_SIZE)
            except WantReadError:
                # The newly received bytes might not have been enough to produce
                # any application data.
                break
            except ZeroReturnError:
                # TLS has shut down and no more TLS data will be received over
                # this connection. Record that first so the state is correct
                # even if finishing the shutdown fails.
                self._lost_tls_connection = True
                self._shutdown_tls()
            except Error as e:
                # Something went pretty wrong. For example, this might be a
                # handshake failure (because there were no shared ciphers, because
                # a certificate failed to verify, etc). TLS can no longer proceed.
                # TODO: what to do with the exception?
                print("OpenSSL error: {}".format(e))
                self._flush_send_bio()
                self._lost_tls_connection = True
                if not self._tcp.closed:
                    self._tcp.close()
            else:
                # If we got application bytes, the handshake must be done by
                # now. Keep track of this to control error reporting later.
                self._handshake_done = True
                callback(self, data, None)
        # The received bytes might have generated a response which needs to be
        # sent now. For example, the handshake involves several round-trip
        # exchanges without ever producing application-bytes.
        self._flush_send_bio()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment