Last active
September 24, 2020 15:39
-
-
Save williballenthin/c52c6fe1484cb21ed46980babebec05a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import binascii | |
import collections | |
import pefile | |
import hexdump | |
import unicorn | |
import keystone | |
import capstone | |
# module-level logger, named after this module.
logger = logging.getLogger(__name__)

# values of the PE file header `Machine` field that `PELoader.get_bits`
# compares against to pick between 32-bit and 64-bit analysis.
IMAGE_FILE_MACHINE_I386 = 0x014c
IMAGE_FILE_MACHINE_AMD64 = 0x8664
def get_text_section(pe):
    '''
    fetch the section that holds the entry point of the given PE,
     which is taken to be the section that contains code.

    via: https://bitbucket.org/snippets/Alexander_Hanel/e4b6y/30-second-guide-to-pefile-and-capstone

    Arguments:
      pe: the parsed PE object; must expose `OPTIONAL_HEADER.AddressOfEntryPoint`
        and `get_section_by_rva`.

    Returns:
      whatever `pe.get_section_by_rva` returns for the entry point RVA.
    '''
    return pe.get_section_by_rva(pe.OPTIONAL_HEADER.AddressOfEntryPoint)
def create_disassembler(bits=32):
    '''
    build a capstone x86 disassembler for the given word size.

    Arguments:
      bits (int): 32 or 64.

    Returns:
      capstone.Cs: the disassembler, with instruction details enabled.

    Raises:
      ValueError: if `bits` is neither 32 nor 64.
    '''
    if bits not in (32, 64):
        raise ValueError('unexpected bits value: %d' % (bits))

    mode = capstone.CS_MODE_32 if bits == 32 else capstone.CS_MODE_64
    disassembler = capstone.Cs(capstone.CS_ARCH_X86, mode)

    # details must be enabled in order to fetch operand values.
    disassembler.detail = True
    return disassembler
def create_assembler(bits=32):
    '''
    build a keystone x86 assembler for the given word size.

    Arguments:
      bits (int): 32 or 64.

    Returns:
      keystone.Ks: the assembler.

    Raises:
      ValueError: if `bits` is neither 32 nor 64.
    '''
    if bits not in (32, 64):
        raise ValueError('unexpected bits value: %d' % (bits))

    mode = keystone.KS_MODE_32 if bits == 32 else keystone.KS_MODE_64
    return keystone.Ks(keystone.KS_ARCH_X86, mode)
def create_emulator(bits=32):
    '''
    build a unicorn x86 emulator for the given word size.

    Arguments:
      bits (int): 32 or 64.

    Returns:
      unicorn.Uc: the emulator.

    Raises:
      ValueError: if `bits` is neither 32 nor 64.
    '''
    if bits not in (32, 64):
        raise ValueError('unexpected bits value: %d' % (bits))

    mode = unicorn.UC_MODE_32 if bits == 32 else unicorn.UC_MODE_64
    return unicorn.Uc(unicorn.UC_ARCH_X86, mode)
def format_instruction(op):
    '''
    render a disassembled instruction as one line of text:
     hex address, hex-encoded bytes (padded to 16 columns),
     mnemonic (padded to 6 columns), and operand string.

    Arguments:
      op: the instruction; must expose `address`, `bytes`, `mnemonic` and `op_str`.

    Returns:
      str: the formatted line.
    '''
    address = hex(op.address)
    raw = binascii.hexlify(op.bytes).decode('ascii').ljust(16)
    mnemonic = op.mnemonic.ljust(6)
    return '%s: %s %s %s' % (address, raw, mnemonic, op.op_str)
class FailedToDisassemble(Exception):
    '''
    raised by `Analyzer.disassemble` when the bytes at an address
     do not decode to an instruction.
    the exception argument is the hex-formatted address.
    '''
    pass
# one allocated memory region: its starting virtual address and its size in bytes.
MapEntry = collections.namedtuple('MapEntry', ['va', 'size'])
class MemoryInterface(object):
    '''
    abstract view of the address space of a sample.

    subclasses supply the region map and byte-level reads and writes;
     every method defined here raises NotImplementedError.
    '''

    def get_map(self):
        '''
        list the regions allocated within this memory interface.

        Returns:
          list[MapEntry]: the valid memory regions.
        '''
        raise NotImplementedError

    def get_bytes(self, va, length):
        '''
        read bytes starting at the given address.

        Arguments:
          va (int): virtual address at which to read.
          length (int): number of bytes to fetch.

        Returns:
          bytes: the requested bytes.

        Raises:
          IndexError: if the requested addresses are out of range.
        '''
        raise NotImplementedError

    def set_bytes(self, va, bytez):
        '''
        overwrite memory at the given address with the given bytes.

        Arguments:
          va (int): virtual address at which to write.
          bytez (bytes): the data to write.

        Returns:
          None
        '''
        raise NotImplementedError
class PELoader(MemoryInterface):
    '''
    memory interface backed by a parsed PE file.

    virtual addresses are translated to RVAs by subtracting the
     preferred image base from the optional header.
    '''

    def __init__(self, pe):
        '''
        Arguments:
          pe: the parsed PE object (as produced by `pefile.PE`).
        '''
        self.pe = pe
        # the section that contains the entry point.
        self.text = get_text_section(self.pe)
        self.base = self.pe.OPTIONAL_HEADER.ImageBase

    def get_map(self):
        '''
        describe each PE section as a memory region.

        Returns:
          list[MapEntry]: one entry per section, using its virtual
            address (rebased onto the image base) and virtual size.
        '''
        return [MapEntry(self.base + section.VirtualAddress, section.Misc_VirtualSize)
                for section in self.pe.sections]

    def get_bytes(self, va, length):
        '''
        read bytes from the PE at the given virtual address.

        Arguments:
          va (int): virtual address at which to read.
          length (int): number of bytes to fetch.

        Returns:
          the data returned by `pe.get_data` for the matching RVA.
        '''
        rva = va - self.base
        return self.pe.get_data(rva, length)

    def set_bytes(self, va, bytez):
        '''
        overwrite bytes in the PE at the given virtual address,
         then read them back to confirm that the write took effect.

        Arguments:
          va (int): virtual address at which to write.
          bytez (bytes): the data to write.

        Returns:
          None

        Raises:
          AssertionError: if the data read back differs from `bytez`.
        '''
        rva = va - self.base
        self.pe.set_bytes_at_rva(rva, bytez)

        # `get_bytes` expects a virtual address and does the RVA translation
        #  itself, so the read-back must use `va`, not the already-translated `rva`.
        readback = self.get_bytes(va, len(bytez))

        # raised explicitly (rather than via `assert`) so the verification
        #  is not stripped when running under `python -O`.
        if readback != bytez:
            raise AssertionError('failed to write %d bytes at %s' % (len(bytez), hex(va)))

    def get_bits(self):
        '''
        determine the word size of the sample from the file header machine type.

        Returns:
          int: 32 for i386, 64 for AMD64.

        Raises:
          NotImplementedError: for any other machine type.
        '''
        machine = self.pe.FILE_HEADER.Machine
        if machine == IMAGE_FILE_MACHINE_I386:
            logger.info('32-bit sample')
            return 32
        elif machine == IMAGE_FILE_MACHINE_AMD64:
            logger.info('64-bit sample')
            return 64
        else:
            raise NotImplementedError('unsupported PE magic')
class ShellcodeLoader(MemoryInterface):
    '''
    memory interface backed by a flat buffer that is mapped at address zero.
    '''

    def __init__(self, buf):
        '''
        Arguments:
          buf: the buffer holding the shellcode.
            `set_bytes` assigns into it by slice, so it must be mutable
            (such as a bytearray) if the sample is to be patched.
        '''
        self.buf = buf

    def get_map(self):
        '''
        Returns:
          list[MapEntry]: a single region at address 0 spanning the buffer.
        '''
        return [
            MapEntry(0, len(self.buf))
        ]

    def get_bytes(self, va, length):
        '''
        read bytes from the buffer.

        Arguments:
          va (int): offset into the buffer at which to read.
          length (int): number of bytes to fetch.

        Returns:
          the requested slice of the buffer.

        Raises:
          IndexError: if the requested range is not fully inside the buffer.
        '''
        # a negative offset would silently slice relative to the end of the buffer.
        if va < 0:
            raise IndexError(hex(va))

        data = self.buf[va:va + length]
        if len(data) != length:
            raise IndexError(hex(va))
        return data

    def set_bytes(self, va, bytez):
        '''
        overwrite bytes in the buffer.

        Arguments:
          va (int): offset into the buffer at which to write.
          bytez (bytes): the data to write.

        Returns:
          None

        Raises:
          IndexError: if the written range is not fully inside the buffer.
        '''
        end = va + len(bytez)

        # slice assignment past the end would grow the buffer (or append at the
        #  wrong offset) instead of failing, so reject out-of-range writes up front.
        if va < 0 or end > len(self.buf):
            raise IndexError(hex(va))

        self.buf[va:end] = bytez
def align(value, alignment):
    '''
    round the given value up to the next multiple of the alignment.
    a value that is already a multiple is returned unchanged.

    Args:
      value (int): the base value.
      alignment (int): the alignment increment.

    Returns:
      int: the aligned value, greater than or equal to the given value.
    '''
    remainder = value % alignment
    if remainder:
        return value + (alignment - remainder)
    return value
class Analyzer(object):
    '''
    binary analysis workspace built on pefile, capstone, and keystone.

    wraps a MemoryInterface together with a disassembler and an assembler
     configured for the same word size.
    '''

    # number of bytes fetched when decoding a single instruction.
    MAX_INSN_LEN = 0x10

    # base address of the region that `get_emulator` maps as the stack.
    STACK_ADDRESS = 0x69000000

    def __init__(self, mem, bits=32):
        '''
        Arguments:
          mem (MemoryInterface): the memory subclass that represents the sample to analyze.
          bits (int): word size of the sample, 32 or 64.
        '''
        self.mem = mem
        self.bits = bits
        self.dis = create_disassembler(bits)
        self.asm = create_assembler(bits)

    def get_map(self):
        '''
        fetch the allocated regions of the underlying memory.

        Returns:
          list[MapEntry]: the valid memory regions.
        '''
        return self.mem.get_map()

    def get_bytes(self, va, length):
        '''
        fetch `length` bytes at the given virtual address from the underlying memory.
        '''
        return self.mem.get_bytes(va, length)

    def set_bytes(self, va, bytez):
        '''
        write the given bytes at the given virtual address of the underlying memory.
        '''
        return self.mem.set_bytes(va, bytez)

    def disassemble(self, va):
        '''
        disassemble the instruction at the given virtual address.

        Arguments:
          va (int) the address to disassemble.

        Returns:
          capstone.CsInsn: the instruction object.

        Raises:
          IndexError: if the address is not mapped.
          FailedToDisassemble: if the bytes do not decode to an instruction.
        '''
        data = bytes(self.get_bytes(va, self.MAX_INSN_LEN))
        try:
            return next(self.dis.disasm(data, va))
        except StopIteration:
            raise FailedToDisassemble(hex(va))

    def assemble(self, insns, va=0, minlength=0):
        '''
        assemble the given text into bytes at the given address.
        pad to the given minimum length with NOPs (0x90) if provided.

        Arguments:
          insns (str): the intel assembly instructions.
          va (int): the address at which to assemble, default is 0x0.
          minlength (int): minimum length of the output bytes, optional.

        Returns:
          bytes: the raw bytes of the assembled instructions.
        '''
        encoding, _ = self.asm.asm(insns, va)

        # the assembler may hand back no encoding at all when nothing was
        #  assembled; treat that as zero bytes rather than failing on `len(None)`.
        encoding = list(encoding) if encoding else []

        if minlength:
            shortfall = minlength - len(encoding)
            if shortfall > 0:
                encoding.extend([0x90] * shortfall)
        return bytes(encoding)

    def get_emulator(self):
        '''
        create an emulator with every mapped region loaded and a stack set up.

        each region is mapped with its size rounded up to a multiple of 0x1000.
        a 0x10000 byte stack is mapped at STACK_ADDRESS, and both the stack
         pointer and the frame pointer start in the middle of it.

        Returns:
          unicorn.Uc: the prepared emulator.
        '''
        emu = create_emulator(self.bits)
        for section in self.get_map():
            emu.mem_map(section.va, align(section.size, 0x1000))
            # normalize to `bytes`, as `disassemble` does: a loader backed by
            #  a bytearray returns a bytearray slice.
            emu.mem_write(section.va, bytes(self.get_bytes(section.va, section.size)))

        emu.mem_map(self.STACK_ADDRESS, 0x10 * 0x1000)
        stack_middle = self.STACK_ADDRESS + 0x8 * 0x1000

        # use the full-width stack registers when emulating 64-bit code.
        if self.bits == 64:
            stack_pointer = unicorn.x86_const.UC_X86_REG_RSP
            frame_pointer = unicorn.x86_const.UC_X86_REG_RBP
        else:
            stack_pointer = unicorn.x86_const.UC_X86_REG_ESP
            frame_pointer = unicorn.x86_const.UC_X86_REG_EBP

        emu.reg_write(stack_pointer, stack_middle)
        emu.reg_write(frame_pointer, stack_middle)
        return emu
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
''' | |
usage: chap2fix.py [-h] [-v] [-q] [-o OUTPUT] [-s OFFSET] [-x] input

Fix 'fake unconditional' jumps.

positional arguments:
  input                 Path to input file

optional arguments:
  -h, --help            show this help message and exit
  -v, --verbose         Enable debug logging
  -q, --quiet           Disable all output but errors
  -o OUTPUT, --patched-file OUTPUT
                        Destination path to write patched PE file
  -s OFFSET, --start-addr OFFSET
                        Start address of the obfuscated region.
  -x, --aggressive      Aggressively scan for 'fake unconditional' jumps.
example:: | |
/env/bin/python chap2fix.py sample -o patched.bin | |
author: Willi Ballenthin | |
''' | |
import sys | |
import logging | |
import binascii | |
from pprint import pprint | |
from collections import namedtuple | |
import pefile | |
import argparse | |
import capstone.x86_const as x86 | |
import analyzer | |
# module-level logger, named after this module.
logger = logging.getLogger(__name__)
def is_fake_conditional(ana, va):
    '''
    decide whether the two instructions starting at the given address form a
     "fake conditional" jump: two conditional jumps with opposite conditions
     and the same target, which together always transfer control to that target.

    Arguments:
      ana (analyzer.Analyzer): the analyzer.
      va (int): the address of the first instruction.

    Returns:
      bool: True if the pair is a fake conditional jump.
        False also when either instruction cannot be fetched or decoded.
    '''
    try:
        op1 = ana.disassemble(va)
        if not op1.group(x86.X86_GRP_JUMP):
            return False

        op2 = ana.disassemble(va + op1.size)
        if not op2.group(x86.X86_GRP_JUMP):
            return False
    except (IndexError, pefile.PEFormatError, analyzer.FailedToDisassemble):
        # PEFormatError is what the aggressive scanner treats as running off
        #  the image; here it simply means there is no second instruction.
        return False

    # only direct jumps carry a meaningful immediate target.
    # for register or memory operands `.value.imm` would read unrelated operand data.
    for op in (op1, op2):
        if len(op.operands) != 1 or op.operands[0].type != x86.X86_OP_IMM:
            return False

    if op1.operands[0].value.imm != op2.operands[0].value.imm:
        return False

    mn1, mn2 = sorted((op1.mnemonic, op2.mnemonic))

    # the same condition twice (e.g. `jz X; jz X`) is still a real conditional:
    #  control falls through when the condition does not hold.
    if mn1 == mn2:
        return False

    # negated pairs share a suffix once the leading 'j'/'jn' is removed,
    #  e.g. `je`/`jne`, `js`/`jns`, `jo`/`jno`.
    if mn1.lstrip('jn') == mn2.lstrip('jn'):
        return True

    # complementary jump pairs in which the suffixes do not match.
    if (mn1, mn2) in set([('ja', 'jbe'),
                          ('jg', 'jle'),
                          ('jae', 'jb'),
                          ('jge', 'jl')]):
        return True

    logger.debug('unlikely "fake conditional" jump at %s: %s %s (will ignore)', hex(va), mn1, mn2)
    return False
# one patch: the virtual address to overwrite, the replacement bytes,
#  and the assembly text that the bytes were assembled from.
PatchEntry = namedtuple('PatchEntry', ['address', 'bytes', 'asm'])
def create_fake_conditional_jump_patch(ana, va):
    '''
    build the patch that replaces the pair of jumps at the given address
     with a single unconditional jump to the target of the first jump.
    the patch is padded so that it covers both original instructions.

    Arguments:
      ana (analyzer.Analyzer): the analyzer.
      va (int): the address of the first jump of the pair.

    Returns:
      PatchEntry: the patch.
    '''
    first = ana.disassemble(va)
    second = ana.disassemble(va + first.size)

    asm = 'JMP %s' % (hex(first.operands[0].value.imm))
    span = first.size + second.size
    return PatchEntry(va, ana.assemble(asm, va=va, minlength=span), asm)
def _direct_branch_target(op):
    '''
    fetch the target of a direct (immediate operand) call or jump,
     or None when the instruction is indirect or has an unexpected operand shape.
    '''
    if len(op.operands) != 1:
        return None
    operand = op.operands[0]
    if operand.type != x86.X86_OP_IMM:
        return None
    return operand.value.imm


def compute_fake_conditional_jump_patches(ana, entrypoints):
    '''
    generate patches that replace "fake conditional" jumps with unconditional jumps.

    follows control flow from each entry point: unconditional jumps and fake
     conditional jumps are followed directly, while call targets and conditional
     jump targets are queued as further entry points.
    each instruction address is walked at most once, so loops in the code
     terminate the walk and each patch is yielded once.

    Arguments:
      ana (analyzer.Analyzer): the analyzer.
      entrypoints (List[int]): the initial list of code start points.

    Yields:
      PatchEntry
    '''
    pending = set(entrypoints)
    visited = set()

    while pending:
        entrypoint = pending.pop()
        if entrypoint in visited:
            continue

        logger.debug('find patches starting from %s.', hex(entrypoint))

        va = entrypoint
        # stop as soon as the walk reaches code that was already covered,
        #  otherwise a backwards `jmp` would be followed forever.
        while va not in visited:
            visited.add(va)

            try:
                op = ana.disassemble(va)
            except (IndexError, pefile.PEFormatError, analyzer.FailedToDisassemble):
                logger.debug('failed to disassemble at %s. analysis dead end.', hex(va))
                break

            if is_fake_conditional(ana, va):
                yield create_fake_conditional_jump_patch(ana, va)
                va = op.operands[0].value.imm

            elif op.group(x86.X86_GRP_CALL):
                # indirect calls have no static target; just step over them.
                target = _direct_branch_target(op)
                if target is not None:
                    pending.add(target)
                va += op.size

            elif op.group(x86.X86_GRP_JUMP):
                target = _direct_branch_target(op)
                if op.mnemonic == 'jmp':
                    if target is None:
                        # indirect jump: the path cannot be followed statically.
                        break
                    va = target
                else:
                    if target is not None:
                        pending.add(target)
                    va += op.size

            elif op.group(x86.X86_GRP_RET):
                break

            else:
                va += op.size
def aggressively_compute_fake_conditional_jump_patches(ana, startva):
    '''
    aggressively generate patches that replace "fake conditional"
     jumps with unconditional jumps.

    unlike `compute_fake_conditional_jump_patches`, this routine does not
     follow control flow: it tries every byte offset from the start address
     onwards until the memory can no longer be read.

    Arguments:
      ana (analyzer.Analyzer): the analyzer.
      startva (int): the address at which to begin scanning.

    Yields:
      PatchEntry
    '''
    cursor = startva
    while True:
        # the decoded instruction itself is not needed here;
        #  the call only probes whether this address is readable and decodable.
        try:
            ana.disassemble(cursor)
        except (IndexError, pefile.PEFormatError):
            logger.debug('failed to disassemble at %s. analysis dead end.', hex(cursor))
            return
        except analyzer.FailedToDisassemble:
            cursor += 1
            continue

        if not is_fake_conditional(ana, cursor):
            cursor += 1
            continue

        # ignore prefix on jump instructions, if possible.
        # for example, should use 0x40450F, not 0x40450E
        candidate = cursor + 1
        while is_fake_conditional(ana, candidate):
            logger.debug('ignoring prefix at %s', hex(candidate - 1))
            candidate += 1
        cursor = candidate - 1

        patch = create_fake_conditional_jump_patch(ana, cursor)
        yield patch

        # resume scanning after the bytes that the patch replaces.
        cursor += len(patch.bytes)
def format_patch(patch):
    '''
    render a patch as a single line of text, in the form
     `<hex address> <hex bytes> ; <assembly>`.
    `parse_patch` is the inverse operation.

    Arguments:
      patch (PatchEntry): the patch to format.

    Returns:
      str: the line of text
    '''
    address = hex(patch.address)
    payload = binascii.hexlify(patch.bytes).decode('ascii')
    return '%s %s ; %s' % (address, payload, patch.asm)
def parse_patch(line):
    '''
    parse a line of text into a patch instance.
    see `format_patch`.

    the expected form is `<hex address> <hex bytes> ; <assembly>`.
    surrounding whitespace (including a trailing newline) is ignored,
     and the address and bytes may be separated by any run of whitespace.

    Arguments:
      line (str): a line of text.

    Returns:
      PatchEntry: the patch.

    Raises:
      RuntimeError: if the line is invalid.
    '''
    data, separator, comment = line.partition(';')
    fields = data.split()

    # require the `;` separator and an address; the bytes field may be empty.
    if not separator or not fields or len(fields) > 2:
        raise RuntimeError('invalid patch line')

    address = fields[0]
    bytez = fields[1] if len(fields) == 2 else ''

    try:
        address = int(address, 0x10)
        bytez = binascii.unhexlify(bytez.encode('ascii'))
    except (ValueError, binascii.Error):
        # malformed hex is reported via the documented exception type.
        raise RuntimeError('invalid patch line')

    return PatchEntry(address, bytez, comment.strip())
def apply_patch(ana, patch):
    '''
    write the bytes of a patch into the analyzer's memory at the patch address.

    Arguments:
      ana (analyzer.Analyzer): the analyzer.
      patch (PatchEntry): the patch.

    Returns:
      None
    '''
    target, payload = patch.address, patch.bytes
    ana.set_bytes(target, payload)
def main(argv=None):
    '''
    command line entry point: compute patches for the input PE file,
     print each one, and optionally write a patched copy.

    Arguments:
      argv (List[str]): the command line arguments, excluding the program name.
        defaults to `sys.argv[1:]`.

    Returns:
      int: the process exit code, 0 on success.
    '''
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Fix 'fake unconditional' jumps.")
    parser.add_argument("input", type=str,
                        help="Path to input file")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable debug logging")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disable all output but errors")
    parser.add_argument('-o', "--patched-file", dest='output',
                        help="Destination path to write patched PE file")
    parser.add_argument('-s', "--start-addr", dest='offset', default=0x401910,
                        type=lambda s: int(s, 0x10),
                        help="Start address of the obfuscated region.")
    parser.add_argument('-x', "--aggressive", action="store_true",
                        help="Aggressively scan for 'fake unconditional' jumps.")
    # parse the arguments that were passed in, so callers that supply
    #  their own `argv` are honored.
    args = parser.parse_args(argv)

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    elif args.quiet:
        logging.basicConfig(level=logging.ERROR)
    else:
        logging.basicConfig(level=logging.INFO)

    pe = pefile.PE(args.input)
    mem = analyzer.PELoader(pe)
    ana = analyzer.Analyzer(mem, mem.get_bits())

    if args.aggressive:
        computed_patches = aggressively_compute_fake_conditional_jump_patches(ana, args.offset)
    else:
        computed_patches = compute_fake_conditional_jump_patches(ana, [args.offset])

    # collect every patch before applying any of them, so that the
    #  analysis always runs against the unmodified sample.
    patches = []
    for patch in computed_patches:
        print(format_patch(patch))
        patches.append(patch)

    if args.output:
        for patch in patches:
            apply_patch(ana, patch)
        pe.write(args.output)

    return 0
# run the command line interface when executed as a script,
#  using the return value of `main` as the exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment