Created
November 5, 2018 09:56
-
-
Save mosajjal/c088d03225287115a2e1fffef82ed25b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python | |
import argparse | |
import base64 | |
import hashlib | |
import os | |
import struct | |
import sys | |
from binascii import hexlify, unhexlify | |
import dns.resolver | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives.asymmetric.x25519 import (X25519PrivateKey, | |
X25519PublicKey) | |
from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
from cryptography.hazmat.primitives.kdf.hkdf import HKDF, HKDFExpand | |
import tldextract | |
def create_esni(esni_server_name, key_shares, clientrandom): | |
''' | |
generates encrypted_server_name extension bytes from DNS response. | |
key_shares is a part of a TLS1.x extension and client_random is a value from the ClientHello packet | |
''' | |
# Extract Domain TLD for DNS query | |
parsed_url = tldextract.extract(esni_server_name) | |
domain = f"{parsed_url.domain}.{parsed_url.suffix}" | |
# Perform DNS request | |
try: | |
responses = dns.resolver.query(f"_esni.{domain}", "TXT") | |
except dns.resolver.NXDOMAIN: | |
print(f"No ESNI record found for domain: {domain}") | |
#Exit if no ESNI record is found | |
sys.exit(0) | |
# B64decode the first response (for now) | |
txt_response = responses[0].strings[0] | |
esni_raw_response = bytearray(base64.b64decode(txt_response)) | |
# Create a copy for later use and computing SHA256 digest | |
esni_key_copy = esni_raw_response | |
esni_keys = {} | |
# Create parsed struct | |
# struct { | |
# uint16 version; | |
# uint8 checksum[4]; | |
# KeyShareEntry keys<4..2^16-1>; | |
# CipherSuite cipher_suites<2..2^16-2>; | |
# uint16 padded_length; | |
# uint64 not_before; | |
# uint64 not_after; | |
# Extension extensions<0..2^16-1>; | |
# } ESNIKeys; | |
esni_keys['version'], esni_raw_response = esni_raw_response[:2], esni_raw_response[2:] | |
esni_keys['checksum'], esni_raw_response = esni_raw_response[:4], esni_raw_response[4:] | |
esni_keys['key_shares'], esni_raw_response = esni_raw_response[:2], esni_raw_response[2:] | |
key_len = struct.unpack("!H", esni_keys['key_shares'])[0] | |
esni_keys['key_share_entry'], esni_raw_response = esni_raw_response[:key_len], esni_raw_response[key_len:] | |
esni_keys['cipher_len'], esni_raw_response = esni_raw_response[:2], esni_raw_response[2:] | |
cipher_len = struct.unpack("!H", esni_keys['cipher_len'])[0] | |
esni_keys['cipher'], esni_raw_response = esni_raw_response[:cipher_len], esni_raw_response[cipher_len:] | |
esni_keys['padded_length'], esni_raw_response = esni_raw_response[:2], esni_raw_response[2:] | |
esni_keys['not_before'], esni_raw_response = esni_raw_response[:8], esni_raw_response[8:] | |
esni_keys['not_after'], esni_raw_response = esni_raw_response[:8], esni_raw_response[8:] | |
esni_keys['extension'], esni_raw_response = esni_raw_response[:2], esni_raw_response[2:] | |
# Change checksum value to 0x00 as RFC and verify the checksum | |
esni_key_copy_for_hash = esni_key_copy | |
esni_key_copy_for_hash[2:6] = b'\x00' * 4 | |
if hashlib.sha256(esni_key_copy_for_hash).digest()[0:4] != esni_keys['checksum']: | |
print("hash not passed") | |
sys.exit(0) | |
# create keyShareEntry of the first key | |
# struct { | |
# NamedGroup group; | |
# opaque key_exchange<1..2^16-1>; | |
# } KeyShareEntry; | |
recieved_key_share_entry = {} | |
recieved_key_share_entry['group'] = esni_keys['key_share_entry'][:2] | |
recieved_key_share_entry['length'] = esni_keys['key_share_entry'][2:4] | |
recieved_key_share_entry['key'] = esni_keys['key_share_entry'][4:4+struct.unpack("!H", recieved_key_share_entry['length'])[0]] | |
# Create keyshare for key exchange | |
sender_private_key = X25519PrivateKey.generate() | |
sender_public_key = sender_private_key.public_key() | |
reciever_public_key = X25519PublicKey.from_public_bytes(bytes(recieved_key_share_entry['key'])) | |
shared_key = sender_private_key.exchange(reciever_public_key) | |
key_share = recieved_key_share_entry['group'] + struct.pack("!H", len(sender_public_key.public_bytes())) + sender_public_key.public_bytes() | |
# Generate SHA256 digest of the decoded TXT DNS response | |
record_digest = struct.pack("!H", 32) + hashlib.sha256(esni_key_copy).digest() | |
# Generate buffer for ESNIContents structure | |
# struct { | |
# opaque record_digest<0..2^16-1>; #LEN:0x22 | |
# KeyShareEntry esni_key_share; #LEN:0x24 | |
# Random client_hello_random; #LEN: 0x20 | |
# } ESNIContents; | |
esni_contents = record_digest + key_share + clientrandom | |
# Generate labels for ESNI key and ESNI IV as part of AES-GCM | |
# struct { | |
# uint16 length = Length; | |
# opaque label<7..255> = "tls13 " + Label; | |
# opaque hash_value<0..255> = HashValue; | |
# } HkdfLabel; | |
key_label = b'tls13 esni key' | |
iv_label = b'tls13 esni iv' | |
hkdf_label_key = key_label + b" " + hashlib.sha256(esni_contents).digest() | |
hkdf_label_key = struct.pack("!H", 16) + struct.pack("!B", len(key_label)) + hkdf_label_key | |
hkdf_label_iv = iv_label + b" " + hashlib.sha256(esni_contents).digest() | |
hkdf_label_iv = struct.pack("!H", 12) + struct.pack("!B", len(iv_label)) + hkdf_label_iv | |
# Generate KEY and IV | |
derived_key = HKDFExpand(algorithm=hashes.SHA256(),length=16, info=hkdf_label_key, backend=default_backend()).derive(shared_key) | |
derived_iv = HKDFExpand(algorithm=hashes.SHA256(), length=12, info=hkdf_label_iv, backend=default_backend()).derive(shared_key) | |
# Create ClientHello.KeyShareClientHello | |
# ClientHello.KeyShareClientHello is the body of the extension but not including the extension header. | |
key_share_entry = struct.pack('!H', len(key_shares)) + key_shares | |
# Re-Create sni to be encrypted for ESNIContents and create these two structs | |
# struct { | |
# ServerNameList sni; | |
# opaque zeros[ESNIKeys.padded_length - length(sni)]; | |
# } PaddedServerNameList; | |
# struct { | |
# uint8 nonce[16]; | |
# PaddedServerNameList realSNI; | |
# } ClientESNIInner; | |
sni = b'\x00' + struct.pack("!H", len(esni_server_name)) + bytes(esni_server_name, "ASCII") | |
sni = struct.pack("!H", len(sni)) + sni | |
padded_server_name_list = bytes(sni) | |
padded_server_name_list += b'\x00'*(struct.unpack("!H", esni_keys['padded_length'])[0] - len(padded_server_name_list)) | |
client_esni_inner = os.urandom(16) + padded_server_name_list | |
# Use AES-GCM for now to encrypt and authenticate provided data using AEAD | |
aesgcm = AESGCM(derived_key) | |
encrypted_sni = aesgcm.encrypt(derived_iv, client_esni_inner, key_share_entry) | |
encrypted_sni = struct.pack("!H", len(encrypted_sni)) + encrypted_sni | |
# Create final data to be sent via the wire (minus the length) | |
# struct { | |
# CipherSuite suite; | |
# KeyShareEntry key_share; | |
# opaque record_digest<0..2^16-1>; | |
# opaque encrypted_sni<0..2^16-1>; | |
# } ClientEncryptedSNI; | |
client_encrypted_sni = b'' | |
client_encrypted_sni += esni_keys['cipher'] | |
client_encrypted_sni += key_share | |
client_encrypted_sni += record_digest | |
client_encrypted_sni += encrypted_sni | |
return client_encrypted_sni | |
if __name__ == "__main__": | |
# key_share bytestring and client_random sysargv | |
parser = argparse.ArgumentParser() | |
parser.add_argument('hostname', help="hostname of the target") | |
parser.add_argument('keyshares', help="Base64 encoded of key_share extension w/o the length") | |
parser.add_argument('clientrandom', help="Base64 encoded of client_random w/o the length") | |
args = parser.parse_args() | |
# If this script is being run from the command line, we use base64 to encode keyshares and clientrandom to avoid shell problems. | |
# The output of the script will also be a b64 encoded string and must be decoded for proper use | |
esni_server_name=args.hostname | |
key_shares=base64.b64decode(args.keyshares) | |
clientrandom=base64.b64decode(args.clientrandom) | |
esni_to_wire = create_esni(esni_server_name=esni_server_name, key_shares=key_shares, clientrandom=clientrandom) | |
sys.stdout.write(base64.b64encode(esni_to_wire).decode("ASCII")) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment