Created
November 7, 2017 20:27
-
-
Save njv299/7a2eff20459d329066b31ef5586e26e6 to your computer and use it in GitHub Desktop.
afl-unicorn Advanced Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//-------------------- | |
// From cgc_packet.h: | |
#define MAX_PACKET_LENGTH (48) | |
typedef struct PACKET_INFO_STRUCT | |
{ | |
uint8_t packetData[MAX_PACKET_LENGTH]; | |
uint32_t dataLen; | |
struct PACKET_INFO_STRUCT *pNextPacket; | |
fpPacketTypeHandler fpHandler; | |
uint8_t packetType; | |
} tSinglePacketData; | |
//---------------- | |
// From packet.c: | |
void cgc_receive_packet( uint8_t *pData, uint8_t dataLen, uint16_t packetCRC ) | |
{ | |
if ( pData == NULL ) | |
return; | |
if ( dataLen == 0 ) | |
return; | |
// Perform Checksum check | |
uint16_t check_checksum = cgc_simple_checksum16( pData, dataLen ); | |
// Validate Checksum | |
if ( packetCRC != check_checksum ) | |
return; | |
// Process and receive packet | |
#if DEBUG_PACKET | |
cgc_printf( "Packet received %d!\n", pData[0] ); | |
#endif | |
// Handle packet type | |
uint8_t packetType = pData[0]; | |
uint8_t found = 0; | |
for ( uint32_t i = 0; i < cgc_g_packetHandlers.handlerCount; i++ ) | |
{ | |
if ( cgc_g_packetHandlers.pHandlers[i].packetType == packetType ) | |
{ | |
found = 1; | |
cgc_add_new_packet( packetType, cgc_g_packetHandlers.pHandlers[i].fpHandler, pData+1, dataLen-1 ); | |
// ... | |
} | |
} | |
// ... | |
} | |
// Allocate a new tSinglePacketData record and copy the payload into it.
//
//   packetType - packet type identifier
//   fpHandler  - handler associated with this packet type
//   pData      - payload bytes to copy
//   dataLen    - number of payload bytes to copy
//
// NOTE: parts of the original function are elided in this excerpt ("// ...").
void cgc_add_new_packet( uint8_t packetType, fpPacketTypeHandler fpHandler, uint8_t *pData, uint8_t dataLen )
{
    tSinglePacketData *pNewPacket = (tSinglePacketData *)cgc_malloc( sizeof(tSinglePacketData) );

    // ...

    // Copy in the data
    // NOTE(review): no check of dataLen against MAX_PACKET_LENGTH (48) is
    // visible in this excerpt, while the caller can pass up to 254 bytes.
    // Unless the elided code validates the length, this copy can write past
    // the end of packetData -- presumably the bug the test harness targets.
    cgc_memcpy( pNewPacket->packetData, pData, dataLen );

    // ...
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
fsk_message_service_test_harness.py | |
Loads the context of a process into Unicorn Engine, | |
loads a custom (mutated) input, and executes the
parser/handler routine. | |
Author: | |
Nathan Voss | |
""" | |
import argparse | |
import os | |
import signal | |
import struct | |
import time | |
from unicorn import * | |
from unicorn.x86_const import * | |
from capstone import * | |
import unicorn_loader | |
# Simple stand-in heap to prevent OS/kernel issues.
# Set to a unicorn_loader.UnicornSimpleHeap instance by main().
unicorn_heap = None

# Start and end address of emulation
START_ADDRESS = 0x0804E452
END_ADDRESS = 0x0804E46F

# Address where checksum is checked and where it goes if it is valid
CHKSUM_CMP_ADDR = 0x0804D670
CHKSUM_PASSED_ADDR = 0x0804D675

# Entry points of addresses of functions to hook
MALLOC_ENTRY = 0x0804C570
FREE_ENTRY = 0x0804C290
PRINTF_ENTRY = 0x0804AAC0
CGC_TRANSMIT_ENTRY = 0x080493F0
""" | |
Implement target-specific hooks in here. | |
Stub out, skip past, and re-implement necessary functionality as appropriate | |
""" | |
def _return_to_caller(uc):
    """Leave the current function immediately, as a bare x86 `ret` would.

    Reads the 4-byte little-endian return address at ESP, writes it to EIP
    and pops it off the stack by advancing ESP by 4.
    """
    esp = uc.reg_read(UC_X86_REG_ESP)
    return_addr = struct.unpack("<I", uc.mem_read(esp, 4))[0]
    uc.reg_write(UC_X86_REG_EIP, return_addr)
    uc.reg_write(UC_X86_REG_ESP, esp + 4)


def unicorn_hook_instruction(uc, address, size, user_data):
    """Code hook that reroutes or skips target functions we cannot emulate.

    Registered with UC_HOOK_CODE, so it is invoked with the address of each
    instruction being executed.

    Args:
        uc: The Unicorn engine instance being hooked.
        address: Address of the instruction about to execute.
        size: Size of that instruction (unused here).
        user_data: Opaque value supplied at hook registration (unused here).
    """
    if address == MALLOC_ENTRY:
        print("--- Rerouting call to malloc() @ 0x{0:08x} ---".format(address))
        # At function entry the return address sits at ESP, so the 4 bytes at
        # ESP+4 are taken as the requested allocation size (presumably the
        # first cdecl stack argument -- confirm against the target binary).
        # Kept in its own name so the hook's `size` parameter is not shadowed.
        alloc_size = struct.unpack("<I", uc.mem_read(uc.reg_read(UC_X86_REG_ESP) + 4, 4))[0]
        retval = unicorn_heap.malloc(alloc_size)
        # Hand the pointer back in EAX, then skip the real malloc() body.
        uc.reg_write(UC_X86_REG_EAX, retval)
        _return_to_caller(uc)

    # Bypass these functions by jumping straight out of them - We can't (or don't want to) emulate them
    elif address == FREE_ENTRY or address == PRINTF_ENTRY or address == CGC_TRANSMIT_ENTRY:
        print("--- Bypassing a function call that we don't want to emulate @ 0x{0:08x} ---".format(address))
        _return_to_caller(uc)

    # Bypass the checksum check
    elif address == CHKSUM_CMP_ADDR:
        print("--- Bypassing checksum validation @ 0x{0:08x} ---".format(address))
        # Jump straight to the "checksum valid" path so mutated packets are
        # not rejected for having a wrong checksum.
        uc.reg_write(UC_X86_REG_EIP, CHKSUM_PASSED_ADDR)
#------------------------ | |
#---- Main test function | |
def main():
    """Load a process context, inject one mutated packet and emulate the handler.

    Command-line arguments:
        context_dir: Directory containing the process context to load.
        packet_file: Path to the file holding the mutated packet content.
        -d/--debug:  Dump trace info during emulation.

    Emulation runs from START_ADDRESS to END_ADDRESS. If Unicorn raises a
    UcError during the run, the registers are dumped and uc.force_crash() is
    called with the error.
    """
    global unicorn_heap

    parser = argparse.ArgumentParser()
    parser.add_argument('context_dir', type=str, help="Directory containing process context")
    parser.add_argument('packet_file', type=str, help="Path to the file containing the mutated packet content")
    parser.add_argument('-d', '--debug', default=False, action="store_true", help="Dump trace info")
    args = parser.parse_args()

    print("Loading context from {}".format(args.context_dir))
    uc = unicorn_loader.AflUnicornEngine(args.context_dir, enable_trace=args.debug, debug_print=False)

    # Instantiate the hook function to avoid emulation errors
    unicorn_heap = unicorn_loader.UnicornSimpleHeap(uc, debug_print=True)
    uc.hook_add(UC_HOOK_CODE, unicorn_hook_instruction)

    # Execute 1 instruction just to startup the forkserver.
    # NOTE: the instruction at START_ADDRESS is executed again by the full
    # run below, which also starts at START_ADDRESS.
    print("Starting the forkserver by executing 1 instruction")
    try:
        uc.emu_start(START_ADDRESS, 0, 0, count=1)
    except UcError as e:
        print("ERROR: Failed to execute a single instruction (error: {})!".format(e))
        return

    # Allocate a buffer and load a mutated packet and put it into the right spot
    if args.packet_file:
        print("Loading packet content from {}".format(args.packet_file))
        # Context manager guarantees the file is closed even if read() raises.
        with open(args.packet_file, 'rb') as packet_file:
            packet_content = packet_file.read()

        # Apply constraints to mutated input: the length is written to the
        # 8-bit BL register below, so it must fit in one byte.
        if len(packet_content) > 0xFF:
            print("Test packet is too long (> {} bytes)".format(0xFF))
            return

        # Allocate a new buffer and put the packet into it
        buf_addr = unicorn_heap.malloc(len(packet_content))
        uc.mem_write(buf_addr, packet_content)
        print("Allocated mutated packet buffer @ 0x{0:08x}".format(buf_addr))

        # Set the packet into the state so it will be handled:
        # ECX receives the buffer address and BL the packet length.
        uc.reg_write(UC_X86_REG_ECX, buf_addr)
        uc.reg_write(UC_X86_REG_BL, len(packet_content))

    # Run the test
    print("Executing from 0x{0:08x} to 0x{1:08x}".format(START_ADDRESS, END_ADDRESS))
    try:
        uc.emu_start(START_ADDRESS, END_ADDRESS, timeout=0, count=0)
    except UcError as e:
        print("Execution failed with error: {}".format(e))
        uc.dump_regs()
        uc.force_crash(e)

    print("Final register state:")
    uc.dump_regs()

    print("Done.")
# Run the harness only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment