Created
March 15, 2019 17:54
-
-
Save tthtlc/3b9fff8e0e4c016fd5cb09a78dc94d2d to your computer and use it in GitHub Desktop.
using python to emulate assembly instructions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
template_test_harness.py | |
Template which loads the context of a process into a Unicorn Engine, | |
instance, loads a custom (mutated) inputs, and executes the | |
desired code. Designed to be used in conjunction with one of the | |
Unicorn Context Dumper scripts. | |
Author: | |
Nathan Voss <njvoss299@gmail.com> | |
""" | |
import argparse | |
import struct | |
import binascii | |
from unicorn import * | |
from unicorn.x86_const import * # TODO: Set correct architecture here as necessary | |
import unicorn_loader | |
# Simple stand-in heap to prevent OS/kernel issues | |
unicorn_heap = None | |
# Start and end address of emulation | |
START_ADDRESS = 0x00000000004004DA # TODO: Set start address here | |
END_ADDRESS = 0x0000000000400511 # TODO: Set end address here | |
def hook_mem_read(uc, type, addr,*args): | |
print(hex(addr)) | |
return False | |
def u32(data): | |
return struct.unpack(">I", data)[0] | |
def p32(num): | |
return struct.pack(">I", num) | |
""" | |
Implement target-specific hooks in here. | |
Stub out, skip past, and re-implement necessary functionality as appropriate | |
""" | |
def unicorn_hook_instruction(uc, address, size, user_data): | |
# TODO: Setup hooks and handle anything you need to here | |
# - For example, hook malloc/free/etc. and handle it internally | |
pass | |
#------------------------ | |
#---- Main test function | |
def main(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument('context_dir', type=str, help="Directory containing process context") | |
parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input content") | |
parser.add_argument('-d', '--debug', default=False, action="store_true", help="Dump trace info") | |
args = parser.parse_args() | |
print("Loading context from {}".format(args.context_dir)) | |
uc = unicorn_loader.AflUnicornEngine(args.context_dir, enable_trace=args.debug, debug_print=False) | |
# Instantiate the hook function to avoid emulation errors | |
global unicorn_heap | |
unicorn_heap = unicorn_loader.UnicornSimpleHeap(uc, debug_print=False) | |
uc.hook_add(UC_HOOK_CODE, unicorn_hook_instruction) | |
uc.hook_add(UC_HOOK_MEM_READ_UNMAPPED, hook_mem_read) | |
# Execute 1 instruction just to startup the forkserver | |
# NOTE: This instruction will be executed again later, so be sure that | |
# there are no negative consequences to the overall execution state. | |
# If there are, change the later call to emu_start to no re-execute | |
# the first instruction. | |
print("Starting the forkserver by executing 1 instruction") | |
try: | |
uc.emu_start(START_ADDRESS, 0, 0, count=1) | |
except UcError as e: | |
print("ERROR: Failed to execute a single instruction (error: {})!".format(e)) | |
return | |
def intr_hook(uc, intno, data): | |
print 'interrupt=%d, v0=%d, pc=0x%08x' % (intno, uc.reg_read(UC_X86_REG_V0), uc.reg_read(UC_X86_REG_PC)) | |
# hook interrupts for syscall | |
uc.hook_add(UC_HOOK_INTR, intr_hook) | |
# Allocate a buffer and load a mutated input and put it into the right spot | |
if args.input_file: | |
print("Loading input content from {}".format(args.input_file)) | |
input_file = open(args.input_file, 'rb') | |
input_content = input_file.read() | |
input_file.close() | |
# TODO: Apply constraints to mutated input here | |
# raise exceptions.NotImplementedError('No constraints on the mutated inputs have been set!') | |
if len(input_content) > 0xFF: | |
print("Test packet is too long (> {} bytes)".format(0xFF)) | |
return | |
# Allocate a new buffer and put the input into it. | |
buf_addr = unicorn_heap.malloc(len(input_content)) | |
stack_addr = unicorn_heap.malloc(1024) | |
uc.mem_write(buf_addr, input_content) | |
print("INPUT CONTENT: " + input_content) | |
print("INPUT CONTENT LENGTH: " + str(len(input_content))) | |
print("Allocated mutated input buffer @ 0x{0:016x}".format(buf_addr)) | |
# TODO: Set the input into the state so it will be handled | |
uc.reg_write(UC_X86_REG_RBP, stack_addr+512) | |
uc.reg_write(UC_X86_REG_RSP, stack_addr+512) | |
# raise exceptions.NotImplementedError('The mutated input was not loaded into the Unicorn state!') | |
# Run the test | |
print("Executing from 0x{0:016x} to 0x{1:016x}".format(START_ADDRESS, END_ADDRESS)) | |
try: | |
result = uc.emu_start(START_ADDRESS, END_ADDRESS, timeout=0, count=0) | |
#result = uc.emu_start(START_ADDRESS, 0, timeout=0, count=1) | |
except UcError as e: | |
# If something went wrong during emulation a signal is raised to force this | |
# script to crash in a way that AFL can detect ('uc.force_crash()' should be | |
# called for any condition that you want AFL to treat as a crash). | |
print("Execution failed with error: {}".format(e)) | |
uc.dump_regs() | |
uc.force_crash(e) | |
print("Final register state:") | |
uc.dump_regs() | |
print("Done.") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment