Skip to content

Instantly share code, notes, and snippets.

@iamahuman
Last active March 18, 2017 10:28
Show Gist options
  • Save iamahuman/d29a2c3c0b04a326fbf9f8ffaff5043a to your computer and use it in GitHub Desktop.
Save iamahuman/d29a2c3c0b04a326fbf9f8ffaff5043a to your computer and use it in GitHub Desktop.
angr usage
#!/usr/bin/env python
# auth_engine.ko: https://file.io/aHLh4Q (the kernel module)
# dimicraft_auth: https://file.io/iJZQr5 (the userspace program)
# Patches required in order to work:
# cle @ commit 816672e (see cle #24)
# angr @ commit 1fba387 (see angr #126)
import logging
logging.basicConfig(level=logging.WARN)
import sys
import collections
import itertools
import simuvex
import claripy
import angr
import cle
# Script-wide logger named "solve"; its level is set to DEBUG independently of
# the WARN level passed to logging.basicConfig() above.
l = logging.getLogger("solve")
l.setLevel(logging.DEBUG)
class MyStorage(simuvex.SimStatePlugin):
    """State plugin exposing a few named slots backed by a private symbolic memory.

    Every name listed in ``_fields`` behaves like an attribute.  Reading it
    loads ``state.arch.bytes`` bytes from ``self.mem`` at offset
    ``index * state.arch.bytes`` (``index`` being the name's position in
    ``_fields``); assigning to it stores the value at that same offset.
    Any other attribute name is handled by the parent class.
    """

    _fields = ("read_addr", "write_addr", "ioctl_addr", "open_addr")

    def __init__(self, mem=None):
        """Create the plugin, wrapping ``mem`` or a fresh ``SimSymbolicMemory``."""
        super(MyStorage, self).__init__()
        if mem is None:
            self.mem = simuvex.SimSymbolicMemory(memory_id="file_ulocal_%s_%d" % (
                self.__class__.__name__, id(self),))
        else:
            self.mem = mem

    def set_state(self, *args, **kwargs):
        """Forward ``set_state`` to both the parent plugin and the backing memory."""
        rval = super(MyStorage, self).set_state(*args, **kwargs)
        self.mem.set_state(*args, **kwargs)
        return rval

    def copy(self):
        """Return a new ``MyStorage`` that owns a copy of the backing memory."""
        return MyStorage(mem=self.mem.copy())

    def merge(self, others, *args, **kwargs):
        """Merge by delegating to the backing memory of ``self`` and ``others``."""
        return self.mem.merge([o.mem for o in others], *args, **kwargs)

    def widen(self, others, *args, **kwargs):
        """Widen by delegating to the backing memory of ``self`` and ``others``."""
        return self.mem.widen([o.mem for o in others], *args, **kwargs)

    def __getattr__(self, name):
        # Python only calls __getattr__ after normal attribute lookup failed,
        # so real instance/class attributes never reach this point.
        if name.startswith("_"):
            return super(MyStorage, self).__getattr__(name)
        try:
            index = self._fields.index(name)
        except ValueError:
            return super(MyStorage, self).__getattr__(name)
        else:
            width = self.state.arch.bytes
            return self.mem.load(index * width, width)

    def __setattr__(self, name, value):
        if name.startswith("_"):
            # Fix: this branch used to call __getattr__(name), which never
            # stored the value; private attributes must be set normally.
            return super(MyStorage, self).__setattr__(name, value)
        try:
            index = self._fields.index(name)
        except ValueError:
            # Not one of the slot names (e.g. "mem", "state"): plain attribute.
            return super(MyStorage, self).__setattr__(name, value)
        else:
            width = self.state.arch.bytes
            return self.mem.store(index * width, value)
class SubPtrArgRet0(simuvex.SimProcedure):
    """Stub procedure: accepts one argument, ignores it and returns 0."""

    def run(self, arg1):
        # The argument is deliberately unused.
        return 0
SubMemcpy = simuvex.SimProcedures['libc.so.6']['memcpy']


class SubDumbKernUserCopy(simuvex.SimProcedure):
    """Copy ``limit`` bytes from ``src_addr`` to ``dst_addr`` via the libc memcpy model.

    Always returns 0.
    """

    def run(self, dst_addr, src_addr, limit):
        # The maximum value of ``limit`` is computed only for the log message;
        # the copy itself receives ``limit`` unchanged.
        most_bytes = self.state.se.max_int(limit)
        l.debug("Copying from %r to %r (at most %d bytes)" % (src_addr, dst_addr, most_bytes))
        self.inline_call(SubMemcpy, dst_addr, src_addr, limit)
        return 0
SubMalloc = simuvex.SimProcedures['libc.so.6']['malloc']
SubMemset = simuvex.SimProcedures['libc.so.6']['memset']


class SubDumbKernCacheAlloc(simuvex.SimProcedure):
    """Allocate ``size`` bytes with the libc malloc model and zero-fill them.

    ``s`` and ``gfpflags`` are accepted but not used.  Returns the address
    expression produced by the inlined malloc call.
    """

    def run(self, s, gfpflags, size):
        largest = self.state.se.max_int(size)
        l.debug("Allocating %#x bytes" % largest)
        malloc_call = self.inline_call(SubMalloc, size)
        addr = malloc_call.ret_expr
        # Zero out the fresh allocation.
        self.inline_call(SubMemset, addr, 0, size)
        return addr
class SubDumbKernRegChrDev(simuvex.SimProcedure):
    """Record four handler pointers from ``fops`` into the ``my_storage`` plugin.

    The device name at ``name`` must be exactly ``"dimicraft\\x00"``;
    otherwise ``RuntimeError`` is raised.  Returns 0.
    """

    # (storage attribute, pointer-sized slot index inside ``fops``)
    # NOTE(review): the slot indices presumably follow the layout of the
    # kernel's file_operations struct for the targeted build - confirm.
    _SLOTS = (
        ("read_addr", 2),
        ("write_addr", 3),
        ("ioctl_addr", 10),
        ("open_addr", 14),
    )

    def run(self, major, baseminor, count, name, fops):
        l.debug("Registering a character device")
        state = self.state
        # That module's probably registering /dev/dimicraft
        name_ok = state.se.is_true(state.memory.load(name, 10) == "dimicraft\x00", exact=True)
        if not name_ok:
            raise RuntimeError("registering an unknown device")
        storage = state.get_plugin("my_storage")
        word = state.arch.bytes
        endness = state.arch.memory_endness
        for field, slot in self._SLOTS:
            pointer = state.memory.load(fops + word * slot, word, endness=endness)
            setattr(storage, field, pointer)
        return 0
class SubGateOpen(simuvex.SimProcedure):
    """Model of ``open()``: dispatch to the stored open handler for /dev/dimicraft.

    Any other path returns -1.  When the handler returns 0 the procedure
    returns 3, otherwise -1.
    """

    NO_RET = True
    local_vars = ('old_sp',)

    def run(self, path, flags):
        # Remember the stack pointer so after_open() can restore it.
        self.old_sp = self.state.regs.sp
        # NOTE(review): this value is never used; the evaluation is kept in
        # case resolving the string has side effects - confirm and drop.
        path_str = self.state.mem[path].string.resolved
        is_device = self.state.memory.load(path, 15) == b"/dev/dimicraft\x00"
        if not is_device.is_true():
            self.ret(-1)
            return
        handler = self.state.get_plugin("my_storage").open_addr
        self.call(handler, (0, 0), 'after_open')

    def after_open(self, *args, **kwargs):
        """Continuation run once the open handler has returned."""
        rval = self.cc.get_return_val(self.state)
        self.state.regs.sp = self.old_sp
        bits = self.state.arch.bits
        on_success = self.state.se.BVV(3, bits)
        on_failure = self.state.se.BVV(-1, bits)
        self.ret(self.state.se.If(rval == 0, on_success, on_failure))
class SubHookScanf(simuvex.SimProcedure):
    """Model of ``scanf`` that feeds canned input depending on the call site.

    The return address is constrained to be 0x400a17 or 0x400a3c.  For the
    former, the module-level ``full_name`` is stored at ``arg``; for the
    latter, the fixed string ``"You_must_find_your_name_in_here\\x00"``.
    Always returns 1.
    """

    def run(self, fmt, arg):
        solver = self.state.se
        if self.ret_to is None:
            ret_addr = solver.exactly_int(self.cc.return_addr.get_value(self.state))
        else:
            ret_addr = solver.BVV(self.ret_to, self.state.arch.bits)
        is_lsr = ret_addr == 0x400a17
        is_key = ret_addr == 0x400a3c
        # Only these two call sites are expected.
        self.state.add_constraints(claripy.Or(is_lsr, is_key))
        fixed_input = solver.BVV("You_must_find_your_name_in_here\x00")
        self.state.memory.store(arg, solver.If(is_lsr, full_name, fixed_input))
        return 1
class SubGateWrite(simuvex.SimProcedure):
    """Model of ``write()``: fd 3 is forwarded to the stored write handler.

    Descriptors below 3 pretend the write succeeded (return ``n``);
    descriptors above 3 return -1.
    """

    NO_RET = True
    local_vars = ('old_sp',)

    def run(self, fd, ptr, n):
        # XXX backing up old stack pointer is a bit ugly - any better way to do this?
        self.old_sp = self.state.regs.sp
        if fd.symbolic:
            l.warn("WTF - fd is symbolic?")
        fd_num = self.state.se.exactly_int(fd)
        if fd_num > 3:
            self.ret(-1)  # wtf fd > 3?
        elif fd_num < 3:
            self.ret(n)  # fake write
        else:
            l.debug("userspace write() to auth_engine")
            handler = self.state.get_plugin("my_storage").write_addr
            self.call(handler, (0, ptr, n, 0), 'after_write')

    def after_write(self, *args, **kwargs):
        """Continuation run once the write handler has returned."""
        rval = self.cc.get_return_val(self.state)
        self.state.regs.sp = self.old_sp
        # NOTE(review): if `>=` on a claripy bitvector is an unsigned
        # comparison, `rval >= 0` always holds and -1 is never chosen - confirm.
        result = self.state.se.If(rval >= 0, rval, -1)
        self.ret(result)
class SubGateRead(simuvex.SimProcedure):
    """Model of ``read()``: fd 3 is forwarded to the stored read handler.

    Any other descriptor returns -1.
    """

    NO_RET = True
    local_vars = ('old_sp',)

    def run(self, fd, ptr, n):
        # Remember the stack pointer so after_read() can restore it.
        self.old_sp = self.state.regs.sp
        if fd.symbolic:
            l.warn("WTF - fd is symbolic?")
        fd = self.state.se.any_int(fd)
        if fd == 3:
            l.debug("userspace read() to auth_engine")
            # Continuation renamed from the copy-pasted 'after_write' so it
            # matches this procedure, like after_open / after_ioctl do.
            self.call(self.state.get_plugin("my_storage").read_addr, (0, ptr, n, 0), 'after_read')
        else:
            self.ret(-1)  # wtf fd != 3?

    def after_read(self, *args, **kwargs):
        """Continuation run once the read handler has returned."""
        rval = self.cc.get_return_val(self.state)
        self.state.regs.sp = self.old_sp
        # NOTE(review): if `>=` on a claripy bitvector is an unsigned
        # comparison, `rval >= 0` always holds and -1 is never chosen - confirm.
        self.ret(self.state.se.If(rval >= 0, rval, -1))
class SubGateIoctl(simuvex.SimProcedure):
    """Model of ``ioctl()``: fd 3 is forwarded to the stored ioctl handler.

    Any other descriptor returns -1.
    """

    NO_RET = True
    local_vars = ('old_sp',)

    def run(self, fd, req, arg):
        # Remember the stack pointer so after_ioctl() can restore it.
        self.old_sp = self.state.regs.sp
        if fd.symbolic:
            l.warn("WTF - fd is symbolic?")
        fd_num = self.state.se.any_int(fd)
        if fd_num != 3:
            self.ret(-1)  # wtf fd != 3?
            return
        l.debug("userspace ioctl(%#x)" % self.state.se.any_int(req))
        handler = self.state.get_plugin("my_storage").ioctl_addr
        self.call(handler, (0, req, arg), 'after_ioctl')

    def after_ioctl(self, *args, **kwargs):
        """Continuation run once the ioctl handler has returned."""
        self.state.regs.sp = self.old_sp
        rval = self.cc.get_return_val(self.state)
        # NOTE(review): if `>=` on a claripy bitvector is an unsigned
        # comparison, `rval >= 0` always holds and -1 is never chosen - confirm.
        result = self.state.se.If(rval >= 0, rval, -1)
        self.ret(result)
def solve():
    """Symbolically run ``init_module()`` and then ``main()`` to recover the name.

    Loads ``./dimicraft_auth`` together with ``./auth_engine.ko`` (rebased at
    0x20000000), hooks the listed symbols, executes the module's
    ``init_module`` and finally explores ``main`` (entered at 0x400896) until a
    path reaches 0x400f3f.

    Side effect: binds the module-level ``full_name`` that ``SubHookScanf``
    reads.

    Returns the concretized name with the first NUL byte and everything after
    it removed, or ``None`` (after logging "Not found!") when ``pg.found`` is
    empty.
    """
    global full_name
    # Set up memory
    # Here relocatables are neither shared library nor executable, so we load them manually
    loader = cle.Loader("./dimicraft_auth", auto_load_libs=False)
    lib_rebase_addr = 0x20000000
    lib_bin = loader.load_object("./auth_engine.ko", options=dict(custom_rebase_addr=lib_rebase_addr), is_main_bin=False)
    loader.add_object(lib_bin, lib_rebase_addr)
    loader.relocate() # necessary for rebasing manually loaded objects
    # Set up project
    b = angr.Project(loader, use_sim_procedures=False)
    # Set up calling
    cc = simuvex.DefaultCC[b.arch.name](b.arch)
    # Pseudo address used as the return address of the top-level calls below;
    # hooked so that a path arriving there terminates.
    deadended_addr = b._extern_obj.get_pseudo_addr('_custom_fake_return_addr_')
    b.hook(deadended_addr, simuvex.SimProcedures["stubs"]["PathTerminator"])
    # Set up symbols
    procs_libc_so_6 = simuvex.SimProcedures["libc.so.6"]
    procs_stubs = simuvex.SimProcedures["stubs"]
    # (symbol name, replacement) pairs handed to b.hook_symbol() one by one
    hook_map = (
        ("puts", SubPtrArgRet0), ("printf", SubPtrArgRet0),
        ("strcmp", procs_libc_so_6["strcmp"]), ("strcpy", procs_libc_so_6["strcpy"]),
        ("strlen", procs_libc_so_6["strlen"]), ("memset", procs_libc_so_6["memset"]),
        ("__isoc99_scanf", SubHookScanf),
        ("open", SubGateOpen), ("write", SubGateWrite), ("ioctl", SubGateIoctl),
        ("read", SubGateRead), ("close", procs_stubs["Nop"]),
        ("__fentry__", procs_stubs["Nop"]),
        ("mutex_lock", SubPtrArgRet0), ("mutex_unlock", SubPtrArgRet0),
        ("_copy_from_user", SubDumbKernUserCopy), ("_copy_to_user", SubDumbKernUserCopy),
        ("kmem_cache_alloc_trace", SubDumbKernCacheAlloc),
        ('kmalloc_caches', b._extern_obj.get_pseudo_addr("tmp_buf", b.arch.bytes * 16)), # This is enough!
        ('__register_chrdev', SubDumbKernRegChrDev),)
    for sym_name, proc in hook_map:
        b.hook_symbol(sym_name, proc)
    # Create a state
    state = b.factory.blank_state()
    state.register_plugin("my_storage", MyStorage()) # used to store local data
    # too many branches - unsat states slow things down way too much
    state.options.remove(simuvex.o.LAZY_SOLVES)
    # Set up symbolic input - assume a valid printable ASCII string w/o whitespace
    min_len, max_len = 1, 31
    assert 1 <= min_len <= max_len <= 31
    the_name = state.se.Unconstrained("the_name", 8 * max_len)
    full_name = state.se.Concat(the_name, state.se.BVV(0, 8 * (32 - max_len))) # incl. NUL byte - used for feeding scanf
    chrs = the_name.chop(8)
    # ASCII chars constituting the minimum length cannot be zero
    state.add_constraints(*(claripy.And(ch > 0x20, ch < 0x7f) for ch in chrs[:min_len]))
    if min_len < max_len:
        # Char @ min_len may or may not be zero
        state.add_constraints(claripy.Or(chrs[min_len] == 0, claripy.And(chrs[min_len] > 0x20, chrs[min_len] < 0x7f)))
        # Other chars depend on the previous character: each is either zero,
        # or printable with a non-zero predecessor
        state.add_constraints(*(claripy.Or(ch == 0, claripy.And(chrs[min_len+1+i-1] != 0, ch > 0x20, ch < 0x7f)) for i, ch in enumerate(chrs[min_len+1:max_len])))
    # Initialize module - call init_module()
    old_sp = state.regs.sp # backup stack frame pointer
    state.ip = lib_bin.get_symbol("init_module").rebased_addr
    cc.setup_callsite(state, state.se.BVV(deadended_addr, b.arch.bits), [])
    # Step until nothing is active, then keep the deadended paths for pruning
    pg = (b.factory.path_group(state)
          .step(until=lambda pg: len(pg.active) == 0).unstash(from_stash='deadended')
          .prune(filter_func=lambda pt: pt.addr == deadended_addr))
    assert len(pg.active) > 0 # There must be at least one path returned from the function
    pg = pg.merge()
    # check return value - must always be 0
    state = pg.active[0].state
    assert (cc.get_return_val(state)[31:] == 0).is_true()
    state.regs.sp = old_sp # restore stack frame pointer
    # call main()
    state.ip = 0x400896
    cc.setup_callsite(state, state.se.BVV(deadended_addr, b.arch.bits), [state.se.BVV(0, b.arch.bits)] * 3)
    pg = b.factory.path_group(state)
    def step_func(pg):
        # Search for found paths
        pg = (pg.stash(lambda p: p.addr == 0x400f3f, to_stash='found')
              .stash(lambda p: p.addr in (0x400a5f, 0x400f4b, 0x400f55), to_stash='avoid')
              .prune(from_stash='found')
              .stash(filter_func=lambda p: 0x400896 <= p.addr <= 0x400f3d)) # poor man's Veritesting technique
        # Once nothing is active, bring the stashed paths back, prune and merge them
        if len(pg.active) == 0:
            pg = pg.unstash().prune().merge()
        return pg
    pg = pg.step(step_func=step_func, until=lambda pg: len(pg.active) == 0).merge(stash='found')
    # Returns on the first found path; the `else` clause runs only when
    # pg.found is empty.
    for found in pg.found:
        l.debug("Searching for input..")
        cand_name = found.state.se.any_str(the_name)
        if b"\x00" in cand_name: # Strip NULs
            cand_name = cand_name[:cand_name.index(b"\x00")]
        l.debug("Found! %s" % cand_name)
        return cand_name
    else:
        l.debug("Not found!")
if __name__ == '__main__':
    # Call solve() outside of an `assert`: assert statements are removed when
    # Python runs with -O, which would have skipped the solver entirely.
    expected_name = b"1_7H1nk_K3rN3l_m0dul3_15_C0o0l!"
    solved_name = solve()
    if solved_name != expected_name:
        # Same exception type the original assert raised on a mismatch.
        raise AssertionError("unexpected result: %r" % (solved_name,))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment