Created
February 6, 2018 00:12
-
-
Save jlaine/4c39326b251a43423b477d14ed67b21b to your computer and use it in GitHub Desktop.
DTLS-SRTP proof of concept in python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import cffi | |
ffi = cffi.FFI() | |
ffi.cdef(""" | |
typedef struct bio_st BIO; | |
typedef struct bio_method_st BIO_METHOD; | |
typedef struct env_md_st EVP_MD; | |
typedef struct ssl_ctx_st SSL_CTX; | |
typedef struct ssl_method_st SSL_METHOD; | |
typedef struct ssl_st SSL; | |
typedef struct x509_st X509; | |
typedef struct x509_store_ctx_st X509_STORE_CTX; | |
BIO_METHOD *BIO_s_mem(void); | |
BIO *BIO_new(BIO_METHOD *type); | |
int BIO_free(BIO *a); | |
size_t BIO_ctrl_pending(BIO *b); | |
int BIO_read(BIO *b, void *data, int len); | |
int BIO_write(BIO *b, const void *data, int len); | |
const SSL_METHOD *DTLSv1_method(void); | |
SSL_CTX *SSL_CTX_new(const SSL_METHOD *meth); | |
void SSL_CTX_free(SSL_CTX *); | |
long SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg); | |
int SSL_CTX_set_cipher_list(SSL_CTX *, const char *str); | |
int SSL_CTX_set_tlsext_use_srtp(SSL_CTX *ctx, const char *profiles); | |
void SSL_CTX_set_verify(SSL_CTX *ctx, int mode, | |
int (*verify_callback)(int, X509_STORE_CTX *)); | |
int SSL_CTX_use_certificate_file(SSL_CTX *ctx, const char *file, int type); | |
int SSL_CTX_use_PrivateKey_file(SSL_CTX *ctx, const char *file, int type); | |
SSL *SSL_new(SSL_CTX *ctx); | |
void SSL_free(SSL *ssl); | |
int SSL_do_handshake(SSL *s); | |
int SSL_export_keying_material(SSL *s, unsigned char *out, size_t olen, | |
const char *label, size_t llen, | |
const unsigned char *context, | |
size_t contextlen, int use_context); | |
X509 *SSL_get_certificate(const SSL *ssl); | |
int SSL_get_error(const SSL *s, int ret_code); | |
X509 *SSL_get_peer_certificate(const SSL *ssl); | |
void SSL_set_accept_state(SSL *s); | |
void SSL_set_bio(SSL *s, BIO *rbio, BIO *wbio); | |
void SSL_set_connect_state(SSL *s); | |
const EVP_MD *EVP_get_digestbyname(const char *name); | |
int X509_digest(const X509 *data, const EVP_MD *type, | |
unsigned char *md, unsigned int *len); | |
""") | |
lib = ffi.dlopen('libssl.so.1.1') | |
EVP_MAX_MD_SIZE = 36 | |
SSL_CTRL_GET_READ_AHEAD = 40 | |
SSL_CTRL_SET_READ_AHEAD = 41 | |
SSL_ERROR_WANT_READ = 2 | |
SSL_ERROR_WANT_WRITE = 3 | |
SSL_FILETYPE_PEM = 1 | |
SSL_VERIFY_PEER = 1 | |
SSL_VERIFY_FAIL_IF_NO_PEER_CERT = 2 | |
SRTP_KEY_LEN = 16 | |
SRTP_SALT_LEN = 14 | |
def certificate_digest(x509): | |
digest = lib.EVP_get_digestbyname(b'SHA256') | |
if digest == ffi.NULL: | |
raise ValueError("No such digest method") | |
result_buffer = ffi.new("unsigned char[]", EVP_MAX_MD_SIZE) | |
result_length = ffi.new("unsigned int[]", 1) | |
result_length[0] = len(result_buffer) | |
digest_result = lib.X509_digest(x509, digest, result_buffer, result_length) | |
assert digest_result == 1 | |
return b":".join([ | |
base64.b16encode(ch).upper() for ch | |
in ffi.buffer(result_buffer, result_length[0])]).decode('ascii') | |
def get_srtp_key_salt(dest, src, idx): | |
# key | |
start = idx * SRTP_KEY_LEN | |
dest[0:SRTP_KEY_LEN] = src[start:start + SRTP_KEY_LEN] | |
# salt | |
start = 2 * SRTP_KEY_LEN + idx * SRTP_SALT_LEN | |
dest[SRTP_KEY_LEN:SRTP_KEY_LEN + SRTP_SALT_LEN] = src[start:start + SRTP_SALT_LEN] | |
@ffi.callback('int(int, X509_STORE_CTX *)') | |
def verify_callback(x, y): | |
return 1 | |
class DtlsSrtpContext: | |
def __init__(self): | |
self.ctx = lib.SSL_CTX_new(lib.DTLSv1_method()) | |
lib.SSL_CTX_set_verify(self.ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, | |
verify_callback) | |
if not lib.SSL_CTX_use_certificate_file(self.ctx, b'bogus-client.crt', SSL_FILETYPE_PEM): | |
print("SSL could not use certificate") | |
if not lib.SSL_CTX_use_PrivateKey_file(self.ctx, b'bogus-client.key', SSL_FILETYPE_PEM): | |
print("SSL could not use private key") | |
if not lib.SSL_CTX_set_cipher_list(self.ctx, b'HIGH:!CAMELLIA:!aNULL'): | |
print("SSL could not set cipher list") | |
if lib.SSL_CTX_set_tlsext_use_srtp(self.ctx, b'SRTP_AES128_CM_SHA1_80'): | |
print("SSL could not enable SRTP extension") | |
if lib.SSL_CTX_ctrl(self.ctx, SSL_CTRL_SET_READ_AHEAD, 1, ffi.NULL): | |
print("SSL could not enable read ahead") | |
def close(self): | |
lib.SSL_CTX_free(self.ctx) | |
class DtlsSrtpSession: | |
def __init__(self, context, is_server, transport): | |
self.encrypted = False | |
self.is_server = is_server | |
self.remote_fingerprint = None | |
self.ssl = lib.SSL_new(context.ctx) | |
self.srtp_tx_key = ffi.new('char[]', SRTP_KEY_LEN + SRTP_SALT_LEN) | |
self.srtp_rx_key = ffi.new('char[]', SRTP_KEY_LEN + SRTP_SALT_LEN) | |
self.transport = transport | |
self.read_bio = lib.BIO_new(lib.BIO_s_mem()) | |
self.write_bio = lib.BIO_new(lib.BIO_s_mem()) | |
lib.SSL_set_bio(self.ssl, self.read_bio, self.write_bio) | |
if self.is_server: | |
lib.SSL_set_accept_state(self.ssl) | |
else: | |
lib.SSL_set_connect_state(self.ssl) | |
@property | |
def local_fingerprint(self): | |
x509 = lib.SSL_get_certificate(self.ssl) | |
return certificate_digest(x509) | |
async def connect(self): | |
while not self.encrypted: | |
result = lib.SSL_do_handshake(self.ssl) | |
if result > 0: | |
self.encrypted = True | |
break | |
error = lib.SSL_get_error(self.ssl, result) | |
if error == SSL_ERROR_WANT_READ: | |
data = await self.transport.recv() | |
lib.BIO_write(self.read_bio, data, len(data)) | |
elif error != SSL_ERROR_WANT_WRITE: | |
raise Exception('DTLS handshake failed (error %d)' % error) | |
pending = lib.BIO_ctrl_pending(self.write_bio) | |
if pending > 0: | |
buf = ffi.new("char[]", pending) | |
lib.BIO_read(self.write_bio, buf, len(buf)) | |
data = b''.join(buf) | |
await self.transport.send(data) | |
# check remote fingerprint | |
x509 = lib.SSL_get_peer_certificate(self.ssl) | |
remote_fingerprint = certificate_digest(x509) | |
if remote_fingerprint != self.remote_fingerprint.upper(): | |
raise Exception('DTLS fingerprint does not match') | |
# generate keying material | |
buf = ffi.new("char[]", 2 * (SRTP_KEY_LEN + SRTP_SALT_LEN)) | |
extractor = b'EXTRACTOR-dtls_srtp' | |
if not lib.SSL_export_keying_material(self.ssl, buf, len(buf), | |
extractor, len(extractor), | |
ffi.NULL, 0, 0): | |
raise Exception('DTLS could not extract SRTP keying material') | |
if self.is_server: | |
get_srtp_key_salt(self.srtp_tx_key, buf, 1) | |
get_srtp_key_salt(self.srtp_rx_key, buf, 0) | |
else: | |
get_srtp_key_salt(self.srtp_tx_key, buf, 0) | |
get_srtp_key_salt(self.srtp_rx_key, buf, 1) | |
print('DTLS handshake complete') | |
def close(self): | |
lib.SSL_free(self.ssl) | |
if __name__ == '__main__': | |
context = DtlsSrtpContext() | |
session = DtlsSrtpSession(context, is_server=True, transport=None) | |
print(session.local_fingerprint) | |
session.close() | |
context.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment