Skip to content

Instantly share code, notes, and snippets.

@jlaine
Created February 6, 2018 00:12
Show Gist options
  • Save jlaine/4c39326b251a43423b477d14ed67b21b to your computer and use it in GitHub Desktop.
Save jlaine/4c39326b251a43423b477d14ed67b21b to your computer and use it in GitHub Desktop.
DTLS-SRTP proof of concept in Python
import base64
import cffi
# Single cffi FFI object shared by the whole module: the C declarations are
# registered with cdef() and the library is opened at runtime with dlopen().
ffi = cffi.FFI()
# Declare the subset of the OpenSSL C API used below. Every struct type is
# left opaque (typedef only), so these objects can only be handled through
# pointers returned by, and passed back to, the library.
ffi.cdef("""
typedef struct bio_st BIO;
typedef struct bio_method_st BIO_METHOD;
typedef struct env_md_st EVP_MD;
typedef struct ssl_ctx_st SSL_CTX;
typedef struct ssl_method_st SSL_METHOD;
typedef struct ssl_st SSL;
typedef struct x509_st X509;
typedef struct x509_store_ctx_st X509_STORE_CTX;
BIO_METHOD *BIO_s_mem(void);
BIO *BIO_new(BIO_METHOD *type);
int BIO_free(BIO *a);
size_t BIO_ctrl_pending(BIO *b);
int BIO_read(BIO *b, void *data, int len);
int BIO_write(BIO *b, const void *data, int len);
const SSL_METHOD *DTLSv1_method(void);
SSL_CTX *SSL_CTX_new(const SSL_METHOD *meth);
void SSL_CTX_free(SSL_CTX *);
long SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);
int SSL_CTX_set_cipher_list(SSL_CTX *, const char *str);
int SSL_CTX_set_tlsext_use_srtp(SSL_CTX *ctx, const char *profiles);
void SSL_CTX_set_verify(SSL_CTX *ctx, int mode,
int (*verify_callback)(int, X509_STORE_CTX *));
int SSL_CTX_use_certificate_file(SSL_CTX *ctx, const char *file, int type);
int SSL_CTX_use_PrivateKey_file(SSL_CTX *ctx, const char *file, int type);
SSL *SSL_new(SSL_CTX *ctx);
void SSL_free(SSL *ssl);
int SSL_do_handshake(SSL *s);
int SSL_export_keying_material(SSL *s, unsigned char *out, size_t olen,
const char *label, size_t llen,
const unsigned char *context,
size_t contextlen, int use_context);
X509 *SSL_get_certificate(const SSL *ssl);
int SSL_get_error(const SSL *s, int ret_code);
X509 *SSL_get_peer_certificate(const SSL *ssl);
void SSL_set_accept_state(SSL *s);
void SSL_set_bio(SSL *s, BIO *rbio, BIO *wbio);
void SSL_set_connect_state(SSL *s);
const EVP_MD *EVP_get_digestbyname(const char *name);
int X509_digest(const X509 *data, const EVP_MD *type,
unsigned char *md, unsigned int *len);
""")
# Open the OpenSSL 1.1 shared library by its soname; every `lib.<name>` call
# below is resolved against this handle.
# NOTE(review): the BIO_*, EVP_* and X509_* functions are normally exported by
# libcrypto rather than libssl; presumably they resolve here through libssl's
# own dependency on libcrypto -- confirm on the target platform.
lib = ffi.dlopen('libssl.so.1.1')
# Numeric values of the OpenSSL macros used by this module, mirrored by hand
# from the OpenSSL headers.

# Size in bytes of the largest digest OpenSSL can produce (SHA-512). It is
# used to size the output buffer passed to X509_digest(), so it must not be
# smaller than the header value of 64 or longer digests overflow the buffer.
EVP_MAX_MD_SIZE = 64

# SSL_CTX_ctrl() commands.
SSL_CTRL_GET_READ_AHEAD = 40
SSL_CTRL_SET_READ_AHEAD = 41

# SSL_get_error() results meaning "retry once more I/O is possible".
SSL_ERROR_WANT_READ = 2
SSL_ERROR_WANT_WRITE = 3

# File format selector for the *_file() loaders.
SSL_FILETYPE_PEM = 1

# SSL_CTX_set_verify() mode flags (combined with bitwise OR).
SSL_VERIFY_PEER = 1
SSL_VERIFY_FAIL_IF_NO_PEER_CERT = 2

# Lengths in bytes of the SRTP master key and master salt.
SRTP_KEY_LEN = 16
SRTP_SALT_LEN = 14
def certificate_digest(x509):
    """Return the SHA-256 fingerprint of a certificate.

    :param x509: cffi ``X509 *`` pointer to the certificate.
    :returns: the digest as a ``str`` of upper-case hex byte pairs joined
        by colons, e.g. ``"AB:CD:01"``.
    :raises ValueError: if *x509* is missing/NULL or OpenSSL does not know
        the SHA256 digest.
    :raises RuntimeError: if OpenSSL fails to compute the digest.
    """
    # A NULL pointer would be dereferenced inside X509_digest() and crash the
    # interpreter, so reject it up front.
    if x509 is None or x509 == ffi.NULL:
        raise ValueError("No certificate to compute a digest of")

    digest = lib.EVP_get_digestbyname(b'SHA256')
    if digest == ffi.NULL:
        raise ValueError("No such digest method")

    result_buffer = ffi.new("unsigned char[]", EVP_MAX_MD_SIZE)
    result_length = ffi.new("unsigned int[]", 1)
    result_length[0] = len(result_buffer)

    # Raise explicitly rather than assert: asserts vanish under `python -O`.
    if lib.X509_digest(x509, digest, result_buffer, result_length) != 1:
        raise RuntimeError("Could not compute certificate digest")

    # X509_digest() stored the number of bytes written in result_length[0].
    raw = bytes(ffi.buffer(result_buffer, result_length[0]))
    return ":".join("%02X" % octet for octet in raw)
def get_srtp_key_salt(dest, src, idx):
    """Copy one SRTP key/salt pair out of *src* into *dest*.

    *src* is read as two keys of SRTP_KEY_LEN bytes followed by two salts of
    SRTP_SALT_LEN bytes; *idx* selects which key and which salt to take.
    *dest* receives the key at offset 0, immediately followed by the salt.
    """
    key_offset = SRTP_KEY_LEN * idx
    salt_offset = SRTP_KEY_LEN * 2 + SRTP_SALT_LEN * idx
    dest_end = SRTP_KEY_LEN + SRTP_SALT_LEN

    # Slice bounds are always spelled out in full: cffi arrays do not accept
    # open-ended slices.
    key = src[key_offset:key_offset + SRTP_KEY_LEN]
    dest[0:SRTP_KEY_LEN] = key

    salt = src[salt_offset:salt_offset + SRTP_SALT_LEN]
    dest[SRTP_KEY_LEN:dest_end] = salt
@ffi.callback('int(int, X509_STORE_CTX *)')
def verify_callback(x, y):
    """Certificate verification hook that unconditionally reports success.

    Both arguments are ignored and the result is always 1.
    NOTE(review): presumably chain validation is skipped on purpose because
    DtlsSrtpSession.connect() compares the peer certificate fingerprint
    after the handshake -- confirm.
    """
    accepted = 1
    return accepted
class DtlsSrtpContext:
    """Owner of an OpenSSL ``SSL_CTX`` set up for DTLS with the SRTP extension.

    The native context is exposed as ``self.ctx``. Call :meth:`close` once
    the context is no longer needed to release it.
    """

    def __init__(self):
        """Create the context and apply the DTLS-SRTP configuration.

        Failures to load the certificate or private key, or to set the
        cipher list or SRTP profile, are reported on stdout and do not abort
        construction.

        :raises RuntimeError: if OpenSSL cannot allocate the context.
        """
        self.ctx = lib.SSL_CTX_new(lib.DTLSv1_method())
        if self.ctx == ffi.NULL:
            # Every call below would dereference the NULL context.
            raise RuntimeError("SSL could not create context")

        lib.SSL_CTX_set_verify(
            self.ctx,
            SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
            verify_callback)

        # These three calls return 1 on success.
        if lib.SSL_CTX_use_certificate_file(
                self.ctx, b'bogus-client.crt', SSL_FILETYPE_PEM) != 1:
            print("SSL could not use certificate")
        if lib.SSL_CTX_use_PrivateKey_file(
                self.ctx, b'bogus-client.key', SSL_FILETYPE_PEM) != 1:
            print("SSL could not use private key")
        if lib.SSL_CTX_set_cipher_list(
                self.ctx, b'HIGH:!CAMELLIA:!aNULL') != 1:
            print("SSL could not set cipher list")

        # Unlike the calls above, this one returns 0 on success.
        if lib.SSL_CTX_set_tlsext_use_srtp(
                self.ctx, b'SRTP_AES128_CM_SHA1_80'):
            print("SSL could not enable SRTP extension")

        # The SET_READ_AHEAD control returns the *previous* read-ahead
        # setting rather than a status code, so there is nothing to check.
        lib.SSL_CTX_ctrl(self.ctx, SSL_CTRL_SET_READ_AHEAD, 1, ffi.NULL)

    def close(self):
        """Release the native context; calling this more than once is safe."""
        if self.ctx != ffi.NULL:
            lib.SSL_CTX_free(self.ctx)
            # Drop the dangling pointer so a second close() cannot double-free.
            self.ctx = ffi.NULL
class DtlsSrtpSession:
    """A single DTLS-SRTP association driven over an asynchronous transport.

    OpenSSL reads from and writes to two in-memory BIOs; :meth:`connect`
    shuttles their contents to and from ``transport``, which must provide
    ``await transport.recv()`` and ``await transport.send(data)``.

    Set ``remote_fingerprint`` (same format as :attr:`local_fingerprint`)
    before calling :meth:`connect`. After a successful handshake
    ``srtp_tx_key`` and ``srtp_rx_key`` each hold an SRTP master key followed
    by its master salt.
    """

    def __init__(self, context, is_server, transport):
        """Create the SSL object and attach its memory BIOs.

        :param context: a :class:`DtlsSrtpContext` providing ``ctx``.
        :param is_server: True to act as the DTLS server, False as client.
        :param transport: object used by :meth:`connect` for datagram I/O.
        :raises Exception: if OpenSSL cannot allocate the SSL object.
        """
        self.encrypted = False
        self.is_server = is_server
        self.remote_fingerprint = None
        self.transport = transport
        self.srtp_tx_key = ffi.new('char[]', SRTP_KEY_LEN + SRTP_SALT_LEN)
        self.srtp_rx_key = ffi.new('char[]', SRTP_KEY_LEN + SRTP_SALT_LEN)

        self.ssl = lib.SSL_new(context.ctx)
        if self.ssl == ffi.NULL:
            raise Exception('DTLS could not create SSL object')

        self.read_bio = lib.BIO_new(lib.BIO_s_mem())
        self.write_bio = lib.BIO_new(lib.BIO_s_mem())
        lib.SSL_set_bio(self.ssl, self.read_bio, self.write_bio)

        if self.is_server:
            lib.SSL_set_accept_state(self.ssl)
        else:
            lib.SSL_set_connect_state(self.ssl)

    @property
    def local_fingerprint(self):
        """SHA-256 fingerprint of the local certificate, colon-separated hex."""
        x509 = lib.SSL_get_certificate(self.ssl)
        return certificate_digest(x509)

    async def _flush_write_bio(self):
        """Send any bytes OpenSSL has queued in the write BIO to the peer."""
        pending = lib.BIO_ctrl_pending(self.write_bio)
        if pending > 0:
            buf = ffi.new("char[]", pending)
            size = lib.BIO_read(self.write_bio, buf, len(buf))
            if size > 0:
                # Forward only the bytes BIO_read() actually produced.
                await self.transport.send(ffi.buffer(buf, size)[:])

    async def connect(self):
        """Run the DTLS handshake, verify the peer and derive the SRTP keys.

        :raises Exception: if the handshake fails, no remote fingerprint was
            set, the peer presented no certificate, the fingerprint does not
            match, or the keying material cannot be exported.
        """
        while not self.encrypted:
            result = lib.SSL_do_handshake(self.ssl)
            # Query the error right away, before any other OpenSSL call or
            # await can disturb the state SSL_get_error() inspects.
            error = None
            if result <= 0:
                error = lib.SSL_get_error(self.ssl, result)

            # Flush before waiting for input: the peer cannot answer a flight
            # it never received. The step that completes the handshake may
            # also leave a final flight queued, so flush in that case too.
            await self._flush_write_bio()

            if result > 0:
                self.encrypted = True
                break

            if error == SSL_ERROR_WANT_READ:
                data = await self.transport.recv()
                lib.BIO_write(self.read_bio, data, len(data))
            elif error != SSL_ERROR_WANT_WRITE:
                raise Exception('DTLS handshake failed (error %d)' % error)

        # check remote fingerprint
        if self.remote_fingerprint is None:
            raise Exception('DTLS remote fingerprint is not set')
        # NOTE(review): OpenSSL documents that SSL_get_peer_certificate()
        # returns a new reference that should be released with X509_free();
        # that function is not declared in this module's cdef, so the
        # reference is currently leaked.
        x509 = lib.SSL_get_peer_certificate(self.ssl)
        if x509 == ffi.NULL:
            raise Exception('DTLS peer did not present a certificate')
        remote_fingerprint = certificate_digest(x509)
        if remote_fingerprint != self.remote_fingerprint.upper():
            raise Exception('DTLS fingerprint does not match')

        # generate keying material
        buf = ffi.new("char[]", 2 * (SRTP_KEY_LEN + SRTP_SALT_LEN))
        extractor = b'EXTRACTOR-dtls_srtp'
        # Success is exactly 1; failure may be 0 or -1, and -1 is truthy, so
        # a plain `not` test would miss it.
        if lib.SSL_export_keying_material(self.ssl, buf, len(buf),
                                          extractor, len(extractor),
                                          ffi.NULL, 0, 0) != 1:
            raise Exception('DTLS could not extract SRTP keying material')

        # Index 0 is the client's key/salt and index 1 the server's; each
        # side transmits with its own pair and receives with the other.
        if self.is_server:
            get_srtp_key_salt(self.srtp_tx_key, buf, 1)
            get_srtp_key_salt(self.srtp_rx_key, buf, 0)
        else:
            get_srtp_key_salt(self.srtp_tx_key, buf, 0)
            get_srtp_key_salt(self.srtp_rx_key, buf, 1)
        print('DTLS handshake complete')

    def close(self):
        """Free the SSL object; calling this more than once is safe."""
        if self.ssl != ffi.NULL:
            # SSL_free() also releases the BIOs attached with SSL_set_bio().
            lib.SSL_free(self.ssl)
            self.ssl = ffi.NULL
            self.read_bio = ffi.NULL
            self.write_bio = ffi.NULL
if __name__ == '__main__':
    # Smoke test: build a server-side session with no transport and print
    # the fingerprint of the local certificate.
    context = DtlsSrtpContext()
    try:
        session = DtlsSrtpSession(context, is_server=True, transport=None)
        try:
            print(session.local_fingerprint)
        finally:
            # Free the native objects even if computing the fingerprint fails.
            session.close()
    finally:
        context.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment