Last active
March 18, 2017 10:28
-
-
Save iamahuman/d29a2c3c0b04a326fbf9f8ffaff5043a to your computer and use it in GitHub Desktop.
angr usage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# auth_engine.ko: https://file.io/aHLh4Q (the kernel module)
# dimicraft_auth: https://file.io/iJZQr5 (the userspace program)
# Patches required in order to work:
# cle @ commit 816672e (see cle #24)
# angr @ commit 1fba387 (see angr #126)
import logging | |
logging.basicConfig(level=logging.WARN) | |
import sys | |
import collections | |
import itertools | |
import simuvex | |
import claripy | |
import angr | |
import cle | |
# Script-local logger. Its level is DEBUG (the root logger is configured at WARN by the
# basicConfig() call above), so the l.debug() progress messages in this file are emitted.
l = logging.getLogger("solve")
l.setLevel(logging.DEBUG)
class MyStorage(simuvex.SimStatePlugin):
    """State plugin holding a few pointer-sized values in a private symbolic memory.

    Every name listed in ``_fields`` is exposed as an attribute.  Field number
    ``i`` lives at offset ``i * state.arch.bytes`` inside ``self.mem``: reading
    the attribute loads ``state.arch.bytes`` bytes from that offset, assigning
    to it stores the value there.  All other attribute names behave like normal
    Python attributes.
    """

    # Attribute names that are backed by ``self.mem`` (position == slot index).
    _fields = ("read_addr", "write_addr", "ioctl_addr", "open_addr")

    def __init__(self, mem=None):
        """Create the plugin, building a fresh backing memory unless ``mem`` is supplied."""
        super(MyStorage, self).__init__()
        if mem is None:
            self.mem = simuvex.SimSymbolicMemory(memory_id="file_ulocal_%s_%d" % (
                self.__class__.__name__, id(self),))
        else:
            self.mem = mem

    def set_state(self, *args, **kwargs):
        """Forward ``set_state`` to the base plugin and to the backing memory."""
        rval = super(MyStorage, self).set_state(*args, **kwargs)
        self.mem.set_state(*args, **kwargs)
        return rval

    def copy(self):
        """Return a new ``MyStorage`` wrapping a copy of the backing memory."""
        return MyStorage(mem=self.mem.copy())

    def merge(self, others, *args, **kwargs):
        """Merge by delegating to the backing memory of this and the other plugins."""
        return self.mem.merge([o.mem for o in others], *args, **kwargs)

    def widen(self, others, *args, **kwargs):
        """Widen by delegating to the backing memory of this and the other plugins."""
        return self.mem.widen([o.mem for o in others], *args, **kwargs)

    def __getattr__(self, name):
        """Resolve ``_fields`` names from the backing memory; defer everything else.

        Only invoked by Python when normal attribute lookup has already failed.
        """
        if name.startswith("_"):
            return super(MyStorage, self).__getattr__(name)
        try:
            index = self._fields.index(name)
        except ValueError:
            return super(MyStorage, self).__getattr__(name)
        else:
            width = self.state.arch.bytes
            return self.mem.load(index * width, width)

    def __setattr__(self, name, value):
        """Store ``_fields`` names into the backing memory; set everything else normally."""
        if name.startswith("_"):
            # Fix: this branch used to call __getattr__(name), which silently
            # discarded the value (or raised) instead of performing the assignment.
            return super(MyStorage, self).__setattr__(name, value)
        try:
            index = self._fields.index(name)
        except ValueError:
            return super(MyStorage, self).__setattr__(name, value)
        else:
            width = self.state.arch.bytes
            return self.mem.store(index * width, value)
class SubPtrArgRet0(simuvex.SimProcedure):
    """Stub procedure that takes a single argument, ignores it and returns 0."""

    def run(self, arg1):
        # The argument is deliberately unused; the hooked call just "succeeds".
        return 0
# libc memcpy SimProcedure, reused below to model the kernel<->user copy helpers.
SubMemcpy = simuvex.SimProcedures['libc.so.6']['memcpy']


class SubDumbKernUserCopy(simuvex.SimProcedure):
    """Model a kernel/user copy helper as a plain memcpy that always returns 0."""

    def run(self, dst_addr, src_addr, limit):
        # Log the operands together with the largest value `limit` can take.
        upper_bound = self.state.se.max_int(limit)
        message = "Copying from %r to %r (at most %d bytes)" % (src_addr, dst_addr, upper_bound)
        l.debug(message)
        # Perform the copy itself through the libc memcpy procedure.
        self.inline_call(SubMemcpy, dst_addr, src_addr, limit)
        return 0
# libc SimProcedures reused below to model a zero-initialised kernel allocation.
SubMalloc = simuvex.SimProcedures['libc.so.6']['malloc']
SubMemset = simuvex.SimProcedures['libc.so.6']['memset']


class SubDumbKernCacheAlloc(simuvex.SimProcedure):
    """Model a kernel cache allocation as malloc(size) followed by a zeroing memset.

    The ``s`` and ``gfpflags`` arguments are accepted but not used.
    """

    def run(self, s, gfpflags, size):
        l.debug("Allocating %#x bytes" % self.state.se.max_int(size))
        allocation = self.inline_call(SubMalloc, size)
        buf = allocation.ret_expr
        self.inline_call(SubMemset, buf, 0, size)  # zero out
        return buf
class SubDumbKernRegChrDev(simuvex.SimProcedure):
    """Model character-device registration by recording four handler pointers.

    Checks that the 10 bytes at ``name`` equal ``"dimicraft\\x00"`` and then
    copies four pointer-sized entries out of the table at ``fops`` into the
    ``my_storage`` state plugin.  Returns 0.

    Raises:
        RuntimeError: if the device name is not definitely ``"dimicraft"``.
    """

    # (storage attribute, pointer-sized slot index inside the table at `fops`)
    # NOTE(review): the slot numbers presumably follow the target kernel's
    # `struct file_operations` layout -- confirm against the kernel version.
    _HANDLER_SLOTS = (
        ("read_addr", 2),
        ("write_addr", 3),
        ("ioctl_addr", 10),
        ("open_addr", 14),
    )

    def run(self, major, baseminor, count, name, fops):
        l.debug("Registering a character device")
        state = self.state
        # That module's probably registering /dev/dimicraft
        registered_name = state.memory.load(name, 10)
        if not state.se.is_true(registered_name == "dimicraft\x00", exact=True):
            raise RuntimeError("registering an unknown device")
        storage = state.get_plugin("my_storage")
        word = state.arch.bytes
        endness = state.arch.memory_endness
        for attr_name, slot in self._HANDLER_SLOTS:
            handler = state.memory.load(fops + word * slot, word, endness=endness)
            setattr(storage, attr_name, handler)
        return 0
class SubGateOpen(simuvex.SimProcedure):
    """Userspace ``open()`` hook that forwards to the open handler kept in ``my_storage``.

    If the 15 bytes at ``path`` are exactly ``"/dev/dimicraft\\x00"`` the stored
    ``open_addr`` is called with arguments ``(0, 0)`` and execution continues in
    :meth:`after_open`; any other path makes the hook return -1.
    """

    NO_RET = True
    local_vars = ('old_sp',)

    def run(self, path, flags):
        self.old_sp = self.state.regs.sp  # backup old sp
        # The previously computed (and never used) `path_str` local was removed:
        # it forced a string resolution of `path` whose result was discarded.
        if (self.state.memory.load(path, 15) == b"/dev/dimicraft\x00").is_true():
            self.call(self.state.get_plugin("my_storage").open_addr, (0, 0), 'after_open')
        else:
            self.ret(-1)

    def after_open(self, *args, **kwargs):
        """Continuation after the handler call: map its result to fd 3 or -1."""
        rval = self.cc.get_return_val(self.state)
        self.state.regs.sp = self.old_sp  # restore the stack pointer saved in run()
        self.ret(self.state.se.If(rval == 0,
                                  self.state.se.BVV(3, self.state.arch.bits),    # success
                                  self.state.se.BVV(-1, self.state.arch.bits)))  # failure
class SubHookScanf(simuvex.SimProcedure):
    """``scanf`` hook that writes one of two 32-byte values depending on the caller.

    The return address is constrained to be 0x400a17 or 0x400a3c.  For the
    first, the module-level symbolic ``full_name`` is stored at ``arg``; for
    the second, a fixed string is stored instead.  Always returns 1.
    """

    def run(self, fmt, arg):
        state = self.state
        # Work out where this call returns to.
        if self.ret_to is None:
            ret_addr = state.se.exactly_int(self.cc.return_addr.get_value(state))
        else:
            ret_addr = state.se.BVV(self.ret_to, state.arch.bits)
        from_name_site = (ret_addr == 0x400a17)
        from_key_site = (ret_addr == 0x400a3c)
        # Only these two call sites are supported.
        state.add_constraints(claripy.Or(from_name_site, from_key_site))
        fixed_text = state.se.BVV("You_must_find_your_name_in_here\x00")
        chosen = state.se.If(from_name_site, full_name, fixed_text)
        state.memory.store(arg, chosen)
        return 1
class SubGateWrite(simuvex.SimProcedure):
    """Userspace ``write()`` hook that forwards fd 3 to the stored write handler.

    * ``fd == 3``: call ``my_storage.write_addr`` with ``(0, ptr, n, 0)`` and
      continue in :meth:`after_write`.
    * ``fd < 3``: pretend the write succeeded and return ``n``.
    * otherwise: return -1.
    """

    NO_RET = True
    local_vars = ('old_sp',)

    def run(self, fd, ptr, n):
        # XXX backing up old stack pointer is a bit ugly - any better way to do this?
        self.old_sp = self.state.regs.sp
        if fd.symbolic:
            l.warning("WTF - fd is symbolic?")
        fd = self.state.se.exactly_int(fd)
        if fd == 3:
            l.debug("userspace write() to auth_engine")
            # NOTE(review): arguments presumably map to (file, buf, count, ppos) -- confirm.
            self.call(self.state.get_plugin("my_storage").write_addr, (0, ptr, n, 0), 'after_write')
        elif fd < 3:
            self.ret(n)  # fake write
        else:
            self.ret(-1)  # wtf fd > 3?

    def after_write(self, *args, **kwargs):
        """Continuation after the handler call: pass through non-negative results, else -1."""
        rval = self.cc.get_return_val(self.state)
        self.state.regs.sp = self.old_sp
        # Use an explicitly *signed* comparison: an unsigned `rval >= 0` on a
        # bitvector is always true, so negative error codes would never map to -1.
        self.ret(self.state.se.If(claripy.SGE(rval, 0), rval, -1))
class SubGateRead(simuvex.SimProcedure):
    """Userspace ``read()`` hook that forwards fd 3 to the stored read handler.

    For ``fd == 3`` the handler at ``my_storage.read_addr`` is called with
    ``(0, ptr, n, 0)``; every other descriptor makes the hook return -1.
    """

    NO_RET = True
    local_vars = ('old_sp',)

    def run(self, fd, ptr, n):
        self.old_sp = self.state.regs.sp  # saved so the continuation can restore it
        if fd.symbolic:
            l.warning("WTF - fd is symbolic?")
        fd = self.state.se.any_int(fd)
        if fd == 3:
            l.debug("userspace read() to auth_engine")
            # NOTE(review): arguments presumably map to (file, buf, count, ppos) -- confirm.
            self.call(self.state.get_plugin("my_storage").read_addr, (0, ptr, n, 0), 'after_write')
        else:
            self.ret(-1)  # wtf fd != 3?

    # The continuation keeps its historical name `after_write` although it
    # finishes a read; renaming it would change the class's attribute names.
    def after_write(self, *args, **kwargs):
        """Continuation after the handler call: pass through non-negative results, else -1."""
        rval = self.cc.get_return_val(self.state)
        self.state.regs.sp = self.old_sp
        # Use an explicitly *signed* comparison: an unsigned `rval >= 0` on a
        # bitvector is always true, so negative error codes would never map to -1.
        self.ret(self.state.se.If(claripy.SGE(rval, 0), rval, -1))
class SubGateIoctl(simuvex.SimProcedure):
    """Userspace ``ioctl()`` hook that forwards fd 3 to the stored ioctl handler.

    For ``fd == 3`` the handler at ``my_storage.ioctl_addr`` is called with
    ``(0, req, arg)``; every other descriptor makes the hook return -1.
    """

    NO_RET = True
    local_vars = ('old_sp',)

    def run(self, fd, req, arg):
        self.old_sp = self.state.regs.sp  # saved so the continuation can restore it
        if fd.symbolic:
            l.warning("WTF - fd is symbolic?")
        fd = self.state.se.any_int(fd)
        if fd == 3:
            l.debug("userspace ioctl(%#x)" % self.state.se.any_int(req))
            self.call(self.state.get_plugin("my_storage").ioctl_addr, (0, req, arg), 'after_ioctl')
        else:
            self.ret(-1)  # wtf fd != 3?

    def after_ioctl(self, *args, **kwargs):
        """Continuation after the handler call: pass through non-negative results, else -1."""
        self.state.regs.sp = self.old_sp
        rval = self.cc.get_return_val(self.state)
        # Use an explicitly *signed* comparison: an unsigned `rval >= 0` on a
        # bitvector is always true, so negative error codes would never map to -1.
        self.ret(self.state.se.If(claripy.SGE(rval, 0), rval, -1))
def solve():
    """Symbolically execute the userspace program against the kernel module and recover the name.

    Loads ``./dimicraft_auth`` together with ``./auth_engine.ko``, hooks the
    libc/kernel symbols listed in ``hook_map``, runs the module's
    ``init_module`` and then the program starting at 0x400896, looking for a
    path that reaches 0x400f3f.

    Side effect: sets the module-level ``full_name`` (read by ``SubHookScanf``).

    Returns:
        The first concrete value found for the symbolic name, cut at its first
        NUL byte, or ``None`` (implicitly) when no path ends up in ``found``.
    """
    global full_name
    # Set up memory
    # Here relocatables are neither shared library nor executable, so we load them manually
    loader = cle.Loader("./dimicraft_auth", auto_load_libs=False)
    lib_rebase_addr = 0x20000000
    lib_bin = loader.load_object("./auth_engine.ko", options=dict(custom_rebase_addr=lib_rebase_addr), is_main_bin=False)
    loader.add_object(lib_bin, lib_rebase_addr)
    loader.relocate() # necessary for rebasing manually loaded objects
    # Set up project
    b = angr.Project(loader, use_sim_procedures=False)
    # Set up calling
    cc = simuvex.DefaultCC[b.arch.name](b.arch)
    # Fake return address for top-level calls; hooked so that a path returning there terminates.
    deadended_addr = b._extern_obj.get_pseudo_addr('_custom_fake_return_addr_')
    b.hook(deadended_addr, simuvex.SimProcedures["stubs"]["PathTerminator"])
    # Set up symbols
    procs_libc_so_6 = simuvex.SimProcedures["libc.so.6"]
    procs_stubs = simuvex.SimProcedures["stubs"]
    # (symbol name, replacement) pairs: userspace libc calls first, then kernel symbols.
    hook_map = (
        ("puts", SubPtrArgRet0), ("printf", SubPtrArgRet0),
        ("strcmp", procs_libc_so_6["strcmp"]), ("strcpy", procs_libc_so_6["strcpy"]),
        ("strlen", procs_libc_so_6["strlen"]), ("memset", procs_libc_so_6["memset"]),
        ("__isoc99_scanf", SubHookScanf),
        ("open", SubGateOpen), ("write", SubGateWrite), ("ioctl", SubGateIoctl),
        ("read", SubGateRead), ("close", procs_stubs["Nop"]),
        ("__fentry__", procs_stubs["Nop"]),
        ("mutex_lock", SubPtrArgRet0), ("mutex_unlock", SubPtrArgRet0),
        ("_copy_from_user", SubDumbKernUserCopy), ("_copy_to_user", SubDumbKernUserCopy),
        ("kmem_cache_alloc_trace", SubDumbKernCacheAlloc),
        # Unlike the other entries this one maps the symbol to an address, not a procedure
        # (presumably a dummy buffer standing in for the kernel data object -- TODO confirm).
        ('kmalloc_caches', b._extern_obj.get_pseudo_addr("tmp_buf", b.arch.bytes * 16)), # This is enough!
        ('__register_chrdev', SubDumbKernRegChrDev),)
    for sym_name, proc in hook_map:
        b.hook_symbol(sym_name, proc)
    # Create a state
    state = b.factory.blank_state()
    state.register_plugin("my_storage", MyStorage()) # used to store local data
    # too many branches - unsat states slow things down way too much
    state.options.remove(simuvex.o.LAZY_SOLVES)
    # Set up symbolic input - assume a valid printable ASCII string w/o whitespace
    min_len, max_len = 1, 31
    assert 1 <= min_len <= max_len <= 31
    the_name = state.se.Unconstrained("the_name", 8 * max_len)
    full_name = state.se.Concat(the_name, state.se.BVV(0, 8 * (32 - max_len))) # incl. NUL byte - used for feeding scanf
    chrs = the_name.chop(8)  # one 8-bit piece per character
    # ASCII chars constituting the minimum length cannot be zero
    state.add_constraints(*(claripy.And(ch > 0x20, ch < 0x7f) for ch in chrs[:min_len]))
    if min_len < max_len:
        # Char @ min_len can or cannot be zero
        state.add_constraints(claripy.Or(chrs[min_len] == 0, claripy.And(chrs[min_len] > 0x20, chrs[min_len] < 0x7f)))
        # Other chars depend on the previous characters:
        # each is either NUL, or printable with a non-NUL predecessor (chrs[min_len + i]).
        state.add_constraints(*(claripy.Or(ch == 0, claripy.And(chrs[min_len+1+i-1] != 0, ch > 0x20, ch < 0x7f)) for i, ch in enumerate(chrs[min_len+1:max_len])))
    # Initialize module - call init_module()
    old_sp = state.regs.sp # backup stack frame pointer
    state.ip = lib_bin.get_symbol("init_module").rebased_addr
    cc.setup_callsite(state, state.se.BVV(deadended_addr, b.arch.bits), [])
    # Run until nothing is active, then keep only the deadended paths that
    # stopped at the fake return address.
    pg = (b.factory.path_group(state)
          .step(until=lambda pg: len(pg.active) == 0).unstash(from_stash='deadended')
          .prune(filter_func=lambda pt: pt.addr == deadended_addr))
    assert len(pg.active) > 0 # There must be at least one path returned from the function
    pg = pg.merge()
    # check return value - must always be 0
    state = pg.active[0].state
    # NOTE(review): `[31:]` presumably selects the low 32 bits of the return value -- confirm.
    assert (cc.get_return_val(state)[31:] == 0).is_true()
    state.regs.sp = old_sp # restore stack frame pointer
    # call main()
    state.ip = 0x400896
    cc.setup_callsite(state, state.se.BVV(deadended_addr, b.arch.bits), [state.se.BVV(0, b.arch.bits)] * 3)
    pg = b.factory.path_group(state)
    def step_func(pg):
        """Sort paths into found/avoid after every step and merge once none are active."""
        # Search for found paths
        pg = (pg.stash(lambda p: p.addr == 0x400f3f, to_stash='found')
              .stash(lambda p: p.addr in (0x400a5f, 0x400f4b, 0x400f55), to_stash='avoid')
              .prune(from_stash='found')
              .stash(filter_func=lambda p: 0x400896 <= p.addr <= 0x400f3d)) # poor man's Veritesting technique
        if len(pg.active) == 0:
            # Nothing left running: bring the stashed paths back, prune them and merge.
            pg = pg.unstash().prune().merge()
        return pg
    pg = pg.step(step_func=step_func, until=lambda pg: len(pg.active) == 0).merge(stash='found')
    # The loop body returns on its first iteration, so the `else` branch only
    # runs when `pg.found` is empty.
    for found in pg.found:
        l.debug("Searching for input..")
        cand_name = found.state.se.any_str(the_name)
        if b"\x00" in cand_name: # Strip NULs
            cand_name = cand_name[:cand_name.index(b"\x00")]
        l.debug("Found! %s" % cand_name)
        return cand_name
    else:
        l.debug("Not found!")
if __name__ == '__main__':
    # Explicit check instead of `assert solve() == ...`: assert statements are
    # removed under `python -O`, which would skip the solve() call entirely.
    expected_name = b"1_7H1nk_K3rN3l_m0dul3_15_C0o0l!"
    solved_name = solve()
    if solved_name != expected_name:
        # Same exception type the original assert raised on a mismatch.
        raise AssertionError("unexpected solution: %r" % (solved_name,))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment