Skip to content

Instantly share code, notes, and snippets.

@njv299
Created November 7, 2017 20:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save njv299/7a2eff20459d329066b31ef5586e26e6 to your computer and use it in GitHub Desktop.
Save njv299/7a2eff20459d329066b31ef5586e26e6 to your computer and use it in GitHub Desktop.
afl-unicorn Advanced Example
//--------------------
// From cgc_packet.h:
#define MAX_PACKET_LENGTH (48)
typedef struct PACKET_INFO_STRUCT
{
uint8_t packetData[MAX_PACKET_LENGTH];
uint32_t dataLen;
struct PACKET_INFO_STRUCT *pNextPacket;
fpPacketTypeHandler fpHandler;
uint8_t packetType;
} tSinglePacketData;
//----------------
// From packet.c:
void cgc_receive_packet( uint8_t *pData, uint8_t dataLen, uint16_t packetCRC )
{
if ( pData == NULL )
return;
if ( dataLen == 0 )
return;
// Perform Checksum check
uint16_t check_checksum = cgc_simple_checksum16( pData, dataLen );
// Validate Checksum
if ( packetCRC != check_checksum )
return;
// Process and receive packet
#if DEBUG_PACKET
cgc_printf( "Packet received %d!\n", pData[0] );
#endif
// Handle packet type
uint8_t packetType = pData[0];
uint8_t found = 0;
for ( uint32_t i = 0; i < cgc_g_packetHandlers.handlerCount; i++ )
{
if ( cgc_g_packetHandlers.pHandlers[i].packetType == packetType )
{
found = 1;
cgc_add_new_packet( packetType, cgc_g_packetHandlers.pHandlers[i].fpHandler, pData+1, dataLen-1 );
// ...
}
}
// ...
}
void cgc_add_new_packet( uint8_t packetType, fpPacketTypeHandler fpHandler, uint8_t *pData, uint8_t dataLen )
{
tSinglePacketData *pNewPacket = (tSinglePacketData *)cgc_malloc( sizeof(tSinglePacketData) );
// ...
// Copy in the data
cgc_memcpy( pNewPacket->packetData, pData, dataLen );
// ...
}
"""
fsk_message_service_test_harness.py
Loads the context of a process into Unicorn Engine,
loads a custom (mutated) inputs, and executes the
parser/handler routine.
Author:
Nathan Voss
"""
import argparse
import os
import signal
import struct
import time
from unicorn import *
from unicorn.x86_const import *
from capstone import *
import unicorn_loader
# Simple stand-in heap to prevent OS/kernel issues
unicorn_heap = None
# Start and end address of emulation
START_ADDRESS = 0x0804E452
END_ADDRESS = 0x0804E46F
# Address where checksum is checked and where it goes if it is valid
CHKSUM_CMP_ADDR = 0x0804D670
CHKSUM_PASSED_ADDR = 0x0804D675
# Entry points of addresses of functions to hook
MALLOC_ENTRY = 0x0804C570
FREE_ENTRY = 0x0804C290
PRINTF_ENTRY = 0x0804AAC0
CGC_TRANSMIT_ENTRY = 0x080493F0
"""
Implement target-specific hooks in here.
Stub out, skip past, and re-implement necessary functionality as appropriate
"""
def unicorn_hook_instruction(uc, address, size, user_data):
if address == MALLOC_ENTRY:
print("--- Rerouting call to malloc() @ 0x{0:08x} ---".format(address))
size = struct.unpack("<I", uc.mem_read(uc.reg_read(UC_X86_REG_ESP) + 4, 4))[0]
retval = unicorn_heap.malloc(size)
uc.reg_write(UC_X86_REG_EAX, retval)
uc.reg_write(UC_X86_REG_EIP, struct.unpack("<I", uc.mem_read(uc.reg_read(UC_X86_REG_ESP), 4))[0])
uc.reg_write(UC_X86_REG_ESP, uc.reg_read(UC_X86_REG_ESP) + 4)
# Bypass these functions by jumping straight out of them - We can't (or don't want to) emulate them
elif address == FREE_ENTRY or address == PRINTF_ENTRY or address == CGC_TRANSMIT_ENTRY:
print("--- Bypassing a function call that we don't want to emulate @ 0x{0:08x} ---".format(address))
uc.reg_write(UC_X86_REG_EIP, struct.unpack("<I", uc.mem_read(uc.reg_read(UC_X86_REG_ESP), 4))[0])
uc.reg_write(UC_X86_REG_ESP, uc.reg_read(UC_X86_REG_ESP) + 4)
# Bypass the checksum check
elif address == CHKSUM_CMP_ADDR:
print("--- Bypassing checksum validation @ 0x{0:08x} ---".format(address))
uc.reg_write(UC_X86_REG_EIP, CHKSUM_PASSED_ADDR)
#------------------------
#---- Main test function
def main():
parser = argparse.ArgumentParser()
parser.add_argument('context_dir', type=str, help="Directory containing process context")
parser.add_argument('packet_file', type=str, help="Path to the file containing the mutated packet content")
parser.add_argument('-d', '--debug', default=False, action="store_true", help="Dump trace info")
args = parser.parse_args()
print("Loading context from {}".format(args.context_dir))
uc = unicorn_loader.AflUnicornEngine(args.context_dir, enable_trace=args.debug, debug_print=False)
# Instantiate the hook function to avoid emulation errors
global unicorn_heap
unicorn_heap = unicorn_loader.UnicornSimpleHeap(uc, debug_print=True)
uc.hook_add(UC_HOOK_CODE, unicorn_hook_instruction)
# Execute 1 instruction just to startup the forkserver
print("Starting the forkserver by executing 1 instruction")
try:
uc.emu_start(START_ADDRESS, 0, 0, count=1)
except UcError as e:
print("ERROR: Failed to execute a single instruction (error: {})!".format(e))
return
# Allocate a buffer and load a mutated packet and put it into the right spot
if args.packet_file:
print("Loading packet content from {}".format(args.packet_file))
packet_file = open(args.packet_file, 'rb')
packet_content = packet_file.read()
packet_file.close()
# Apply constraints to mutated input
if len(packet_content) > 0xFF:
print("Test packet is too long (> {} bytes)".format(0xFF))
return
# Allocate a new buffer and put the packet into it
buf_addr = unicorn_heap.malloc(len(packet_content))
uc.mem_write(buf_addr, packet_content)
print("Allocated mutated packet buffer @ 0x{0:08x}".format(buf_addr))
# Set the packet into the state so it will be handled
uc.reg_write(UC_X86_REG_ECX, buf_addr)
uc.reg_write(UC_X86_REG_BL, len(packet_content))
# Run the test
print("Executing from 0x{0:08x} to 0x{1:08x}".format(START_ADDRESS, END_ADDRESS))
try:
result = uc.emu_start(START_ADDRESS, END_ADDRESS, timeout=0, count=0)
except UcError as e:
print("Execution failed with error: {}".format(e))
uc.dump_regs()
uc.force_crash(e)
print("Final register state:")
uc.dump_regs()
print("Done.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment