Skip to content

Instantly share code, notes, and snippets.

@williballenthin
Last active September 24, 2020 15:39
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save williballenthin/c52c6fe1484cb21ed46980babebec05a to your computer and use it in GitHub Desktop.
Save williballenthin/c52c6fe1484cb21ed46980babebec05a to your computer and use it in GitHub Desktop.
import logging
import binascii
import collections
import pefile
import hexdump
import unicorn
import keystone
import capstone
logger = logging.getLogger(__name__)
# values of the PE file header `Machine` field.
# `PELoader.get_bits` compares `FILE_HEADER.Machine` against these to pick 32 or 64 bits.
IMAGE_FILE_MACHINE_I386 = 0x014c
IMAGE_FILE_MACHINE_AMD64 = 0x8664
def get_text_section(pe):
    '''
    locate the section that holds the entry point of the given PE.

    via: https://bitbucket.org/snippets/Alexander_Hanel/e4b6y/30-second-guide-to-pefile-and-capstone

    Arguments:
      pe: the parsed PE; must expose `OPTIONAL_HEADER.AddressOfEntryPoint`
        and `get_section_by_rva`.

    Returns:
      whatever `pe.get_section_by_rva` yields for the entry point RVA.
    '''
    return pe.get_section_by_rva(pe.OPTIONAL_HEADER.AddressOfEntryPoint)
def create_disassembler(bits=32):
    '''
    build a capstone x86 disassembler with instruction details enabled.

    Arguments:
      bits (int): 32 or 64, selecting the x86 mode.

    Returns:
      capstone.Cs: the configured disassembler.

    Raises:
      ValueError: if `bits` is neither 32 nor 64.
    '''
    if bits not in (32, 64):
        raise ValueError('unexpected bits value: %d' % (bits))

    mode = capstone.CS_MODE_32 if bits == 32 else capstone.CS_MODE_64
    dis = capstone.Cs(capstone.CS_ARCH_X86, mode)
    # required to fetch operand values
    dis.detail = True
    return dis
def create_assembler(bits=32):
    '''
    build a keystone x86 assembler for the requested word size.

    Arguments:
      bits (int): 32 or 64, selecting the x86 mode.

    Returns:
      keystone.Ks: the assembler.

    Raises:
      ValueError: if `bits` is neither 32 nor 64.
    '''
    if bits == 64:
        mode = keystone.KS_MODE_64
    elif bits == 32:
        mode = keystone.KS_MODE_32
    else:
        raise ValueError('unexpected bits value: %d' % (bits))
    return keystone.Ks(keystone.KS_ARCH_X86, mode)
def create_emulator(bits=32):
    '''
    build a unicorn x86 emulator for the requested word size.

    Arguments:
      bits (int): 32 or 64, selecting the x86 mode.

    Returns:
      unicorn.Uc: the emulator.

    Raises:
      ValueError: if `bits` is neither 32 nor 64.
    '''
    if bits != 32 and bits != 64:
        raise ValueError('unexpected bits value: %d' % (bits))

    mode = unicorn.UC_MODE_32 if bits == 32 else unicorn.UC_MODE_64
    return unicorn.Uc(unicorn.UC_ARCH_X86, mode)
def format_instruction(op):
    '''
    render a disassembled instruction as one line of text.

    the line holds the address in hex, the raw bytes as hex digits
    (left-justified to 16 columns), the mnemonic (left-justified to 6 columns),
    and the operand string.

    Arguments:
      op: the instruction; must expose `address`, `bytes`, `mnemonic` and `op_str`.

    Returns:
      str: the formatted line.
    '''
    address = hex(op.address)
    raw = binascii.hexlify(op.bytes).decode('ascii').ljust(16)
    mnemonic = op.mnemonic.ljust(6)
    return '{}: {} {} {}'.format(address, raw, mnemonic, op.op_str)
class FailedToDisassemble(Exception):
    '''raised when no instruction can be decoded at a requested address.'''
# one mapped memory region: its starting virtual address (`va`) and its `size`.
MapEntry = collections.namedtuple('MapEntry', ['va', 'size'])
class MemoryInterface(object):
    '''
    abstract view over the memory of a sample.

    concrete loaders override every method; the defaults here only raise
    NotImplementedError.
    '''

    def get_map(self):
        '''
        enumerate the allocated regions of this memory.

        Returns:
          list[MapEntry]: the valid memory regions.
        '''
        raise NotImplementedError

    def get_bytes(self, va, length):
        '''
        read a run of bytes starting at the given address.

        Arguments:
          va (int): virtual address at which to start reading.
          length (int): number of bytes to fetch.

        Returns:
          bytes: the requested bytes.

        Raises:
          IndexError: if the requested addresses are out of range.
        '''
        raise NotImplementedError

    def set_bytes(self, va, bytez):
        '''
        overwrite memory at the given address with the given bytes.

        Arguments:
          va (int): virtual address at which to write.
          bytez (bytes): the data to write.

        Returns:
          None
        '''
        raise NotImplementedError
class PELoader(MemoryInterface):
    '''
    memory interface backed by a parsed PE file.

    callers use absolute virtual addresses; they are converted to RVAs by
    subtracting the image base before being handed to the PE object.
    '''

    def __init__(self, pe):
        '''
        Arguments:
          pe: the parsed PE object (as produced by `pefile.PE`).
        '''
        self.pe = pe
        self.text = get_text_section(self.pe)
        self.base = self.pe.OPTIONAL_HEADER.ImageBase

    def get_map(self):
        '''
        list one region per PE section, using its virtual address and virtual size.

        Returns:
          list[MapEntry]: the section regions, in section-table order.
        '''
        return [MapEntry(self.base + section.VirtualAddress, section.Misc_VirtualSize)
                for section in self.pe.sections]

    def get_bytes(self, va, length):
        '''
        read `length` bytes at virtual address `va` via `pe.get_data`.
        '''
        return self.pe.get_data(va - self.base, length)

    def set_bytes(self, va, bytez):
        '''
        write `bytez` at virtual address `va`, then read the region back to
        confirm the write took effect.

        Raises:
          AssertionError: if the data read back differs from `bytez`.
        '''
        self.pe.set_bytes_at_rva(va - self.base, bytez)
        # read back through the VA-based accessor: `get_bytes` subtracts the
        # image base itself, so it must be given the VA rather than the RVA.
        data2 = self.get_bytes(va, len(bytez))
        # raised explicitly so the check is not stripped under `python -O`.
        if bytez != data2:
            raise AssertionError('failed to write %d bytes at %s' % (len(bytez), hex(va)))

    def get_bits(self):
        '''
        derive the word size of the sample from the file header `Machine` field.

        Returns:
          int: 32 or 64.

        Raises:
          NotImplementedError: for any other machine type.
        '''
        machine = self.pe.FILE_HEADER.Machine
        if machine == IMAGE_FILE_MACHINE_I386:
            logger.info('32-bit sample')
            return 32
        elif machine == IMAGE_FILE_MACHINE_AMD64:
            logger.info('64-bit sample')
            return 64
        else:
            raise NotImplementedError('unsupported PE magic')
class ShellcodeLoader(MemoryInterface):
    '''
    memory interface over a flat buffer that is mapped at address 0.
    '''

    def __init__(self, buf):
        '''
        Arguments:
          buf (bytes or bytearray): the raw shellcode.
            immutable `bytes` are copied into a bytearray so that `set_bytes`
            can patch them; any other buffer is kept as-is, so writes stay
            visible to the caller.
        '''
        if isinstance(buf, bytes):
            buf = bytearray(buf)
        self.buf = buf

    def get_map(self):
        '''
        Returns:
          list[MapEntry]: a single region covering the whole buffer, starting at 0.
        '''
        return [
            MapEntry(0, len(self.buf))
        ]

    def get_bytes(self, va, length):
        '''
        read `length` bytes at offset `va`.

        Returns:
          bytes: the requested bytes.

        Raises:
          IndexError: if any requested byte lies outside the buffer.
        '''
        # a negative offset would otherwise be interpreted by slicing as
        # "relative to the end of the buffer" and return unrelated data.
        if va < 0:
            raise IndexError(hex(va))
        data = self.buf[va:va + length]
        if len(data) != length:
            raise IndexError(hex(va))
        return bytes(data)

    def set_bytes(self, va, bytez):
        '''
        overwrite the buffer at offset `va` with `bytez`.

        Raises:
          IndexError: if the write would fall outside the buffer.
        '''
        # slice assignment past the end would silently change the buffer's size
        # instead of failing, so check the bounds first.
        if va < 0 or va + len(bytez) > len(self.buf):
            raise IndexError(hex(va))
        self.buf[va:va + len(bytez)] = bytez
def align(value, alignment):
    '''
    round the given value up to the next multiple of the alignment.
    a value that is already a multiple is returned unchanged.

    Args:
      value (int): the base value.
      alignment (int): the alignment increment.

    Returns:
      int: the aligned value, greater than or equal to `value`.
    '''
    remainder = value % alignment
    if not remainder:
        return value
    return value + alignment - remainder
class Analyzer(object):
    '''
    binary analysis workspace built on pefile, capstone, and keystone.
    '''
    # number of bytes fetched from memory for each single-instruction decode.
    MAX_INSN_LEN = 0x10
    # base address of the stack region that `get_emulator` maps.
    STACK_ADDRESS = 0x69000000

    def __init__(self, mem, bits=32):
        '''
        Arguments:
          mem (MemoryInterface): the memory subclass that represents the sample to analyze.
          bits (int): 32 or 64; selects the disassembler/assembler/emulator mode.
        '''
        self.mem = mem
        self.bits = bits
        self.dis = create_disassembler(bits)
        self.asm = create_assembler(bits)

    def get_map(self):
        '''fetch the mapped regions of the underlying memory.'''
        return self.mem.get_map()

    def get_bytes(self, va, length):
        '''read `length` bytes at `va` from the underlying memory.'''
        return self.mem.get_bytes(va, length)

    def set_bytes(self, va, bytez):
        '''write `bytez` at `va` into the underlying memory.'''
        return self.mem.set_bytes(va, bytez)

    def disassemble(self, va):
        '''
        disassemble the instruction at the given virtual address.

        Arguments:
          va (int) the address to disassemble.

        Returns:
          capstone.CsInsn: the instruction object.

        Raises:
          IndexError: if the address is not mapped.
          FailedToDisassemble: if the bytes at `va` do not decode to an instruction.
        '''
        data = bytes(self.get_bytes(va, self.MAX_INSN_LEN))
        try:
            return next(self.dis.disasm(data, va))
        except StopIteration:
            raise FailedToDisassemble(hex(va))

    def assemble(self, insns, va=0, minlength=0):
        '''
        assemble the given text into bytes at the given address.
        pad to the given minimum length with NOPs (0x90) if provided.

        Arguments:
          insns (str): the intel assembly instructions.
          va (int): the address at which to assemble, default is 0x0.
          minlength (int): minimum length of the output bytes, optional.

        Returns:
          bytes: the raw bytes of the assembled instructions.
        '''
        encoding, _ = self.asm.asm(insns, va)
        # the assembler may hand back None instead of an empty list when no
        # statement was assembled; treat that as "no bytes" so padding still works.
        encoding = list(encoding or [])
        if minlength:
            encoding.extend([0x90] * (minlength - len(encoding)))
        # going through bytearray yields raw bytes on both python 2 and 3.
        return bytes(bytearray(encoding))

    def get_emulator(self):
        '''
        create an emulator preloaded with every mapped region plus a stack.

        each region is mapped with its size rounded up to a 0x1000 page and
        filled with the region's bytes. a 0x10-page stack is mapped at
        STACK_ADDRESS, and ESP/EBP are pointed at its middle.

        Returns:
          unicorn.Uc: the prepared emulator.
        '''
        emu = create_emulator(self.bits)
        for section in self.get_map():
            emu.mem_map(section.va, align(section.size, 0x1000))
            # normalise to `bytes`, as `disassemble` does: loaders may return
            # other buffer types such as bytearray.
            emu.mem_write(section.va, bytes(self.get_bytes(section.va, section.size)))

        emu.mem_map(self.STACK_ADDRESS, 0x10 * 0x1000)
        stack_middle = self.STACK_ADDRESS + 0x8 * 0x1000
        emu.reg_write(unicorn.x86_const.UC_X86_REG_ESP, stack_middle)
        emu.reg_write(unicorn.x86_const.UC_X86_REG_EBP, stack_middle)
        return emu
#!/usr/bin/env python2
'''
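usage: chap2fix.py [-h] [-v] [-q] [-o OUTPUT] [-s OFFSET] [-x] input
Fix 'fake unconditional' jumps.
positional arguments:
input Path to input file
optional arguments:
-h, --help show this help message and exit
-v, --verbose Enable debug logging
-q, --quiet Disable all output but errors
-o OUTPUT, --patched-file OUTPUT
Destination path to write patched PE file
-s OFFSET, --start-addr OFFSET
Start address (hex) of the obfuscated region, default 0x401910
-x, --aggressive Aggressively scan for 'fake unconditional' jumps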
example::
/env/bin/python chap2fix.py sample -o patched.bin
author: Willi Ballenthin
'''
import sys
import logging
import binascii
from pprint import pprint
from collections import namedtuple
import pefile
import argparse
import capstone.x86_const as x86
import analyzer
logger = logging.getLogger(__name__)
# empirically collected set of fake conditional jump pairs,
# in which the suffixes do not match. each pair is sorted alphabetically.
_FAKE_CONDITIONAL_PAIRS = frozenset([('ja', 'jbe'),
                                     ('jg', 'jle'),
                                     ('jae', 'jb')])


def is_fake_conditional(ana, va):
    '''
    decide whether the two instructions starting at `va` form a "fake
    conditional" jump: two consecutive direct jumps to the same target whose
    conditions are complementary, so that one of them is always taken.

    Arguments:
      ana (analyzer.Analyzer): the analyzer.
      va (int): the address of the first instruction of the candidate pair.

    Returns:
      bool: True if the pair looks like a fake conditional jump.
        False otherwise, including when either instruction cannot be read
        or decoded.
    '''
    try:
        op1 = ana.disassemble(va)
        if not op1.group(x86.X86_GRP_JUMP):
            return False

        op2 = ana.disassemble(va + op1.size)
        if not op2.group(x86.X86_GRP_JUMP):
            return False

        dst1 = op1.operands[0]
        dst2 = op2.operands[0]
    except (IndexError, pefile.PEFormatError, analyzer.FailedToDisassemble):
        # unreadable or undecodable bytes cannot be a jump pair.
        return False

    # only direct jumps carry a target in `imm`; for register or memory
    # operands that field does not hold an address, so comparing it would
    # match unrelated indirect jumps.
    if dst1.type != x86.X86_OP_IMM or dst2.type != x86.X86_OP_IMM:
        return False

    if dst1.value.imm != dst2.value.imm:
        return False

    mn1, mn2 = sorted((op1.mnemonic, op2.mnemonic))

    # `jz`/`jnz`, `jo`/`jno`, ...: identical once the leading `j`/`n` is removed.
    if mn1.lstrip('jn') == mn2.lstrip('jn'):
        return True

    if (mn1, mn2) in _FAKE_CONDITIONAL_PAIRS:
        return True

    logger.debug('unlikely "fake conditional" jump at %s: %s %s (will ignore)', hex(va), mn1, mn2)
    return False
# one patch: the `address` to overwrite, the replacement `bytes`,
# and `asm`, the assembly text that describes those bytes.
PatchEntry = namedtuple('PatchEntry', ['address', 'bytes', 'asm'])
def create_fake_conditional_jump_patch(ana, va):
    '''
    build the patch that replaces the jump pair at `va` with a single
    unconditional jump to the first jump's target.

    the assembled `JMP` is padded with NOPs to the combined size of the two
    original instructions, so the patch covers exactly the bytes it replaces.

    Arguments:
      ana (analyzer.Analyzer): the analyzer.
      va (int): the address of the first jump of the pair.

    Returns:
      PatchEntry: the patch for address `va`.
    '''
    first = ana.disassemble(va)
    second = ana.disassemble(va + first.size)

    asm = 'JMP %s' % (hex(first.operands[0].value.imm))
    span = first.size + second.size
    return PatchEntry(va, ana.assemble(asm, va=va, minlength=span), asm)
def _immediate_target(op):
    '''
    fetch the branch target of a direct call/jump instruction.

    Returns:
      int or None: the target address, or None when the first operand is not
        an immediate (for example `call eax` or `jmp [mem]`).
    '''
    operands = op.operands
    if not operands or operands[0].type != x86.X86_OP_IMM:
        return None
    return operands[0].value.imm


def compute_fake_conditional_jump_patches(ana, entrypoints):
    '''
    generate patches that replace "fake conditional" jumps with unconditional jumps.

    control flow is followed from each entry point: unconditional jumps and
    fake conditional jumps are followed, call targets and conditional jump
    targets are queued as new entry points. a path ends at a return, at an
    indirect `jmp`, at bytes that cannot be read or decoded, or at an
    instruction that was already walked (so loops terminate and no patch is
    produced twice).

    Arguments:
      ana (analyzer.Analyzer): the analyzer.
      entrypoints (List[int]): the initial list of code start points.

    Yields:
      PatchEntry
    '''
    pending = set(entrypoints)
    seen = set()
    # addresses of instructions already walked, across all entry points.
    visited = set()

    while pending:
        entrypoint = pending.pop()
        if entrypoint in seen:
            continue
        seen.add(entrypoint)

        logger.debug('find patches starting from %s.', hex(entrypoint))

        va = entrypoint
        while va not in visited:
            visited.add(va)

            try:
                op = ana.disassemble(va)
            except (IndexError, pefile.PEFormatError, analyzer.FailedToDisassemble):
                logger.debug('failed to disassemble at %s. analysis dead end.', hex(va))
                break

            if is_fake_conditional(ana, va):
                target = _immediate_target(op)
                if target is None:
                    # not a direct jump after all: nothing sensible to patch or follow.
                    break
                yield create_fake_conditional_jump_patch(ana, va)
                va = target
            elif op.group(x86.X86_GRP_CALL):
                target = _immediate_target(op)
                if target is not None:
                    pending.add(target)
                va += op.size
            elif op.group(x86.X86_GRP_JUMP):
                target = _immediate_target(op)
                if op.mnemonic == 'jmp':
                    if target is None:
                        # indirect jump: the destination is unknown statically.
                        break
                    va = target
                else:
                    # conditional jump: explore the taken branch later and
                    # keep walking the fall-through path now.
                    if target is not None:
                        pending.add(target)
                    va += op.size
            elif op.group(x86.X86_GRP_RET):
                break
            else:
                va += op.size
def aggressively_compute_fake_conditional_jump_patches(ana, startva):
    '''
    aggressively generate patches that replace "fake conditional"
    jumps with unconditional jumps.

    unlike `compute_fake_conditional_jump_patches`, which follows control
    flow, this routine tests every byte offset from `startva` onwards and
    stops at the first address whose bytes cannot be read.

    Arguments:
      ana (analyzer.Analyzer): the analyzer.
      startva (int): the address at which to start scanning.

    Yields:
      PatchEntry
    '''
    va = startva
    while True:
        try:
            # the decoded instruction is not used; this only probes whether
            # `va` is readable and decodable.
            ana.disassemble(va)
        except analyzer.FailedToDisassemble:
            va += 1
            continue
        except (IndexError, pefile.PEFormatError):
            logger.debug('failed to disassemble at %s. analysis dead end.', hex(va))
            break

        if not is_fake_conditional(ana, va):
            va += 1
            continue

        # ignore prefix on jump instructions, if possible.
        # for example, should use 0x40450F, not 0x40450E
        while is_fake_conditional(ana, va + 1):
            logger.debug('ignoring prefix at %s', hex(va))
            va += 1

        patch = create_fake_conditional_jump_patch(ana, va)
        yield patch
        va += len(patch.bytes)
def format_patch(patch):
    '''
    serialize a patch instance as one line of text:
    the hex address, the hex-encoded bytes, then ` ; ` and the assembly text.
    see `parse_patch` for the inverse.

    Arguments:
      patch (PatchEntry): the patch to format.

    Returns:
      str: the line of text
    '''
    address = hex(patch.address)
    encoded = binascii.hexlify(patch.bytes).decode('ascii')
    return '{} {} ; {}'.format(address, encoded, patch.asm)
def parse_patch(line):
    '''
    parse a line of text into a patch instance.
    see `format_patch`.

    the expected shape is `<hex address> <hex bytes> ; <assembly text>`.
    surrounding whitespace, including a trailing newline, is ignored.

    Arguments:
      line (str): a line of text.

    Returns:
      PatchEntry: the patch.

    Raises:
      RuntimeError: if the line is invalid.
    '''
    data, separator, comment = line.partition(';')
    if not separator:
        raise RuntimeError('invalid patch line')

    # `split()` tolerates repeated spaces or tabs between the two fields.
    fields = data.split()
    if not 1 <= len(fields) <= 2:
        raise RuntimeError('invalid patch line')

    address = fields[0]
    # a patch with no bytes is formatted with an empty byte field.
    bytez = fields[1] if len(fields) == 2 else ''

    try:
        address = int(address, 0x10)
        bytez = binascii.unhexlify(bytez.encode('ascii'))
    except (ValueError, TypeError):
        # malformed hex (bad digits, odd length, non-ascii text) is reported
        # as the documented error type rather than leaking the decoder's own.
        raise RuntimeError('invalid patch line')

    return PatchEntry(address, bytez, comment.strip())
def apply_patch(ana, patch):
    '''
    write the bytes of a patch into the given analyzer at the patch address.

    Arguments:
      ana (analyzer.Analyzer): the analyzer.
      patch (PatchEntry): the patch.

    Returns:
      None
    '''
    address, data = patch.address, patch.bytes
    ana.set_bytes(address, data)
def main(argv=None):
    '''
    command line entry point: find "fake conditional" jump pairs in a PE
    file, print one patch per line, and optionally write a patched copy.

    Arguments:
      argv (List[str]): the command line arguments, without the program name.
        defaults to `sys.argv[1:]`.

    Returns:
      None
    '''
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Fix 'fake unconditional' jumps.")
    parser.add_argument("input", type=str,
                        help="Path to input file")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable debug logging")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disable all output but errors")
    parser.add_argument('-o', "--patched-file", dest='output',
                        help="Destination path to write patched PE file")
    parser.add_argument('-s', "--start-addr", dest='offset', default=0x401910,
                        type=lambda s: int(s, 0x10),
                        help="Start address of the obfuscated region.")
    parser.add_argument('-x', "--aggressive", action="store_true",
                        help="Aggressively scan for 'fake unconditional' jumps.")
    # parse the arguments that were passed in, so that callers supplying
    # their own `argv` are honoured instead of silently reading sys.argv.
    args = parser.parse_args(argv)

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    elif args.quiet:
        logging.basicConfig(level=logging.ERROR)
    else:
        logging.basicConfig(level=logging.INFO)

    pe = pefile.PE(args.input)
    mem = analyzer.PELoader(pe)
    ana = analyzer.Analyzer(mem, mem.get_bits())

    if args.aggressive:
        computed_patches = aggressively_compute_fake_conditional_jump_patches(ana, args.offset)
    else:
        computed_patches = compute_fake_conditional_jump_patches(ana, [args.offset])

    # collect every patch before applying any: the generators disassemble
    # lazily and must see the unmodified bytes.
    patches = []
    for patch in computed_patches:
        print(format_patch(patch))
        patches.append(patch)

    if args.output:
        for patch in patches:
            apply_patch(ana, patch)
        pe.write(args.output)
if __name__ == "__main__":
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment