-
-
Save CaledoniaProject/49c1d06b5bc5fd09fc03047e528d2910 to your computer and use it in GitHub Desktop.
Automatically resolve shellcode API hashes into symbolic names using emulation. Example: https://asciinema.org/a/uxzaceQ20DFYLJ0APL8sDuh0U
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import logging | |
import pefile | |
import ucutils | |
import unicorn | |
import capstone | |
import argparse | |
import ucutils.emu | |
import ucutils.cli | |
import ucutils.plat.win32 | |
import ucutils.plat.win64 | |
logger = logging.getLogger(__name__)


# unicorn and capstone are separate projects.
# i'm not sure that the register mappings are guaranteed to be consistent.
# so we build a mapping that translates capstone <-> unicorn register constants,
# plus a mapping from each constant to its lowercase register name.
U2C = {}  # unicorn register constant  -> capstone register constant
C2U = {}  # capstone register constant -> unicorn register constant
U2S = {}  # unicorn register constant  -> register name, e.g. 'eax'
C2S = {}  # capstone register constant -> register name, e.g. 'eax'
REGS = set()  # names of all registers present in the tables above

for const_name in dir(capstone.x86_const):
    if not const_name.startswith('X86_REG_'):
        continue

    uconst_name = 'UC_' + const_name
    reg_name = const_name[len('X86_REG_'):].lower()

    # the two libraries are versioned independently, so capstone may name a
    # register that the installed unicorn has no constant for. skip such
    # registers instead of failing with AttributeError at import time.
    # compare against None explicitly: 0 is a legitimate constant value.
    uconst = getattr(unicorn.x86_const, uconst_name, None)
    if uconst is None:
        continue
    cconst = getattr(capstone.x86_const, const_name)

    U2C[uconst] = cconst
    C2U[cconst] = uconst
    U2S[uconst] = reg_name
    C2S[cconst] = reg_name
    REGS.add(reg_name)
def load(emu, sc_addr, sc, dlls):
    '''
    populate the emulator's address space so the shellcode can be run.

    maps the following, in this order:
      - the shellcode bytes, at `sc_addr`
      - the stack
      - the TEB (the original notes also list PEB and LDR_DATA here;
        presumably `emu.plat.map_teb()` sets those up too - TODO confirm)
      - each DLL

    Args:
      emu (ucutils.emu.Emulator): emulator instance to populate.
      sc_addr (int): address at which to map the shellcode.
      sc (bytes): the shellcode itself.
      dlls (List[Dict[str, any]]): DLL descriptors, each handed unchanged to
        `emu.plat.load_dll()`.

    Returns:
      int: `sc_addr`, unchanged.
    '''
    logger.debug('mapping instructions at 0x%x', sc_addr)
    emu.mem.map_data(sc_addr, sc, reason='code')

    # stack layout:
    #
    #   min-addr -> STACK_ADDR
    #   $sp ------> STACK_ADDR + 0x1000
    #   $bp ------> STACK_ADDR + 0x2000
    #   max-addr -> STACK_ADDR + STACK_SIZE
    #               (the original notes give this as STACK_ADDR + 0x3000)
    stack_base = ucutils.STACK_ADDR
    logger.debug('mapping stack at 0x%x', stack_base)
    emu.mem.map_region(stack_base, ucutils.STACK_SIZE, reason='stack')
    emu.stack_pointer = stack_base + 0x1000
    emu.base_pointer = stack_base + 0x2000

    emu.plat.map_teb()

    for image in dlls:
        emu.plat.load_dll(image)

    return sc_addr
class SimpleCmpRegisterTaintTracker(ucutils.emu.Hook):
    '''
    hook emulation and search for instructions like:

        cmp REG, REG

    where one of REG has the given target value.
    tracks the other "tainted" value in `.tainted_values`.

    the target value itself is never recorded: it is the placeholder we are
    looking for, not a value that was compared against it.
    '''
    # invoked for every instruction that the emulator executes.
    HOOK_TYPE = unicorn.UC_HOOK_CODE

    def __init__(self, target):
        '''
        Args:
          target (int): the value to look for in `cmp` register operands.
        '''
        super(SimpleCmpRegisterTaintTracker, self).__init__()

        # look for comparisons against this target value.
        self.target = target

        # these are the values compared against our target.
        self.tainted_values = set()

    def hook(self, emu, address, size, user_data):
        '''
        inspect the instruction about to execute, and record register values
        that are compared against the target.

        Args:
          emu: the running emulator.
          address (int): address of the current instruction.
          size (int): size in bytes of the current instruction.
          user_data: unused.
        '''
        buf = emu.mem_read(address, size)

        # `disasm` yields nothing when the bytes cannot be decoded; use a
        # default so that we don't raise StopIteration out of the callback.
        insn = next(emu.dis.disasm(bytes(buf), address), None)
        if insn is None or insn.mnemonic != 'cmp':
            return

        operands = insn.operands
        if len(operands) != 2:
            return

        op0, op1 = operands
        if op0.type != capstone.x86_const.X86_OP_REG:
            return
        if op1.type != capstone.x86_const.X86_OP_REG:
            return

        val0 = emu.reg_read(C2U[op0.reg])
        val1 = emu.reg_read(C2U[op1.reg])
        if val0 != self.target and val1 != self.target:
            return

        logger.debug('0x%x: tainted comparison: cmp %s=0x%x, %s=0x%x',
                     address,
                     C2S[op0.reg],
                     val0,
                     C2S[op1.reg],
                     val1)

        # keep only the side(s) of the comparison that are not the target
        # itself. (if both registers hold the target, there is nothing new.)
        self.tainted_values.update(v for v in (val0, val1) if v != self.target)
class SimpleCmpMemTaintTracker(ucutils.emu.Hook):
    '''
    hook memory reads and search for instructions like:

        cmp [mem], REG

    or:

        cmp REG, [mem]

    where [mem] has the given target value.
    tracks the value of REG in `.tainted_values`.

    comparisons that involve no register operand (for example, against an
    immediate) are ignored.
    '''
    # invoked for every memory read that the emulator performs.
    HOOK_TYPE = unicorn.UC_HOOK_MEM_READ

    def __init__(self, target):
        '''
        Args:
          target (int): the pointer-sized value to look for in memory operands.
        '''
        super(SimpleCmpMemTaintTracker, self).__init__()

        # look for comparisons against this target value.
        self.target = target

        # these are the values compared against our target.
        self.tainted_values = set()

    def hook(self, emu, _, address, size, __, ___):
        '''
        inspect a memory read, and when it fetches the target value on behalf
        of a `cmp` instruction, record the register it is compared with.

        Args:
          emu: the running emulator.
          address (int): address of the memory being read.
          size (int): number of bytes being read.

        the remaining positional arguments are unused.
        '''
        # only pointer-sized reads can be holding the pointer-sized target.
        if size != emu.ptr_size:
            return

        val = emu.arch.parse_ptr(emu, address)
        if val != self.target:
            return

        # the hook receives the data address, not the instruction, so fetch
        # and decode whatever is at the current program counter.
        buf = emu.mem_read(emu.program_counter, 0x10)

        # `disasm` yields nothing when the bytes cannot be decoded; use a
        # default so that we don't raise StopIteration out of the callback.
        insn = next(emu.dis.disasm(bytes(buf), emu.program_counter), None)
        if insn is None or insn.mnemonic != 'cmp':
            return

        operands = insn.operands
        if len(operands) != 2:
            return

        op0, op1 = operands
        if op0.type == capstone.x86_const.X86_OP_REG:
            v = emu.reg_read(C2U[op0.reg])
        elif op1.type == capstone.x86_const.X86_OP_REG:
            v = emu.reg_read(C2U[op1.reg])
        else:
            # e.g. `cmp [mem], imm`: there is no register value to record.
            # (previously this fell through and used `v` while unbound.)
            return

        self.tainted_values.add(v)
        logger.info('%x: tainted value: %08x', emu.program_counter, v)
def resolve_address(dlls, addr):
    '''
    resolve the name of the export that prefers to be loaded at the given address.

    addresses are computed as `ImageBase + export RVA`, so this only matches
    when the DLL is considered to live at its preferred image base.

    named exports win: the first named export found at `addr` is returned
    immediately. if only ordinal-only exports (those without a name) live at
    `addr`, the first of them is reported as `#<ordinal>`.
    images without an export directory are skipped.

    Args:
      dlls (List[Dict[str, any]]): list of dicts with keys:
        filename (str): filename of DLL.
        pe (pefile.PE): parsed DLL.
      addr (int): preferred virtual address of export.

    Returns:
      str: the name of the export, or `#<ordinal>` for an ordinal-only export.

    Raises:
      KeyError: if the export is not found.
    '''
    ordinal_only = None

    for dll in dlls:
        pe = dll['pe']

        # pefile only sets this attribute when the image has an export directory.
        export_dir = getattr(pe, 'DIRECTORY_ENTRY_EXPORT', None)
        if export_dir is None:
            continue

        image_base = pe.OPTIONAL_HEADER.ImageBase
        for symbol in export_dir.symbols:
            if addr != image_base + symbol.address:
                continue

            if symbol.name is not None:
                return symbol.name.decode('ascii')

            # exported by ordinal only: remember it, but keep looking in case
            # a named export shares the same address.
            if ordinal_only is None:
                ordinal_only = '#%d' % symbol.ordinal

    if ordinal_only is not None:
        return ordinal_only

    raise KeyError(addr)
def extract_imports(emu, sc_addr, dlls, resolver_offset):
    '''
    Emulate the shellcode at the given address with the given DLLs loaded
    to resolve API hashes.

    Assume the given function accepts a single argument: the pointer-sized hash to resolve.
    Assume the given function returns the resolved pointer in EAX.

    Candidate hashes whose resolved pointer does not match any export of the
    given DLLs are logged and skipped; they do not abort the run.
    If emulation itself fails, an interactive `ucutils.cli.UnicornCli` session
    is started for debugging, and whatever was resolved so far is returned
    once that session ends.

    Args:
      emu (ucutils.emu.Emulator): Unicorn emulator instance.
      sc_addr (int): address of the start of the shellcode region.
      dlls (List[Dict[str, any]]): list of dicts with keys:
        filename (str): filename of DLL.
        pe (pefile.PE): parsed DLL.
      resolver_offset (int): relative offset into shellcode region of resolver function.

    Returns:
      Dict[int, str]: mapping from hash to export name.
    '''
    imports = {}

    # placeholder "hash" passed to the resolver during the discovery run.
    TAINTED_VALUE = 0x69696969

    # we just pick some place we know is mapped as code.
    # don't intend to actually execute here.
    ret_addr = sc_addr

    # to help with debugging:
    #cl = ucutils.emu.CodeLogger(emu.arch.get_capstone())
    #cl.install(emu)

    try:
        # here's the strategy:
        #   1. emulate the resolver with a fake hash, monitoring for comparisons against the hash
        #   2. for each tainted hash value,
        #      re-run the resolver function, and see what function pointer is resolved.
        emu.program_counter = sc_addr + resolver_offset

        # to help with debugging:
        #emu.push(TAINTED_VALUE)
        #emu.push(ret_addr)
        #cli = ucutils.cli.UnicornCli(emu)
        #cli.cmdloop()

        # find tainted hashes.
        # NOTE(review): `ucutils.emu.context` presumably snapshots and restores
        # emulator state around the block, so each run starts clean - confirm.
        with ucutils.emu.context(emu):
            # arg0: tainted value
            emu.push(TAINTED_VALUE)
            emu.push(ret_addr)

            tt = SimpleCmpRegisterTaintTracker(TAINTED_VALUE)
            tt2 = SimpleCmpMemTaintTracker(TAINTED_VALUE)
            with ucutils.emu.hook(emu, tt):
                with ucutils.emu.hook(emu, tt2):
                    emu.go(ret_addr)  # ret from hash function

        tainted_values = tt.tainted_values | tt2.tainted_values
        # the placeholder is not a real hash; re-running the resolver with it
        # would only repeat the discovery run.
        tainted_values.discard(TAINTED_VALUE)
        logger.info('identified %d tainted values', len(tainted_values))

        # resolve tainted hashes
        for api_hash in tainted_values:
            logger.debug('attempting to resolve hash: %08x', api_hash)
            with ucutils.emu.context(emu):
                # arg0: potential API hash
                emu.push(api_hash)
                emu.push(ret_addr)
                emu.go(ret_addr)

                # read the result before leaving the context.
                pfunc = emu.eax

            try:
                fname = resolve_address(dlls, pfunc)
            except KeyError:
                # not every value compared against the placeholder has to be
                # an API hash; don't let one miss discard the other results.
                logger.debug('hash %08x gave 0x%08x, which is not a known export',
                             api_hash, pfunc)
                continue

            imports[api_hash] = fname
            logger.info('resolved %08x to function 0x%08x (%s)', api_hash, pfunc, fname)
    except unicorn.UcError as e:
        logger.debug('emulation error: %s', str(e))
        # drop into the interactive emulator shell to inspect the failure.
        cli = ucutils.cli.UnicornCli(emu)
        cli.cmdloop()

    return imports
def main(argv=None):
    '''
    command line entry point: emulate a shellcode's hash resolver and print
    one `hash: export name` line per resolved hash.

    the emulator is always created as 32-bit x86 with the win32 platform.

    Args:
      argv (List[str]): command line arguments, excluding the program name.
        defaults to `sys.argv[1:]`.

    Returns:
      int: process exit code, always 0.
    '''
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Automatically extract shellcode hash resolutions.")
    parser.add_argument("input", type=str,
                        help="Path to input file")
    # the offset is always parsed as hexadecimal, with or without a `0x` prefix.
    parser.add_argument("resolver_offset", type=lambda s: int(s, 0x10),
                        help="Relative offset to resolver function")
    parser.add_argument("dlls", type=str, nargs='+',
                        help="Paths to DLL files to map")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable debug logging")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disable all output but errors")
    args = parser.parse_args(args=argv)

    # --verbose takes precedence over --quiet when both are given.
    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.ERROR
    else:
        level = logging.INFO
    logging.basicConfig(level=level)
    logging.getLogger().setLevel(level)

    with open(args.input, 'rb') as f:
        sc = f.read()

    dlls = [
        {
            'filename': os.path.basename(dllpath),
            'path': dllpath,
            'pe': pefile.PE(dllpath),
        }
        for dllpath in args.dlls
    ]

    emu = ucutils.emu.Emulator(unicorn.UC_ARCH_X86, unicorn.UC_MODE_32, plat=ucutils.plat.win32)
    load(emu, ucutils.CODE_ADDR, sc, dlls)

    resolved = extract_imports(emu, ucutils.CODE_ADDR, dlls, args.resolver_offset)
    for api_hash, export_name in resolved.items():
        print('%x: %s' % (api_hash, export_name))

    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment