Created
March 23, 2015 14:49
-
-
Save TheCjw/50abbd0ad3c16c5dee55 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from idaapi import * | |
from idc import * | |
from idautils import * | |
import sys | |
# Jared DeMott, labs.bromium.com
def error(e):
    """Report a fatal error and terminate the script.

    Prints ``Fatal error:`` followed by ``e`` on standard output, then calls
    ``sys.exit(-1)``, which raises ``SystemExit`` with code -1.
    """
    # The message is pre-formatted into a single argument so the line parses
    # and prints the same text whether ``print`` is a statement (Python 2) or
    # a function (Python 3). The double space matches what the original
    # ``print "Fatal error: ", e`` statement emitted.
    print("Fatal error:  %s" % (e,))
    sys.exit(-1)
# Resolve the symbols the scan depends on. A failed lookup is detected by
# comparing against BADADDR rather than the literal 0xffffffff: the two are
# equal in a 32-bit IDA, but BADADDR is wider in a 64-bit IDA, where the
# literal comparison would let a missing symbol go unnoticed.
HeapAlloc_addr = LocByName("__imp__HeapAlloc@12")
if HeapAlloc_addr == BADADDR:
    error("bad HeapAlloc_addr")

process_heap = LocByName("_g_hProcessHeap")
if process_heap == BADADDR:
    error("bad process_heap")

GetProcessHeap = LocByName("__imp__GetProcessHeap@0")
if GetProcessHeap == BADADDR:
    error("bad GetProcessHeap")

# Set to 1 to look up the _g_hIsolatedHeap symbol; with 0 a placeholder value
# is used instead.
patched = 0
if patched:
    isolated_heap = LocByName("_g_hIsolatedHeap")
    if isolated_heap == BADADDR:
        error("bad isolated_heap")
else:
    # hack: placeholder value; if an operand happens to equal it, find_heap
    # will still report "isolated_heap" in the unpatched case
    isolated_heap = 0x12345678

# Per-heap allocation counters, incremented by add_to_heap_count().
p_heap = 0
i_heap = 0
d_heap = 0
v_heap = 0

# 1 when the size of the allocation being examined is not immediate data
# (see dynamic_check), 0 otherwise.
dynamic_size = 0
# Name of the heap chosen for the call site currently being processed.
heap_name = ""
# 1: report only dynamically sized allocations; 0: report all of them.
DYSIZE = 0
# Non-zero enables the trace prints in the scanning helpers.
debug = 0
def check_for_first_jmp_to_this_location(addr):
    """Return the source address of the first near-jump xref to ``addr``.

    Iterates ``XrefsTo(addr, 0)`` in the order it yields references and stops
    at the first one whose type is 19 (Code_Near_Jump). Returns 0 when no such
    reference is found.
    """
    for incoming in XrefsTo(addr, 0):
        if incoming.type == 19:  # Code_Near_Jump
            # only the first matching reference is reported
            return incoming.frm
    return 0
def get_prior_addr(addr):
    """Step one instruction backwards from ``addr``.

    When a near jump targets ``addr`` (as reported by
    check_for_first_jmp_to_this_location), the step is taken from the jump
    instruction, so the result is the instruction preceding that jump.
    Otherwise the result is the instruction preceding ``addr`` itself.

    Returns the address of that instruction, or BADADDR when
    DecodePreviousInstruction() yields no instruction.
    """
    jump_src = check_for_first_jmp_to_this_location(addr)
    inst = DecodePreviousInstruction(jump_src if jump_src else addr)
    if inst is None:
        # No instruction could be decoded before this point, e.g. when a
        # caller keeps scanning from the 0 that find_push returns on failure.
        # Dereferencing None here used to raise AttributeError and abort the
        # whole scan; BADADDR lets the callers' bounded loops run out instead.
        return BADADDR
    return inst.ea
def find_wrapper_size(addr):
    """Scan backwards from a wrapper call site for its size argument.

    Starting at ``addr`` and stepping back with get_prior_addr(), at most 15
    instructions are examined:

    * ``mov eax, X``  -> returns the operand text ``X``;
    * ``push X`` reached after a ``pop eax`` has already been seen during the
      backward scan -> returns the operand text ``X``.

    Returns the operand text as produced by GetOpnd(). If nothing matches
    within 15 instructions, returns 0 and sets the global ``dynamic_size``
    to 1.
    """
    # Without this declaration the assignment below only bound a local name,
    # so the "size not found" signal never reached the rest of the script.
    global dynamic_size

    found_pop = 0
    if debug:
        print("start wrapper_size")
    for loop in range(1, 16):
        inst = GetMnem(addr)
        if debug:
            print("%s %s %s" % (loop, hex(addr), inst))
        if inst == "pop":
            if GetOpnd(addr, 0) == "eax":
                found_pop = 1
        elif inst == "mov":
            if GetOpnd(addr, 0) == "eax":
                return GetOpnd(addr, 1)
        elif inst == "push":
            if found_pop:
                return GetOpnd(addr, 0)
        addr = get_prior_addr(addr)

    # Search window exhausted without locating the size.
    dynamic_size = 1
    return 0
def find_push(addr, ignore_first_push):
    """Scan backwards from ``addr`` for a ``push`` instruction.

    ``ignore_first_push`` is the number of ``push`` instructions to skip
    before one is accepted. Non-push instructions are stepped over with
    get_prior_addr(); once 22 iterations have been spent and the current
    instruction is still not a push, the search gives up, prints a notice and
    returns 0. Skipped pushes are stepped over regardless of that limit.

    Returns the address of the accepted ``push``, or 0 on failure.
    """
    pushes_to_skip = ignore_first_push
    step = 0
    if debug:
        print("start find_push")
    while True:
        step += 1
        mnemonic = GetMnem(addr)
        if debug:
            print("%s %s %s" % (step, hex(addr), mnemonic))
        if mnemonic == "push":
            if not pushes_to_skip:
                return addr
            pushes_to_skip -= 1
        elif step >= 22:
            print("Failed to find next PUSH, assuming ??? size")
            print("\tAddr: %x" % (addr))
            # is this ok for flags, needed for size
            return 0
        addr = get_prior_addr(addr)
# find_heap(addr): walk backwards from a HeapAlloc call site, via
# get_prior_addr(), looking for the instruction that supplies the heap handle.
# Sets the global heap_name to one of "process_heap", "isolated_heap",
# "default_heap" or "variable_heap" and returns the tuple
# (heap_name, addr, pushes_seen), where addr is the scan position when the
# loop ended and pushes_seen counts the "push" mnemonics encountered (the
# counter is not reset between the two passes).
# NOTE(review): the indentation of this listing has been lost, so the exact
# nesting of the checks inside the loop cannot be read off the text; confirm
# against a correctly indented copy before restructuring.
def find_heap(addr): | |
global heap_name, debug | |
# remember the call site so the second pass can restart from it
lcopy_addr=addr | |
loop=0 | |
# operand index whose value is compared with the heap symbols: 1 during the
# first pass, switched to 0 for the second pass below
op_num=1 | |
second_time=0 | |
pushes_seen=0 | |
done = 0 | |
if debug: | |
print "start find_heap" | |
while done==0: | |
loop+=1 | |
if loop > 1: #first time just points to 'call heapalloc' | |
Mnem = GetMnem(addr) | |
op_val = GetOperandValue(addr, op_num) | |
if debug: | |
print loop, hex(addr), Mnem, hex(op_val) | |
if Mnem == "push": | |
pushes_seen+=1 | |
# operand equals one of the known heap-handle globals
if op_val == process_heap: | |
heap_name = "process_heap" | |
done = 1 | |
elif op_val == isolated_heap: | |
heap_name = "isolated_heap" | |
done = 1 | |
# a call that is, or references, the GetProcessHeap import means the default
# heap is being used
if Mnem == "call": | |
if op_val==GetProcessHeap: | |
heap_name = "default_heap" | |
done = 1 | |
else: | |
for ref in XrefsFrom(addr, 0): | |
if ref.to == GetProcessHeap: | |
heap_name = "default_heap" | |
done = 1 | |
# keep stepping backwards until the loop counter reaches 22
if loop < 22: | |
addr = get_prior_addr(addr) | |
else: | |
# first pass exhausted: rescan from the call site comparing operand 0
if second_time == 0: | |
op_num=0 | |
loop=1 #cant be zero else reset addr | |
second_time=1 | |
addr=lcopy_addr | |
else: | |
# both passes exhausted without a match
heap_name = "variable_heap" | |
done = 1 | |
return heap_name, addr, pushes_seen | |
def dynamic_check(addr):
    """Record whether the first operand at ``addr`` is immediate data.

    Stores 0 in the global ``dynamic_size`` when ``GetOpType(addr, 0)`` is
    greater than 4 (treated as immediate data) and 1 otherwise (register or
    memory reference), then returns the stored value.
    """
    global dynamic_size
    dynamic_size = 0 if GetOpType(addr, 0) > 4 else 1
    return dynamic_size
def add_to_heap_count():
    """Increment the counter that matches the global ``heap_name``.

    Returns 1 after bumping ``i_heap``, ``p_heap``, ``d_heap`` or ``v_heap``.
    For any other heap name a notice is printed, nothing is counted and -1 is
    returned.
    """
    global i_heap, p_heap, d_heap, v_heap
    if heap_name == "isolated_heap":
        i_heap += 1
        return 1
    if heap_name == "process_heap":
        p_heap += 1
        return 1
    if heap_name == "default_heap":
        d_heap += 1
        return 1
    if heap_name == "variable_heap":
        v_heap += 1
        return 1
    print("shouldn't get here ever")
    return -1
def WrapperAlloc_output_data(f, name, key, d):
    """Write the report entry for one call to a wrapper allocator.

    ``d[key]`` holds, in order: function name, demangled name, flags, size.
    ``key`` is the call-site address and ``name`` the wrapper's name. The
    heap shown in the entry is the global ``heap_name``. Output goes to the
    file object ``f``; with ``debug`` set the fields are also printed.
    """
    record = d[key]
    if debug:
        print("%s %s %s %s %s" % (record[0], record[1], heap_name,
                                  record[2], record[3]))
    f.write("%s\n" % record[0])
    f.write("\t%s\n" % record[1])
    f.write("\t%s(%s, %s, " % (name, heap_name, record[2]))
    f.write("%s) from %x\n" % (record[3], key))
def output_data(f, key, d):
    """Write the report entry for one direct HeapAlloc call.

    ``d[key]`` holds, in order: function name, demangled name, address of the
    instruction pushing the flags, address of the instruction pushing the
    size. ``key`` is the call-site address. The heap shown in the entry is
    the global ``heap_name``. Output goes to the file object ``f``; with
    ``debug`` set the fields are also printed.

    A size address of 0 (search failed) is reported as ``???``.
    """
    record = d[key]
    flags_ea = record[2]
    # Take the size address from the record that was passed in. The original
    # read the module-level ``size_addr`` instead, which only matched because
    # the caller happened to assign it immediately before the call.
    size_ea = record[3]
    if debug:
        print("%s %s %s %s %s" % (record[0], record[1], heap_name,
                                  hex(flags_ea), hex(size_ea)))
    f.write("%s\n" % record[0])
    f.write("\t%s\n" % record[1])
    f.write("\tHeapAlloc(%s, %s, " % (heap_name, GetOpnd(flags_ea, 0)))
    if size_ea:
        f.write("%s) from %x\n" % (GetOpnd(size_ea, 0), key))
    else:
        f.write("???) from %x\n" % key)
def handle_WrapperAlloc(f, flags, heap, name, funcs):
    """Report every call site of the wrapper allocator ``name``.

    f     -- open file object the report is written to
    flags -- flags value shown for each call to this wrapper
    heap  -- heap name to report and count for each call
    name  -- name of the wrapper function, resolved with LocByName()
    funcs -- dict mapping call-site address to its record list; a record
             [function name, demangled name, flags, size] is appended for
             every call site visited

    With DYSIZE == 1 only call sites whose size could not be determined
    (global ``dynamic_size`` == 1) are reported; otherwise all of them are,
    matching the filter used for direct HeapAlloc calls.
    """
    # Both names are read by helpers through module globals. The original
    # assigned them as locals, so the output and counting helpers kept using
    # the heap name left over from the preceding HeapAlloc analysis.
    global heap_name, dynamic_size

    print("handling %s" % name)
    heap_name = heap
    for xref in XrefsTo(LocByName(name), 0):
        ptr = xref.frm
        record = funcs.setdefault(ptr, [])
        func_name = GetFunctionName(ptr)
        d_func_name = Demangle(func_name, GetLongPrm(INF_SHORT_DN))
        # Reset per call site; find_wrapper_size is expected to flag a size
        # it cannot determine through this global.
        dynamic_size = 0
        size = find_wrapper_size(ptr)
        record.extend([func_name, d_func_name, flags, size])
        if DYSIZE != 1 or dynamic_size == 1:
            WrapperAlloc_output_data(f, name, ptr, funcs)
            add_to_heap_count()
# Main driver: visit every reference to the HeapAlloc import, work out the
# heap, flags and size arguments of each call, and write one report entry per
# allocation followed by the per-heap totals.
print("Beginning")
if DYSIZE:
    out_path = "C:\\Users\\AppSec\\Desktop\\uaf\\dynamic_MSHTML_Allocations.txt"
else:
    out_path = "C:\\Users\\AppSec\\Desktop\\uaf\\all_MSHTML_Allocations.txt"

# Wrapper allocators: function name -> (flags, heap) handed to
# handle_WrapperAlloc when a HeapAlloc call is found inside that function.
_WRAPPER_ALLOCATORS = {
    "?ProcessHeapAllocClear@@YGPAXK@Z": (8, "process_heap"),
    "?ProcessHeapAlloc@@YGPAXK@Z": (0, "process_heap"),
    "__MemIsolatedAllocClear@4": (8, "isolated_heap"),
    "__MemIsolatedAlloc@4": (0, "isolated_heap"),
}

funcs = {}
# The with-block closes the report even if one of the helpers raises.
with open(out_path, "w") as f:
    for xref in XrefsTo(HeapAlloc_addr, 0):
        dynamic_size = 0
        ptr = xref.frm
        if ptr not in funcs:
            funcs[ptr] = []
        func_name = GetFunctionName(ptr)
        d_func_name = Demangle(func_name, GetLongPrm(INF_SHORT_DN))
        # To trace a single function, set ``debug = 1`` here when
        # d_func_name contains its name (e.g. "CCustomCursor::BeginDownload");
        # debug is cleared again at the end of each iteration.
        Mnem = GetMnem(ptr)
        if Mnem != "call":
            continue

        heap_name, heap_addr, pushes_seen = find_heap(ptr)
        ignore_first_push = 1
        flags_addr = find_push(ptr, ignore_first_push)
        if flags_addr:
            # always ignore the push we just found
            size_addr = find_push(flags_addr, 1)
        else:
            # The flags search failed and returned 0; scanning backwards
            # from address 0 cannot succeed, so report the size as unknown.
            size_addr = 0
        dynamic_check(size_addr)

        funcs[ptr].append(func_name)
        funcs[ptr].append(d_func_name)
        funcs[ptr].append(flags_addr)
        funcs[ptr].append(size_addr)

        if DYSIZE != 1 or dynamic_size == 1:
            output_data(f, ptr, funcs)
            add_to_heap_count()

        wrapper = _WRAPPER_ALLOCATORS.get(func_name)
        if wrapper is not None:
            handle_WrapperAlloc(f, wrapper[0], wrapper[1], func_name, funcs)
        debug = 0

    f.write("\n\nTotal allocations to the default/process heap=%d\n" % (p_heap + d_heap + v_heap))
    f.write("Total allocations to separate isolated heap=%d\n" % i_heap)
print("Done.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment