Last active
November 8, 2017 18:55
-
-
Save njv299/fd6168e7eff07a581cbda246cf0683b4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
fsk_message_service_test_harness.py | |
Loads the context of a process into Unicorn Engine, | |
loads a custom (mutated) inputs, and executes the | |
parser/handler routine. | |
Author: | |
Nathan Voss | |
""" | |
import argparse | |
import struct | |
from unicorn import * | |
from unicorn.x86_const import * | |
import unicorn_loader | |
# Simple stand-in heap to prevent OS/kernel issues | |
unicorn_heap = None | |
# Start and end address of emulation | |
START_ADDRESS = 0x0804E452 | |
END_ADDRESS = 0x0804E46F | |
# Address where checksum is checked and where it goes if it is valid | |
CHKSUM_CMP_ADDR = 0x0804D670 | |
CHKSUM_PASSED_ADDR = 0x0804D675 | |
# Entry points of addresses of functions to hook | |
MALLOC_ENTRY = 0x0804C570 | |
FREE_ENTRY = 0x0804C290 | |
PRINTF_ENTRY = 0x0804AAC0 | |
CGC_TRANSMIT_ENTRY = 0x080493F0 | |
""" | |
Implement target-specific hooks in here. | |
Stub out, skip past, and re-implement necessary functionality as appropriate | |
""" | |
def unicorn_hook_instruction(uc, address, size, user_data): | |
if address == MALLOC_ENTRY: | |
print("--- Rerouting call to malloc() @ 0x{0:08x} ---".format(address)) | |
size = struct.unpack("<I", uc.mem_read(uc.reg_read(UC_X86_REG_ESP) + 4, 4))[0] | |
retval = unicorn_heap.malloc(size) | |
uc.reg_write(UC_X86_REG_EAX, retval) | |
uc.reg_write(UC_X86_REG_EIP, struct.unpack("<I", uc.mem_read(uc.reg_read(UC_X86_REG_ESP), 4))[0]) | |
uc.reg_write(UC_X86_REG_ESP, uc.reg_read(UC_X86_REG_ESP) + 4) | |
# Bypass these functions by jumping straight out of them - We can't (or don't want to) emulate them | |
elif address == FREE_ENTRY or address == PRINTF_ENTRY or address == CGC_TRANSMIT_ENTRY: | |
print("--- Bypassing a function call that we don't want to emulate @ 0x{0:08x} ---".format(address)) | |
uc.reg_write(UC_X86_REG_EIP, struct.unpack("<I", uc.mem_read(uc.reg_read(UC_X86_REG_ESP), 4))[0]) | |
uc.reg_write(UC_X86_REG_ESP, uc.reg_read(UC_X86_REG_ESP) + 4) | |
# Bypass the checksum check | |
elif address == CHKSUM_CMP_ADDR: | |
print("--- Bypassing checksum validation @ 0x{0:08x} ---".format(address)) | |
uc.reg_write(UC_X86_REG_EIP, CHKSUM_PASSED_ADDR) | |
#------------------------ | |
#---- Main test function | |
def main(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument('context_dir', type=str, help="Directory containing process context") | |
parser.add_argument('packet_file', type=str, help="Path to the file containing the mutated packet content") | |
parser.add_argument('-d', '--debug', default=False, action="store_true", help="Dump trace info") | |
args = parser.parse_args() | |
print("Loading context from {}".format(args.context_dir)) | |
uc = unicorn_loader.AflUnicornEngine(args.context_dir, enable_trace=args.debug, debug_print=False) | |
# Instantiate the hook function to avoid emulation errors | |
global unicorn_heap | |
unicorn_heap = unicorn_loader.UnicornSimpleHeap(uc, debug_print=True) | |
uc.hook_add(UC_HOOK_CODE, unicorn_hook_instruction) | |
# Execute 1 instruction just to startup the forkserver | |
print("Starting the forkserver by executing 1 instruction") | |
try: | |
uc.emu_start(START_ADDRESS, 0, 0, count=1) | |
except UcError as e: | |
print("ERROR: Failed to execute a single instruction (error: {})!".format(e)) | |
return | |
# Allocate a buffer and load a mutated packet and put it into the right spot | |
if args.packet_file: | |
print("Loading packet content from {}".format(args.packet_file)) | |
packet_file = open(args.packet_file, 'rb') | |
packet_content = packet_file.read() | |
packet_file.close() | |
# Apply constraints to mutated input | |
if len(packet_content) > 0xFF: | |
print("Test packet is too long (> {} bytes)".format(0xFF)) | |
return | |
# Allocate a new buffer and put the packet into it | |
buf_addr = unicorn_heap.malloc(len(packet_content)) | |
uc.mem_write(buf_addr, packet_content) | |
print("Allocated mutated packet buffer @ 0x{0:08x}".format(buf_addr)) | |
# Set the packet into the state so it will be handled | |
uc.reg_write(UC_X86_REG_ECX, buf_addr) | |
uc.reg_write(UC_X86_REG_BL, len(packet_content)) | |
# Run the test | |
print("Executing from 0x{0:08x} to 0x{1:08x}".format(START_ADDRESS, END_ADDRESS)) | |
try: | |
result = uc.emu_start(START_ADDRESS, END_ADDRESS, timeout=0, count=0) | |
except UcError as e: | |
print("Execution failed with error: {}".format(e)) | |
uc.dump_regs() | |
uc.force_crash(e) | |
print("Final register state:") | |
uc.dump_regs() | |
print("Done.") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment