Skip to content

Instantly share code, notes, and snippets.

@tthtlc
Created March 15, 2019 17:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tthtlc/3b9fff8e0e4c016fd5cb09a78dc94d2d to your computer and use it in GitHub Desktop.
Save tthtlc/3b9fff8e0e4c016fd5cb09a78dc94d2d to your computer and use it in GitHub Desktop.
using python to emulate assembly instructions
"""
template_test_harness.py
Template which loads the context of a process into a Unicorn Engine
instance, loads a custom (mutated) input, and executes the
desired code. Designed to be used in conjunction with one of the
Unicorn Context Dumper scripts.
Author:
Nathan Voss <njvoss299@gmail.com>
"""
import argparse
import struct
import binascii
from unicorn import *
from unicorn.x86_const import * # TODO: Set correct architecture here as necessary
import unicorn_loader
# Simple stand-in heap to prevent OS/kernel issues.
# Stays None until main() replaces it with a unicorn_loader.UnicornSimpleHeap
# bound to the engine instance.
unicorn_heap = None
# Start and end address of emulation; main() passes both to uc.emu_start().
START_ADDRESS = 0x00000000004004DA # TODO: Set start address here
END_ADDRESS = 0x0000000000400511 # TODO: Set end address here
def hook_mem_read(uc, type, addr, *args):
    """Report the address of a hooked memory access and decline to handle it.

    main() registers this callback for UC_HOOK_MEM_READ_UNMAPPED. Only
    ``addr`` is used; ``uc``, ``type`` and any trailing positional arguments
    are accepted so the function matches the hook calling convention.

    Returns:
        Always False.
        NOTE(review): Unicorn is expected to treat a False return from an
        invalid-memory hook as "not handled" and abort emulation -- confirm
        against the Unicorn hook documentation.
    """
    faulting_address = hex(addr)
    print(faulting_address)
    return False
def u32(data):
    """Decode exactly four bytes as a big-endian unsigned 32-bit integer.

    Raises:
        struct.error: if ``data`` is not exactly four bytes long.
    """
    (value,) = struct.unpack(">I", data)
    return value
def p32(num):
    """Encode ``num`` as four bytes, big-endian unsigned 32-bit.

    Raises:
        struct.error: if ``num`` does not fit in an unsigned 32-bit integer.
    """
    packed = struct.pack(">I", num)
    return packed
"""
Implement target-specific hooks in here.
Stub out, skip past, and re-implement necessary functionality as appropriate
"""
def unicorn_hook_instruction(uc, address, size, user_data):
    """Per-instruction hook stub; currently does nothing.

    main() registers this callback for UC_HOOK_CODE. It is the place to add
    target-specific handling.
    """
    # TODO: Setup hooks and handle anything you need to here
    #   - For example, hook malloc/free/etc. and handle it internally
    return None
#------------------------
#---- Main test function
def main():
    """Load a dumped process context, inject the mutated input, and emulate.

    Command-line arguments:
        context_dir: directory containing the process context to load.
        input_file:  path to the file holding the mutated input content.
        -d/--debug:  enable instruction tracing in the engine.

    Steps performed:
        1. Build the AflUnicornEngine from ``context_dir`` and install the
           code, unmapped-read and interrupt hooks.
        2. Execute a single instruction at START_ADDRESS to start the
           forkserver.
        3. Read the input file, reject inputs longer than 0xFF bytes, copy
           the input into a heap buffer, and point RBP/RSP at the middle of
           a freshly allocated 1024-byte scratch stack.
        4. Emulate from START_ADDRESS to END_ADDRESS. On a UcError the
           registers are dumped and ``uc.force_crash`` is invoked so that
           AFL can detect the failure.

    Side effects:
        Rebinds the module-level ``unicorn_heap`` and prints progress to
        stdout.
    """
    global unicorn_heap

    parser = argparse.ArgumentParser()
    parser.add_argument('context_dir', type=str, help="Directory containing process context")
    parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input content")
    parser.add_argument('-d', '--debug', default=False, action="store_true", help="Dump trace info")
    args = parser.parse_args()

    print("Loading context from {}".format(args.context_dir))
    uc = unicorn_loader.AflUnicornEngine(args.context_dir, enable_trace=args.debug, debug_print=False)

    # Instantiate the hook function to avoid emulation errors
    unicorn_heap = unicorn_loader.UnicornSimpleHeap(uc, debug_print=False)
    uc.hook_add(UC_HOOK_CODE, unicorn_hook_instruction)
    uc.hook_add(UC_HOOK_MEM_READ_UNMAPPED, hook_mem_read)

    # Execute 1 instruction just to startup the forkserver
    # NOTE: This instruction will be executed again later, so be sure that
    #       there are no negative consequences to the overall execution state.
    #       If there are, change the later call to emu_start to not re-execute
    #       the first instruction.
    print("Starting the forkserver by executing 1 instruction")
    try:
        uc.emu_start(START_ADDRESS, 0, 0, count=1)
    except UcError as e:
        print("ERROR: Failed to execute a single instruction (error: {})!".format(e))
        return

    def intr_hook(uc, intno, data):
        # Report the interrupt number together with RAX and RIP. The x86
        # constant set has no V0/PC registers (those are MIPS names), so the
        # x86 accumulator and instruction pointer are read instead.
        print('interrupt=%d, rax=%d, pc=0x%08x' % (
            intno,
            uc.reg_read(UC_X86_REG_RAX),
            uc.reg_read(UC_X86_REG_RIP),
        ))

    # hook interrupts for syscall
    uc.hook_add(UC_HOOK_INTR, intr_hook)

    # Allocate a buffer and load a mutated input and put it into the right spot
    if args.input_file:
        print("Loading input content from {}".format(args.input_file))
        # Context manager guarantees the file is closed even if read() fails.
        with open(args.input_file, 'rb') as input_file:
            input_content = input_file.read()

        # TODO: Apply constraints to mutated input here
        if len(input_content) > 0xFF:
            print("Test packet is too long (> {} bytes)".format(0xFF))
            return

        # Allocate a new buffer and put the input into it.
        buf_addr = unicorn_heap.malloc(len(input_content))
        stack_addr = unicorn_heap.malloc(1024)
        uc.mem_write(buf_addr, input_content)
        # The input is raw bytes read in binary mode; print its repr so this
        # works on Python 3 (str + bytes raises TypeError) and does not dump
        # unprintable bytes to the terminal.
        print("INPUT CONTENT: {!r}".format(input_content))
        print("INPUT CONTENT LENGTH: " + str(len(input_content)))
        print("Allocated mutated input buffer @ 0x{0:016x}".format(buf_addr))

        # TODO: Set the input into the state so it will be handled
        # Frame and stack pointers both start in the middle of the 1024-byte
        # scratch allocation.
        uc.reg_write(UC_X86_REG_RBP, stack_addr + 512)
        uc.reg_write(UC_X86_REG_RSP, stack_addr + 512)

    # Run the test
    print("Executing from 0x{0:016x} to 0x{1:016x}".format(START_ADDRESS, END_ADDRESS))
    try:
        uc.emu_start(START_ADDRESS, END_ADDRESS, timeout=0, count=0)
    except UcError as e:
        # If something went wrong during emulation a signal is raised to force this
        # script to crash in a way that AFL can detect ('uc.force_crash()' should be
        # called for any condition that you want AFL to treat as a crash).
        print("Execution failed with error: {}".format(e))
        uc.dump_regs()
        uc.force_crash(e)

    print("Final register state:")
    uc.dump_regs()

    print("Done.")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment