Skip to content

Instantly share code, notes, and snippets.

@deeso
Created November 18, 2017 15:42
Show Gist options
  • Save deeso/dca3045c54d18198c4d38abbdad5ce6f to your computer and use it in GitHub Desktop.
Save deeso/dca3045c54d18198c4d38abbdad5ce6f to your computer and use it in GitHub Desktop.
Simple Immunity Debugger BP Hook
# Simple breakpoint hooks for Immunity Debugger.
# Important: all data related to BP hooking and handling must be
# re-entrant, meaning extraneous global values in Python classes and
# the script environment are bad. Lesson learned.
# To use this script, put it in the PyCommands directory of Immunity
# Debugger and run:
# !simplebphook <hex_base_address_of_exe>
#
#
# To (easily) get the BP_BASE_ADDRS, set breakpoints on all
# the references to DLL calls of interest. Then copy the entire
# table of BPs, extract the addresses, then subtract the relocated
# base address from the addresses.
import time
import immlib
from libhook import BpHook
# from Libs import immlib
# Module-level call markers, all starting at zero.
WPM_CALLS = 0
WF_CALLS = 0
WC_CALLS = 0

# Lower-cased "<module>.<function>" names; these are the keys used to pair
# a breakpoint offset with its handler.
k32wcw = "KERNEL32.WriteConsoleW".lower()
k32wf = "KERNEL32.WriteFile".lower()
k32cfw = "KERNEL32.CreateFileW".lower()
k32lla = "KERNEL32.LoadLibraryA".lower()
k32llw = "KERNEL32.LoadLibraryW".lower()
k32llexw = "KERNEL32.LoadLibraryExW".lower()
k32gclia = "KERNEL32.GetCommandLineA".lower()
k32gcliw = "KERNEL32.GetCommandLineW".lower()
k32cpa = "KERNEL32.CreateProcessA".lower()
k32cpw = "KERNEL32.CreateProcessW".lower()
k32wpm = "KERNEL32.WriteProcessMemory".lower()
k32crt = "KERNEL32.CreateRemoteThread".lower()

# (offset from the image base, API name) for every call site to hook.
# The offsets are rebased against the address given on the command line.
BP_BASE_ADDRS = [
    (0x00016DA, k32llw),
    (0x00016EB, k32llw),
    (0x00016FC, k32llw),
    (0x000170D, k32llw),
    (0x000176A, k32cpa),
    (0x000181F, k32wpm),
    (0x00018BC, k32wpm),
    (0x0001934, k32wpm),
    (0x000196C, k32wpm),
    (0x00019A2, k32wpm),
    (0x00019D8, k32wpm),
    (0x0001A0E, k32wpm),
    (0x0001A44, k32wpm),
    (0x0001AD0, k32wpm),
    (0x0001B62, k32wpm),
    (0x0001B99, k32crt),
    (0x0001C53, k32wpm),
    (0x0001CE5, k32wpm),
    (0x0001D70, k32wpm),
    (0x0001DC9, k32crt),
    (0x0001E12, k32crt),
    (0x0001E5C, k32crt),
    (0x000519D, k32llexw),
    (0x00051DF, k32llexw),
    (0x00078AB, k32gclia),
    (0x00078B6, k32gcliw),
    (0x000A34A, k32llexw),
    (0x000A38C, k32llexw),
    # Disabled call sites, kept for reference:
    # (0x000F774, 'KERNEL32.FlushFileBuffers'.lower()),
    # (0x000F96F, 'KERNEL32.WriteFile'.lower()),
    # (0x000F9A9, 'KERNEL32.WriteFile'.lower()),
    # (0x000FB84, 'KERNEL32.WriteFile'.lower()),
    # (0x000FC72, 'KERNEL32.WriteFile'.lower()),
    # (0x000FD97, 'KERNEL32.WriteFile'.lower()),
    # (0x0010046, 'KERNEL32.WriteFile'.lower()),
    (0x0011B24, k32cfw),
    (0x0011B7D, k32wcw),
    (0x0011BAE, k32wcw),
]
def read_arg_astr(dbg, regs, arg=0, sz=4):
    """Return the string referenced by a pointer stored in a stack slot.

    dbg  -- object exposing readLong(address) and readString(address)
    regs -- register mapping; only regs['ESP'] is read
    arg  -- zero-based slot index counted from ESP
    sz   -- slot width in bytes used to compute the slot offset

    The slot at ESP + arg * sz is read as a pointer and the result of
    dbg.readString() on that pointer is returned unchanged.
    """
    slot = regs['ESP'] + arg * sz
    pointer = dbg.readLong(slot)
    return dbg.readString(pointer)
def read_arg_wstr(dbg, regs, arg=0, sz=4):
    """Return the wide string referenced by a pointer stored in a stack slot.

    dbg  -- object exposing readLong(address) and readWString(address)
    regs -- register mapping; only regs['ESP'] is read
    arg  -- zero-based slot index counted from ESP
    sz   -- slot width in bytes used to compute the slot offset

    Every NUL character is removed from the value returned by
    dbg.readWString() before it is handed back.
    """
    slot = regs['ESP'] + arg * sz
    pointer = dbg.readLong(slot)
    raw = dbg.readWString(pointer)
    return raw.replace('\x00', '')
def read_arg(dbg, regs, arg=0, sz=4):
    """Return the raw value of one stack slot relative to ESP.

    dbg  -- object exposing readLong(address)
    regs -- register mapping; only regs['ESP'] is read
    arg  -- zero-based slot index counted from ESP
    sz   -- slot width in bytes used to compute the slot offset
    """
    offset = arg * sz
    return dbg.readLong(regs['ESP'] + offset)
def handle_create_remote_thread(dbg, regs, name):
    """Log the process handle, stack size and start address of a hooked call.

    dbg  -- debugger object used for memory reads and logging
    regs -- register mapping; regs['ESP'] and regs['EIP'] are read
    name -- API name included in the log line
    """
    # Slots 0, 2 and 3 are read; slot 1 is not used.
    hprocess, stack_sz, start = [read_arg(dbg, regs, slot) for slot in (0, 2, 3)]
    eip = regs['EIP']
    message = "0x%08x %s hProcess=0x%08x StackSz=0x%08x ProcAddr=0x%08x" % (
        eip, name, hprocess, stack_sz, start)
    dbg.log(message, eip)
def handle_nop(dbg, regs, name):
    """Log that the named API is being called; no arguments are inspected.

    dbg  -- object exposing log(message, address)
    regs -- register mapping; only regs['EIP'] is read
    name -- API name included in the log line
    """
    eip = regs['EIP']
    message = "0x%08x calling %s " % (eip, name)
    dbg.log(message, eip)
def handle_write_process_memory(dbg, regs, name):
    """Log a WriteProcessMemory call and dump the source buffer to disk.

    dbg  -- debugger object used for memory reads and logging
    regs -- register mapping; regs['ESP'] and regs['EIP'] are read
    name -- API name included in the log line

    Stack slots 0..3 are read as the process handle, the destination
    address, the source buffer address and the byte count. The source
    buffer is copied to C:\\writeprocmemory.<ms>-F-<buf>-T-<address>.
    """
    # Millisecond timestamp keeps dump names distinct between hits. It is a
    # plain local so the handler does not touch module-level state.
    stamp = time.time() * 1000
    hprocess = read_arg(dbg, regs, 0)
    address = read_arg(dbg, regs, 1)
    buf = read_arg(dbg, regs, 2)
    bytestowrite = read_arg(dbg, regs, 3)
    dbg.log("0x%08x %s hProcess=0x%08x ProcAddr=0x%08x, BufAddr=0x%08x, NumBytes=0x%08x" % (
        regs['EIP'], name, hprocess, address, buf, bytestowrite), regs['EIP'])
    data = dbg.readMemory(buf, bytestowrite)
    outfile = "C:\\writeprocmemory.%d-F-0x%08x-T-0x%08x" % (stamp, buf, address)
    # Close the dump deterministically instead of relying on garbage collection.
    with open(outfile, 'wb') as dump:
        dump.write(data)
def handle_write_file(dbg, regs, name):
    """Log a WriteFile call and dump the buffer being written to disk.

    dbg  -- debugger object used for memory reads and logging
    regs -- register mapping; regs['ESP'] and regs['EIP'] are read
    name -- API name included in the log line

    Stack slots 0..2 are read as the file handle, the buffer address and
    the byte count. The buffer is copied to
    C:\\writefile.<ms>-F-<buf>-T-<handle>.
    """
    # Millisecond timestamp for the dump name. The file name previously used
    # the module-level WPM_CALLS value rather than this timestamp, so
    # repeated writes from the same buffer/handle overwrote one another.
    stamp = time.time() * 1000
    hfile = read_arg(dbg, regs, 0)
    buf = read_arg(dbg, regs, 1)
    bytestowrite = read_arg(dbg, regs, 2)
    dbg.log("0x%08x %s hFile=0x%08x BufAddr=0x%08x, NumBytes=0x%08x" % (
        regs['EIP'], name, hfile, buf, bytestowrite), regs['EIP'])
    data = dbg.readMemory(buf, bytestowrite)
    outfile = "C:\\writefile.%d-F-0x%08x-T-0x%08x" % (stamp, buf, hfile)
    # Close the dump deterministically instead of relying on garbage collection.
    with open(outfile, 'wb') as dump:
        dump.write(data)
def handle_write_console(dbg, regs, name):
    """Log a WriteConsoleW call and dump the text buffer to disk.

    dbg  -- debugger object used for memory reads and logging
    regs -- register mapping; regs['ESP'] and regs['EIP'] are read
    name -- API name included in the log line

    Stack slots 0..2 are read as the console handle, the buffer address
    and the number of characters to write. The buffer is copied to
    C:\\writeconsole.<ms>-F-<buf>-T-<handle>.
    """
    # Millisecond timestamp for the dump name. The file name previously used
    # the module-level WPM_CALLS value rather than this timestamp, so
    # repeated writes from the same buffer/handle overwrote one another.
    stamp = time.time() * 1000
    hconsole = read_arg(dbg, regs, 0)
    buf = read_arg(dbg, regs, 1)
    nchars = read_arg(dbg, regs, 2)
    # The log label is left as it was; the value is the raw third argument.
    dbg.log("0x%08x %s hConsole=0x%08x BufAddr=0x%08x, NumBytes=0x%08x" % (
        regs['EIP'], name, hconsole, buf, nchars), regs['EIP'])
    # WriteConsoleW takes a count of characters, and each UTF-16 code unit
    # occupies two bytes, so the buffer is twice as long as the count.
    data = dbg.readMemory(buf, nchars * 2)
    outfile = "C:\\writeconsole.%d-F-0x%08x-T-0x%08x" % (stamp, buf, hconsole)
    # Close the dump deterministically instead of relying on garbage collection.
    with open(outfile, 'wb') as dump:
        dump.write(data)
def handle_read_first_astr(dbg, regs, name):
    """Log the string referenced by the first stack slot of a hooked call.

    dbg  -- debugger object used for memory reads and logging
    regs -- register mapping; regs['ESP'] and regs['EIP'] are read
    name -- API name included in the log line
    """
    text = read_arg_astr(dbg, regs)
    eip = regs['EIP']
    dbg.log("0x%08x %s Param=%s" % (eip, name, text), eip)
def handle_read_first_wstr(dbg, regs, name):
    """Log the wide string referenced by the first stack slot of a hooked call.

    dbg  -- debugger object used for memory reads and logging
    regs -- register mapping; regs['ESP'] and regs['EIP'] are read
    name -- API name included in the log line
    """
    text = read_arg_wstr(dbg, regs)
    eip = regs['EIP']
    dbg.log("0x%08x %s Param=%s" % (eip, name, text), eip)
# Dispatch table: lower-cased API name -> function invoked when a
# breakpoint on a call to that API is hit.
HANDLERS = {
    # Calls whose first argument is logged as a wide string.
    k32cfw: handle_read_first_wstr,
    k32llw: handle_read_first_wstr,
    k32llexw: handle_read_first_wstr,
    k32cpw: handle_read_first_wstr,
    # Calls whose first argument is logged as a byte string.
    k32lla: handle_read_first_astr,
    k32cpa: handle_read_first_astr,
    # Calls that are only announced in the log.
    k32gclia: handle_nop,
    k32gcliw: handle_nop,
    # Calls whose buffer is dumped to disk.
    k32wcw: handle_write_console,
    k32wf: handle_write_file,
    k32wpm: handle_write_process_memory,
    # Remote thread creation.
    k32crt: handle_create_remote_thread,
}
class SimpleBPHook(BpHook):
    """Breakpoint hook that forwards every hit to a plain handler function.

    One instance covers all call sites of a single API: the same handler
    and name are used for every address registered through it.
    """

    def __init__(self, handler, name, addresses=None):
        """Register a breakpoint hook on each address.

        handler   -- callable invoked as handler(dbg, regs, name) on a hit
        name      -- API name, used in hook descriptions and passed to handler
        addresses -- iterable of absolute addresses to hook; None or empty
                     registers nothing
        """
        BpHook.__init__(self)
        dbg = immlib.Debugger()
        # State is assigned before any add() call.
        # NOTE(review): presumably add() snapshots the hook object - confirm
        # against libhook before reordering.
        self.handler = handler
        self.name = name
        for addr in addresses or ():
            desc = 'call %s@%08x' % (name, addr)
            dbg.log("Adding 0x%08x: %s" % (addr, desc), 0)
            self.add(desc, address=addr, force=1)
        # NOTE(review): the original indentation was lost, so it is unclear
        # whether enable() ran once per address or once after the loop; it is
        # called once here - confirm.
        self.enable()

    def run(self, regs):
        """Delegate to main(); presumably the callback libhook invokes on a hit."""
        self.main(regs)

    def main(self, regs):
        """Log the hit address and pass control to the configured handler."""
        ip = regs['EIP']
        # A fresh Debugger is created per hit rather than stored on the hook.
        dbg = immlib.Debugger()
        dbg.log("BP Hit handling address 0x%08x" % (ip), ip)
        self.handler(dbg, regs, self.name)
def main(args):
    """PyCommand entry point: install the breakpoint hooks.

    args -- command arguments; exactly one is expected, the image base
            address. A value starting with 0x/0X is parsed as hexadecimal;
            a value made only of digits is parsed as decimal.

    Returns "Completed hook registration" on success and
    "failed to create hooks" when the argument is missing or malformed.
    """
    # Log through a Debugger instance, as every other log call in this file
    # does, rather than through the immlib module itself.
    dbg = immlib.Debugger()
    if len(args) != 1:
        dbg.log("A base offset is required")
        return "failed to create hooks"

    text = args[0]
    try:
        if text.lower().startswith('0x'):
            base = long(text, 16)
        elif text.isdigit():
            base = long(text)
        else:
            raise ValueError(text)
    except ValueError:
        # Covers both an unrecognised format and a malformed hex value
        # such as "0xZZ", which previously raised out of the command.
        dbg.log("Invalid base offset %r: expected 0x-prefixed hex or decimal" % text)
        return "failed to create hooks"

    # Map each API name to the rebased addresses of its call sites.
    buckets = {fn_call: [] for fn_call in HANDLERS}
    for offset, fn_call in BP_BASE_ADDRS:
        if fn_call not in buckets:
            # A table entry without a handler is skipped instead of
            # aborting the whole registration with a KeyError.
            dbg.log("No handler registered for %s, skipping offset 0x%08x" % (fn_call, offset))
            continue
        buckets[fn_call].append(long(offset + base))

    # Create one hook per API (e.g. kernel32.loadlibraryw) that has at least
    # one call site to watch.
    for fn_call, addrs in buckets.items():
        if not addrs:
            continue
        SimpleBPHook(HANDLERS[fn_call], fn_call, addresses=addrs)
    return "Completed hook registration"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment