Created
November 18, 2017 15:42
-
-
Save deeso/dca3045c54d18198c4d38abbdad5ce6f to your computer and use it in GitHub Desktop.
Simple Immunity Debugger BP Hook
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Simple breakpoint hooks for Immunity Debugger.
#
# Important: all data related to BP hooking and handling must be re-entrant,
# meaning extraneous global values in Python classes and in the script
# environment are bad (lesson learned).
#
# To use this script, put it in the PyCommands directory of Immunity Debugger
# and run:
#     !simplebphook <hex_base_address_of_exe>
# Write the base address with a 0x prefix (e.g. 0x00400000); an argument made
# only of digits is read as a decimal number.
#
# To (easily) get the BP_BASE_ADDRS, set breakpoints on all the references to
# the DLL calls of interest. Then copy the entire table of BPs, extract the
# addresses, and subtract the relocated base address from each address.
import time | |
import immlib | |
from libhook import BpHook | |
# from Libs import immlib | |
# Module-level values named after the WriteProcessMemory, WriteFile and
# WriteConsole handlers; all three start at zero.
WPM_CALLS = WF_CALLS = WC_CALLS = 0
# (offset, lower-cased "module.function") pairs, one per call site to hook.
# Each offset is relative to the image base; the base address is added to
# every offset before the breakpoints are set.
BP_BASE_ADDRS = [
    # --- library loading ---
    (0x016DA, 'KERNEL32.LoadLibraryW'.lower()),
    (0x016EB, 'KERNEL32.LoadLibraryW'.lower()),
    (0x016FC, 'KERNEL32.LoadLibraryW'.lower()),
    (0x0170D, 'KERNEL32.LoadLibraryW'.lower()),
    # --- process creation ---
    (0x0176A, 'KERNEL32.CreateProcessA'.lower()),
    # --- remote memory writes / remote threads ---
    (0x0181F, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x018BC, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x01934, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x0196C, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x019A2, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x019D8, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x01A0E, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x01A44, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x01AD0, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x01B62, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x01B99, 'KERNEL32.CreateRemoteThread'.lower()),
    (0x01C53, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x01CE5, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x01D70, 'KERNEL32.WriteProcessMemory'.lower()),
    (0x01DC9, 'KERNEL32.CreateRemoteThread'.lower()),
    (0x01E12, 'KERNEL32.CreateRemoteThread'.lower()),
    (0x01E5C, 'KERNEL32.CreateRemoteThread'.lower()),
    # --- extended library loading / command line ---
    (0x0519D, 'KERNEL32.LoadLibraryExW'.lower()),
    (0x051DF, 'KERNEL32.LoadLibraryExW'.lower()),
    (0x078AB, 'KERNEL32.GetCommandLineA'.lower()),
    (0x078B6, 'KERNEL32.GetCommandLineW'.lower()),
    (0x0A34A, 'KERNEL32.LoadLibraryExW'.lower()),
    (0x0A38C, 'KERNEL32.LoadLibraryExW'.lower()),
    # --- file output call sites, currently disabled ---
    # (0x0F774, 'KERNEL32.FlushFileBuffers'.lower()),
    # (0x0F96F, 'KERNEL32.WriteFile'.lower()),
    # (0x0F9A9, 'KERNEL32.WriteFile'.lower()),
    # (0x0FB84, 'KERNEL32.WriteFile'.lower()),
    # (0x0FC72, 'KERNEL32.WriteFile'.lower()),
    # (0x0FD97, 'KERNEL32.WriteFile'.lower()),
    # (0x10046, 'KERNEL32.WriteFile'.lower()),
    # --- file creation / console output ---
    (0x11B24, 'KERNEL32.CreateFileW'.lower()),
    (0x11B7D, 'KERNEL32.WriteConsoleW'.lower()),
    (0x11BAE, 'KERNEL32.WriteConsoleW'.lower()),
]
# Lower-cased "module.function" names used as lookup keys for the hooked calls.

# Library loading
k32lla = "KERNEL32.LoadLibraryA".lower()
k32llw = "KERNEL32.LoadLibraryW".lower()
k32llexw = "KERNEL32.LoadLibraryExW".lower()

# Command line and process creation
k32gclia = "KERNEL32.GetCommandLineA".lower()
k32gcliw = "KERNEL32.GetCommandLineW".lower()
k32cpa = "KERNEL32.CreateProcessA".lower()
k32cpw = "KERNEL32.CreateProcessW".lower()

# File and console output
k32cfw = "KERNEL32.CreateFileW".lower()
k32wf = "KERNEL32.WriteFile".lower()
k32wcw = "KERNEL32.WriteConsoleW".lower()

# Remote process manipulation
k32wpm = "KERNEL32.WriteProcessMemory".lower()
k32crt = "KERNEL32.CreateRemoteThread".lower()
def read_arg_astr(dbg, regs, arg=0, sz=4):
    """Return the string pointed to by stack slot number ``arg``.

    The slot at ``regs['ESP'] + arg * sz`` is read with ``dbg.readLong`` and
    the value found there is passed to ``dbg.readString`` as the address of
    the string.
    """
    slot_addr = regs['ESP'] + arg * sz
    string_ptr = dbg.readLong(slot_addr)
    return dbg.readString(string_ptr)
def read_arg_wstr(dbg, regs, arg=0, sz=4):
    """Return the wide string pointed to by stack slot number ``arg``.

    The slot at ``regs['ESP'] + arg * sz`` is read with ``dbg.readLong``, the
    value is handed to ``dbg.readWString``, and every NUL character is
    removed from the result before it is returned.
    """
    slot_addr = regs['ESP'] + arg * sz
    string_ptr = dbg.readLong(slot_addr)
    raw_text = dbg.readWString(string_ptr)
    return raw_text.replace('\x00', '')
def read_arg(dbg, regs, arg=0, sz=4):
    """Return the value ``dbg.readLong`` yields for stack slot number ``arg``.

    Slots are ``sz`` bytes apart, starting at ``regs['ESP']``.
    """
    slot_addr = regs['ESP'] + arg * sz
    return dbg.readLong(slot_addr)
def handle_create_remote_thread(dbg, regs, name):
    """Log stack arguments 0, 2 and 3 of a hooked CreateRemoteThread call.

    They are reported as the process handle, the stack size and the thread
    start address; the log entry is tagged with the current EIP.
    """
    eip = regs['EIP']
    hprocess = read_arg(dbg, regs, 0)
    stack_sz = read_arg(dbg, regs, 2)
    start = read_arg(dbg, regs, 3)
    message = "0x%08x %s hProcess=0x%08x StackSz=0x%08x ProcAddr=0x%08x" % (
        eip, name, hprocess, stack_sz, start)
    dbg.log(message, eip)
def handle_nop(dbg, regs, name):
    """Log that ``name`` is being called at the current EIP; read no arguments."""
    eip = regs['EIP']
    message = "0x%08x calling %s " % (eip, name)
    dbg.log(message, eip)
def handle_write_process_memory(dbg, regs, name):
    """Log a hooked WriteProcessMemory call and dump its source buffer to disk.

    Stack arguments 0-3 are read as the process handle, the destination
    address, the source buffer address and the byte count. The buffer is
    read with ``dbg.readMemory`` and written to
    ``C:\\writeprocmemory.<ms timestamp>-F-<buffer>-T-<destination>``.
    """
    # Millisecond timestamp that keeps dump names apart. It is a plain local
    # so it does not shadow the module-level WPM_CALLS name.
    stamp_ms = time.time() * 1000
    hprocess = read_arg(dbg, regs, 0)
    address = read_arg(dbg, regs, 1)
    buf = read_arg(dbg, regs, 2)
    bytestowrite = read_arg(dbg, regs, 3)
    dbg.log("0x%08x %s hProcess=0x%08x ProcAddr=0x%08x, BufAddr=0x%08x, NumBytes=0x%08x" % (
        regs['EIP'], name, hprocess, address, buf, bytestowrite), regs['EIP'])
    data = dbg.readMemory(buf, bytestowrite)
    outfile = "C:\\writeprocmemory.%d-F-0x%08x-T-0x%08x" % (stamp_ms, buf, address)
    # Close the dump deterministically instead of leaking the file handle.
    with open(outfile, 'wb') as dump:
        dump.write(data)
def handle_write_file(dbg, regs, name):
    """Log a hooked WriteFile call and dump the buffer being written.

    Stack arguments 0-2 are read as the file handle, the buffer address and
    the byte count. The buffer is read with ``dbg.readMemory`` and written to
    ``C:\\writefile.<ms timestamp>-F-<buffer>-T-<file handle>``.
    """
    # Millisecond timestamp used in the dump name. The original computed it
    # but then formatted the name with the module-level WPM_CALLS (always 0),
    # so dumps with the same buffer and handle overwrote each other.
    stamp_ms = time.time() * 1000
    hfile = read_arg(dbg, regs, 0)
    buf = read_arg(dbg, regs, 1)
    bytestowrite = read_arg(dbg, regs, 2)
    dbg.log("0x%08x %s hFile=0x%08x BufAddr=0x%08x, NumBytes=0x%08x" % (
        regs['EIP'], name, hfile, buf, bytestowrite), regs['EIP'])
    data = dbg.readMemory(buf, bytestowrite)
    outfile = "C:\\writefile.%d-F-0x%08x-T-0x%08x" % (stamp_ms, buf, hfile)
    # Close the dump deterministically instead of leaking the file handle.
    with open(outfile, 'wb') as dump:
        dump.write(data)
def handle_write_console(dbg, regs, name):
    """Log a hooked WriteConsoleW call and dump the text buffer being written.

    Stack arguments 0-2 are read as the console handle, the buffer address
    and the character count. The buffer is read with ``dbg.readMemory`` and
    written to ``C:\\writeconsole.<ms timestamp>-F-<buffer>-T-<console handle>``.
    """
    # Millisecond timestamp used in the dump name. The original computed it
    # but then formatted the name with the module-level WPM_CALLS (always 0),
    # so dumps with the same buffer and handle overwrote each other.
    stamp_ms = time.time() * 1000
    hconsole = read_arg(dbg, regs, 0)
    buf = read_arg(dbg, regs, 1)
    # WriteConsoleW takes a count of UTF-16 characters, not bytes, so the
    # buffer spans two bytes per character.
    charstowrite = read_arg(dbg, regs, 2)
    dbg.log("0x%08x %s hConsole=0x%08x BufAddr=0x%08x, NumChars=0x%08x" % (
        regs['EIP'], name, hconsole, buf, charstowrite), regs['EIP'])
    data = dbg.readMemory(buf, charstowrite * 2)
    outfile = "C:\\writeconsole.%d-F-0x%08x-T-0x%08x" % (stamp_ms, buf, hconsole)
    # Close the dump deterministically instead of leaking the file handle.
    with open(outfile, 'wb') as dump:
        dump.write(data)
def handle_read_first_astr(dbg, regs, name):
    """Log the string that the first stack argument of the hooked call points to."""
    eip = regs['EIP']
    param = read_arg_astr(dbg, regs)
    message = "0x%08x %s Param=%s" % (eip, name, param)
    dbg.log(message, eip)
def handle_read_first_wstr(dbg, regs, name):
    """Log the wide string that the first stack argument of the hooked call points to."""
    eip = regs['EIP']
    param = read_arg_wstr(dbg, regs)
    message = "0x%08x %s Param=%s" % (eip, name, param)
    dbg.log(message, eip)
# Dispatch table: lower-cased "module.function" name -> callable invoked as
# handler(dbg, regs, name) when a breakpoint for that function is hit.
HANDLERS = {
    # handlers that dump the written buffer to disk
    k32wcw: handle_write_console,
    k32wf: handle_write_file,
    # handlers that log the first argument as a string, or just the call
    k32cfw: handle_read_first_wstr,
    k32lla: handle_read_first_astr,
    k32gclia: handle_nop,
    k32cpa: handle_read_first_astr,
    k32llw: handle_read_first_wstr,
    k32llexw: handle_read_first_wstr,
    k32gcliw: handle_nop,
    k32cpw: handle_read_first_wstr,
    # remote process manipulation
    k32wpm: handle_write_process_memory,
    k32crt: handle_create_remote_thread,
}
class SimpleBPHook(BpHook):
    """Breakpoint hook that forwards every hit to a single handler callable.

    ``handler`` is called as ``handler(dbg, regs, name)`` with a fresh
    ``immlib.Debugger`` instance, the register mapping passed to the hook and
    the function name given at construction time.
    """

    def __init__(self, handler, name, addresses=None):
        """Attach the hook to every address in ``addresses`` and enable it.

        ``addresses`` defaults to no addresses. ``None`` replaces the former
        mutable ``[]`` default so that no list object is shared between
        calls.
        """
        # Explicit base-class call, kept exactly as the original wrote it.
        BpHook.__init__(self)
        dbg = immlib.Debugger()
        self.handler = handler
        self.name = name
        for addr in addresses or ():
            desc = 'call %s@%08x' % (name, addr)
            dbg.log("Adding 0x%08x: %s" % (addr, desc), 0)
            self.add(desc, address=addr, force=1)
        self.enable()

    def run(self, regs):
        """Delegate to ``main``.

        NOTE(review): presumably the entry point the hook framework calls on
        a breakpoint hit - confirm against libhook.
        """
        self.main(regs)

    def main(self, regs):
        """Log the hit address and pass the registers to the handler."""
        ip = regs['EIP']
        # A new Debugger object is created on every hit rather than cached
        # on the instance.
        dbg = immlib.Debugger()
        dbg.log("BP Hit handling address 0x%08x" % (ip), ip)
        self.handler(dbg, regs, self.name)
def main(args):
    """PyCommand entry point: rebase the call-site offsets and register hooks.

    ``args`` must contain exactly one string, the base address. It is parsed
    as hexadecimal when it starts with ``0x``/``0X`` and as decimal when it
    consists only of digits. Anything else, including a malformed hex string
    such as ``0x`` or ``0xZZ``, is reported as an error.

    Returns a status string: "Completed hook registration" on success,
    "failed to create hooks" otherwise.

    Raises ``KeyError`` if BP_BASE_ADDRS names a function that has no entry
    in HANDLERS.
    """
    if len(args) != 1:
        # NOTE(review): assumes immlib exposes a module-level log(), as the
        # original code did - confirm against the immlib API.
        immlib.log("A base offset is required")
        return "failed to create hooks"

    text = args[0]
    try:
        if text.lower().startswith('0x'):
            base = long(text, 16)
        elif text.isdigit():
            base = long(text)
        else:
            raise ValueError(text)
    except ValueError:
        # Covers both an unrecognised format and a malformed hex string,
        # which previously escaped as an uncaught ValueError.
        immlib.Error("A base offset is required")
        return "failed to create hooks"

    # Map each handled function name to the rebased addresses of its call
    # sites.
    handlers = dict((fn_call, []) for fn_call in HANDLERS)
    for offset, fn_call in BP_BASE_ADDRS:
        handlers[fn_call].append(long(offset + base))

    # Create one hook per function call (e.g. kernel32.loadlibrarya).
    hooks = []
    for fn_call, addrs in handlers.items():
        if not addrs:
            # No call sites listed for this function, so a hook for it would
            # have no breakpoints attached.
            continue
        hooks.append(SimpleBPHook(HANDLERS[fn_call], fn_call, addresses=addrs))
    return "Completed hook registration"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment