Skip to content

Instantly share code, notes, and snippets.

@williballenthin
Last active November 5, 2023 22:12
Show Gist options
  • Star 17 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save williballenthin/1cb2512b726d3bbc955746f69eaed0da to your computer and use it in GitHub Desktop.
Save williballenthin/1cb2512b726d3bbc955746f69eaed0da to your computer and use it in GitHub Desktop.
Automatically resolve shellcode API hashes into symbolic names using emulation. Example: https://asciinema.org/a/EaHLv3yy7nGnh7mfHQ5DVy1LJ
import os
import sys
import logging
import pefile
import ucutils
import unicorn
import capstone
import argparse
import ucutils.emu
import ucutils.cli
import ucutils.plat.win32
logger = logging.getLogger('auto-shellcode-hashes')

# unicorn and capstone are separate projects that are versioned independently.
# neither the numeric values of their x86 register constants, nor the exact set
# of registers each one defines, is guaranteed to agree.
# so we build lookup tables, keyed by the shared constant *name*, that translate
# capstone <-> unicorn register constants.
U2C = {}
C2U = {}
# mapping from register constant to its lowercase string name (e.g. 'eax')
U2S = {}
C2S = {}
# names of every register that both libraries define
REGS = set([])

for const_name in dir(capstone.x86_const):
    if not const_name.startswith('X86_REG_'):
        continue

    # unicorn uses the same names as capstone, with a `UC_` prefix.
    uconst_name = 'UC_' + const_name
    uconst = getattr(unicorn.x86_const, uconst_name, None)
    if uconst is None:
        # capstone defines a register that the installed unicorn does not.
        # we could never read it from the emulator anyway, so leave it out
        # rather than failing with AttributeError while importing this module.
        continue

    cconst = getattr(capstone.x86_const, const_name)
    reg_name = const_name[len('X86_REG_'):].lower()

    U2C[uconst] = cconst
    C2U[cconst] = uconst
    U2S[uconst] = reg_name
    C2S[cconst] = reg_name
    REGS.add(reg_name)
def load(emu, sc_addr, sc, dlls):
    '''
    prepare the emulator's address space for running the shellcode.

    performs these steps, in order:
      1. map the shellcode bytes `sc` at `sc_addr`.
      2. map the stack region and point $sp/$bp into it.
      3. ask the platform layer to map the TEB
         (per the original author's notes, this covers TEB, PEB, and LDR_DATA).
      4. load each entry of `dlls` through the platform layer, read-only.

    returns `sc_addr` unchanged.
    '''
    stack_base = ucutils.STACK_ADDR

    logger.debug('mapping instructions at 0x%x', sc_addr)
    emu.mem.map_data(sc_addr, sc, reason='code')

    # how the stack region is carved up:
    #
    #   stack_base + 0x0000  <- lowest mapped address
    #   stack_base + 0x1000  <- $sp
    #   stack_base + 0x2000  <- $bp
    #   stack_base + 0x3000  <- highest address (per the original layout notes)
    logger.debug('mapping stack at 0x%x', stack_base)
    emu.mem.map_region(stack_base, ucutils.STACK_SIZE, reason='stack')
    emu.stack_pointer = stack_base + 0x1000
    emu.base_pointer = stack_base + 0x2000

    emu.plat.map_teb()

    # DLLs are never meant to be executed here, only read,
    # so they are mapped without execute permission.
    dll_perms = unicorn.UC_PROT_READ
    for module in dlls:
        emu.plat.load_dll(module, perms=dll_perms)

    return sc_addr
class SimpleCmpRegisterTaintTracker(ucutils.emu.Hook):
    '''
    hook emulation and search for instructions like:

        cmp REG, REG

    where one of REG has the given target value.
    tracks the other "tainted" value in `.tainted_values`.

    the target value itself is only recorded when *both* registers hold it.
    '''
    # invoked for each emulated instruction.
    HOOK_TYPE = unicorn.UC_HOOK_CODE

    def __init__(self, target):
        '''
        Args:
          target (int): the value to watch for in register comparisons.
        '''
        super(SimpleCmpRegisterTaintTracker, self).__init__()
        # look for comparisons against this target value.
        self.target = target
        # these are the values compared against our target.
        self.tainted_values = set([])

    def hook(self, emu, address, size, user_data):
        '''
        inspect the instruction about to execute at `address` (`size` bytes long)
        and, when it is a register/register `cmp` involving the target,
        remember the value it is being compared with.
        '''
        buf = emu.mem_read(address, size)
        # the disassembler yields nothing when the bytes do not decode;
        # in that case there is nothing to inspect.
        insn = next(emu.dis.disasm(bytes(buf), address), None)
        if insn is None:
            return

        if insn.mnemonic != 'cmp':
            return

        operands = insn.operands
        if len(operands) != 2:
            return
        op0, op1 = operands

        if op0.type != capstone.x86_const.X86_OP_REG:
            return
        if op1.type != capstone.x86_const.X86_OP_REG:
            return

        val0 = emu.reg_read(C2U[op0.reg])
        val1 = emu.reg_read(C2U[op1.reg])

        if val0 != self.target and val1 != self.target:
            return

        logger.debug('0x%x: tainted comparison: cmp %s=0x%x, %s=0x%x',
                     address,
                     C2S[op0.reg],
                     val0,
                     C2S[op1.reg],
                     val1)

        # record the value on the *other* side of the comparison.
        # when both sides equal the target, both branches add the target itself.
        if val0 == self.target:
            self.tainted_values.add(val1)
        if val1 == self.target:
            self.tainted_values.add(val0)
class SimpleCmpMemTaintTracker(ucutils.emu.Hook):
    '''
    hook memory reads and search for instructions like:

        cmp [mem], REG

    or:

        cmp REG, [mem]

    where [mem] has the given target value.
    tracks the value of REG in `.tainted_values`.

    comparisons that involve no register at all (e.g. `cmp [mem], imm`) are ignored.
    '''
    # invoked for each emulated memory read.
    HOOK_TYPE = unicorn.UC_HOOK_MEM_READ

    def __init__(self, target):
        '''
        Args:
          target (int): the value to watch for in memory operands.
        '''
        super(SimpleCmpMemTaintTracker, self).__init__()
        # look for comparisons against this target value.
        self.target = target
        # these are the values compared against our target.
        self.tainted_values = set([])

    def hook(self, emu, _, address, size, __, ___):
        '''
        inspect a memory read of `size` bytes at `address`.
        when a pointer-sized read yields the target value and the current
        instruction is a `cmp` with a register operand,
        remember that register's value.
        '''
        # the hash is assumed to be pointer-sized, so skip all other reads.
        if size != emu.ptr_size:
            return

        val = emu.arch.parse_ptr(emu, address)
        if val != self.target:
            return

        pc = emu.program_counter
        # 0x10 bytes is enough to hold any single x86 instruction (max 15 bytes).
        buf = emu.mem_read(pc, 0x10)
        # the disassembler yields nothing when the bytes do not decode.
        insn = next(emu.dis.disasm(bytes(buf), pc), None)
        if insn is None:
            return

        if insn.mnemonic != 'cmp':
            return

        # take the first register operand, checking op0 before op1.
        for operand in insn.operands:
            if operand.type == capstone.x86_const.X86_OP_REG:
                v = emu.reg_read(C2U[operand.reg])
                break
        else:
            # no register operand, e.g. `cmp [mem], imm`: nothing to record.
            return

        self.tainted_values.add(v)
        logger.info('%x: tainted value: %08x', pc, v)
def compute_available_exports(dlls):
    '''
    collect all the named exports available from the given DLLs loaded at their preferred addresses.

    DLLs without an export directory contribute nothing.
    exports that have no name (exported by ordinal only) are ignored.
    if several exports share an address, the one seen last wins.

    Args:
      dlls (List[Dict[str, any]]): list of dicts with keys:
        filename (str): filename of DLL.
        pe (pefile.PE): parsed DLL.

    Returns:
      Dict[int, str]: mapping from load address to symbol name.
    '''
    ret = {}
    for dll in dlls:
        pe = dll['pe']

        # pefile only sets this attribute when the image has an export directory.
        export_dir = getattr(pe, 'DIRECTORY_ENTRY_EXPORT', None)
        if export_dir is None:
            continue

        image_base = pe.OPTIONAL_HEADER.ImageBase
        for symbol in export_dir.symbols:
            if not symbol.name:
                # if exported by ordinal only, then ignore
                continue
            ret[image_base + symbol.address] = symbol.name.decode('ascii')
    return ret
def resolve_address(dlls, addr):
    '''
    resolve the name of the export that prefers to be loaded at the given address.

    DLLs without an export directory are skipped.
    an export without a name (exported by ordinal only) is never returned;
    the search continues past it.

    Args:
      dlls (List[Dict[str, any]]): list of dicts with keys:
        filename (str): filename of DLL.
        pe (pefile.PE): parsed DLL.
      addr (int): preferred virtual address of export.

    Returns:
      str: the name of the export.

    Raises:
      KeyError: if the export is not found.
    '''
    for dll in dlls:
        pe = dll['pe']

        # pefile only sets this attribute when the image has an export directory.
        export_dir = getattr(pe, 'DIRECTORY_ENTRY_EXPORT', None)
        if export_dir is None:
            continue

        image_base = pe.OPTIONAL_HEADER.ImageBase
        for symbol in export_dir.symbols:
            if addr != image_base + symbol.address:
                continue
            if not symbol.name:
                # ordinal-only export: there is no name to report,
                # so keep looking instead of failing on `None.decode`.
                continue
            return symbol.name.decode('ascii')
    raise KeyError(addr)
def _find_tainted_hashes(emu, resolver_addr, ret_addr, placeholder):
    '''
    emulate the resolver once with `placeholder` as its argument and
    return the set of values that were compared against the placeholder.
    '''
    emu.program_counter = resolver_addr

    reg_tracker = SimpleCmpRegisterTaintTracker(placeholder)
    mem_tracker = SimpleCmpMemTaintTracker(placeholder)

    with ucutils.emu.context(emu):
        # pretend we just CALL'd the resolver: arg0 is the placeholder hash.
        emu.push(placeholder)
        emu.push(ret_addr)

        with ucutils.emu.hook(emu, reg_tracker):
            with ucutils.emu.hook(emu, mem_tracker):
                try:
                    emu.go(ret_addr)  # ret from hash function
                except unicorn.UcError as e:
                    # an unmapped read is expected and silently ignored:
                    # metasploit will walk right off the end of the loaded modules list
                    # e.g. c24296214e969566e1cc36995eb184e5 at offset 0x15
                    #
                    #     resolve_function
                    # 00000006 60              PUSHAD
                    # 00000007 89 e5           MOV    EBP, ESP
                    # 00000009 31 c0           XOR    EAX, EAX
                    # 0000000b 64 8b 50 30     MOV    EDX, dword ptr FS :[EAX + 0x30]
                    # 0000000f 8b 52 0c        MOV    EDX, dword ptr [EDX + 0xc]
                    # 00000012 8b 52 14        MOV    EDX, dword ptr [EDX + 0x14]
                    # 00000015 8b 72 28        MOV    ESI, dword ptr [EDX + 0x28]  <<<<<<<<<<<
                    if e.errno != unicorn.UC_ERR_READ_UNMAPPED:
                        logger.warning('emulation error: %s, $pc: 0x%x', str(e), emu.program_counter)

    return reg_tracker.tainted_values | mem_tracker.tainted_values


def _emulate_resolver(emu, resolver_addr, ret_addr, candidate):
    '''
    emulate the resolver with `candidate` as its argument and
    return the pointer it produced, or None when emulation failed in an unexpected way.

    UcError raised outside of the emulation run itself (e.g. while pushing) propagates.
    '''
    with ucutils.emu.context(emu):
        # pretend we just CALL'd to the resolver routine with one argument: the potential API hash
        emu.push(candidate)
        emu.push(ret_addr)
        emu.program_counter = resolver_addr

        # there are two potential outcomes that we want to catch:
        #
        #   1. the routine returns the function pointer, so we'll pluck that from eax.
        #   2. the routine jumps directly to the resolved function, (this is what metasploit does).
        #
        # to handle (1), we'll definitely want to stop once the resolver function is complete.
        # so, we only emulate to the return value.
        # to handle (2), then we ensure all DLLs are mapped read-only (not executable).
        # this way, the emulator will except when fetching a non-executable instruction.
        # note: in practice, this is much faster than registering a hook (see `BreakOnConditionHook`).
        try:
            emu.go(ret_addr)
        except unicorn.UcError as e:
            if e.errno != unicorn.UC_ERR_FETCH_PROT:
                # something is wrong here
                return None
            # probably in case (2) ...or something is broken,
            # so the caller checks if we're at the start of an exported symbol.
            return emu.program_counter

        # probably in case (1),
        # so the function pointer is in the return value location (eax)
        return emu.eax


def extract_imports(emu, sc_addr, sc_len, dlls, resolver_offset):
    '''
    Emulate the shellcode at the given address with the given DLLs loaded
    to resolve API hashes.

    Assume the given function accepts a single argument: the pointer-sized hash to resolve.
    Assume the given function returns the resolved pointer in EAX.

    here's the strategy:

      1. emulate the resolver with a fake hash, monitoring for comparisons against the hash
      2. for each tainted hash value,
         re-run the resolver function, and see what function pointer is resolved.

    Args:
      emu (ucutils.emu.Emulator): Unicorn emulator instance.
      sc_addr (int): address of the start of the shellcode region.
      sc_len (int): the size of the shellcode buffer.
        currently unused; kept so existing callers keep working.
      dlls (List[Dict[str, any]]): list of dicts with keys:
        filename (str): filename of DLL.
        pe (pefile.PE): parsed DLL.
      resolver_offset (int): relative offset into shellcode region of resolver function.

    Returns:
      Dict[int, str]: mapping from hash to export name.
    '''
    TAINTED_VALUE = 0x69696969

    resolver_addr = sc_addr + resolver_offset
    # we just pick some place we know is mapped as code.
    # don't intend to actually execute here.
    ret_addr = sc_addr

    tainted_values = _find_tainted_hashes(emu, resolver_addr, ret_addr, TAINTED_VALUE)
    logger.info('identified %d tainted values', len(tainted_values))

    exports = compute_available_exports(dlls)
    logger.info('identified %d potential symbols', len(exports))

    imports = {}
    # sorted so that the order of attempts (and of the result) is stable.
    for candidate in sorted(tainted_values):
        logger.debug('attempting to resolve hash: %08x', candidate)

        try:
            pfunc = _emulate_resolver(emu, resolver_addr, ret_addr, candidate)
        except unicorn.UcError as e:
            logger.warning('emulation error: %s, failed to resolve hash: %08x', str(e), candidate)
            continue

        # `pfunc` is None when emulation failed; None is never an export address.
        fname = exports.get(pfunc)
        if fname is None:
            logger.info('failed to resolve hash %08x', candidate)
            continue

        logger.info('resolved %08x to function 0x%08x (%s)', candidate, pfunc, fname)
        imports[candidate] = fname

    return imports
def main(argv=None):
    '''
    command line entry point: resolve the API hashes used by a shellcode file
    and print one `hash: name` line per resolved import.

    Args:
      argv (List[str]): command line arguments, without the program name.
        defaults to `sys.argv[1:]`.

    Returns:
      int: process exit code; 0 on success.

    Raises:
      SystemExit: via argparse, when the arguments are invalid or
        the resolver offset does not fall inside the shellcode.
    '''
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Automatically extract shellcode hash resolutions.")
    parser.add_argument("input", type=str,
                        help="Path to input file")
    # the offset is always parsed as hexadecimal, with or without a `0x` prefix.
    parser.add_argument("resolver_offset", type=lambda s: int(s, 0x10),
                        help="Relative offset to resolver function")
    parser.add_argument("dlls", type=str, nargs='+',
                        help="Paths to DLL files to map")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable debug logging")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disable all output but errors")
    args = parser.parse_args(args=argv)

    # --verbose wins over --quiet when both are given.
    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.ERROR
    else:
        level = logging.INFO
    # basicConfig does nothing when the root logger already has handlers,
    # so also set the level explicitly.
    logging.basicConfig(level=level)
    logging.getLogger().setLevel(level)

    with open(args.input, 'rb') as f:
        sc = f.read()

    # reject an empty file or an offset that points outside of the shellcode
    # before doing any expensive work; emulating from there cannot succeed.
    if not 0 <= args.resolver_offset < len(sc):
        parser.error('resolver offset 0x%x is outside of the shellcode (0x%x bytes)'
                     % (args.resolver_offset, len(sc)))

    dlls = []
    for dllpath in args.dlls:
        pe = pefile.PE(dllpath)
        dlls.append({
            'filename': os.path.basename(dllpath),
            'path': dllpath,
            'pe': pe
        })

    emu = ucutils.emu.Emulator(unicorn.UC_ARCH_X86, unicorn.UC_MODE_32, plat=ucutils.plat.win32)
    load(emu, ucutils.CODE_ADDR, sc, dlls)

    imports = extract_imports(emu, ucutils.CODE_ADDR, len(sc), dlls, args.resolver_offset)
    for api_hash, func in imports.items():
        print('%08x: %s' % (api_hash, func))

    return 0
# when run as a script, use main()'s return value as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment