Skip to content

Instantly share code, notes, and snippets.

@tin-z
Created May 9, 2021 19:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tin-z/c8920a66a8791ea8f7d54f4304c65656 to your computer and use it in GitHub Desktop.
Save tin-z/c8920a66a8791ea8f7d54f4304c65656 to your computer and use it in GitHub Desktop.
import sys, os
import ctypes
import math
import struct
#### Ignore ####
# ANSI escape sequences: attribute reset and bold.
EC = '\x1b[0m'
BOLD = '\x1b[1m'
# Base escape code per slot; the per-colour offset from COLORS is added to each.
INIT = {'f': 30, 'b': 40, 'hf': 90, 'hb': 100}
COLORS = (
    ("BLACK", 0),
    ("RED", 1),
    ("GREEN", 2),
    ("YELLOW", 3),
    ("BLUE", 4),
    ("CYAN", 6),
)
# Publish one module-level dict per colour name (BLACK, RED, ...), keyed like
# INIT, whose values are the ready-to-print escape sequences.
globals().update({
    name: {slot: "\x1b[{}m".format(code + shift) for slot, code in INIT.items()}
    for name, shift in COLORS
})


def FAIL(x):
    """Wrap x in bold + red foreground, followed by a reset."""
    return BOLD + RED['f'] + x + EC


def WARNING(x):
    """Wrap x in bold + yellow background + black foreground, followed by a reset."""
    return BOLD + YELLOW['b'] + BLACK['f'] + x + EC


def _tint(prefix):
    """Build a painter that emits prefix + text + reset."""
    return lambda x: prefix + x + EC


def _boxed(prefix):
    """Build a painter that emits prefix + "|text|" + reset."""
    return lambda x: prefix + "|{}|".format(x) + EC


# Painters looked up by tag. H/S/L have the long aliases HEAP/STACK/LIBC,
# H1-H3 are used for section headings, F1/F2 render a flag name in red or
# green between bars.
PTR = {
    "H": _tint(BLUE['f']),
    "S": _tint(YELLOW['hf']),
    "L": _tint(RED['f']),
}
PTR["HEAP"], PTR["STACK"], PTR["LIBC"] = PTR["H"], PTR["S"], PTR["L"]
PTR["H1"] = _tint(BLUE['hf'])
PTR["H2"] = _tint(CYAN['f'])
PTR["H3"] = _tint(CYAN['hf'])
PTR["F1"] = _boxed(BOLD + RED['f'] + BLACK['b'])
PTR["F2"] = _boxed(BOLD + GREEN['f'] + BLACK['b'])
# ptrace(2) request numbers, paired the way they are used: peek / poke a word
# of text or data, get / set the register set, attach / detach.
PTRACE_PEEKTEXT, PTRACE_PEEKDATA = 1, 2
PTRACE_POKETEXT, PTRACE_POKEDATA = 4, 5
PTRACE_GETREGS, PTRACE_SETREGS = 12, 13
PTRACE_ATTACH, PTRACE_DETACH = 16, 17
# Banner printed ahead of the usage line.
DESCR = """
scheappes - A simple tool that prints out the status of the memory heap of a process in execution
Some inspiration was taken from here:https://github.com/ancat/gremlin/blob/master/heap_scan.py
"""
#### UTILS ####
def checkZ(conds, msg):
    """
    checkZ(callable, str) -> None
    Abort helper: call conds(); when the result is truthy, print msg through
    FAIL and terminate with exit status 1. Otherwise hand control back.
    """
    if not conds():
        return
    print(FAIL(msg))
    sys.exit(1)
def fmtR(num, data_size):
    """
    fmtR(int, int) -> str
    Return hex(num) right-aligned with spaces. The column is data_size*2 - 2
    wide when data_size is 8 and data_size*2 + 2 wide for any other value.
    """
    width = data_size * 2 - 2 if data_size == 8 else data_size * 2 + 2
    return hex(num).rjust(width)
def fmtL(num, data_size):
    """
    fmtL(int, int) -> str
    Return hex(num) left-aligned, space-padded up to data_size characters.
    Longer values are returned unchanged.
    """
    text = hex(num)
    # A negative repeat count yields "", so over-long values get no padding.
    return text + " " * (data_size - len(text))
def ent(data, unit='natural'):
    """
    ent(b'', unit='natural') -> float
    Return the entropy of the symbols in data.

    unit selects the logarithm base: 'shannon' uses base 2, 'natural' uses
    base e. Inputs of length 0 or 1 yield 0.0. Any other unit raises KeyError
    once a logarithm has to be taken.
    """
    base = {'shannon': 2., 'natural': math.exp(1)}
    total = len(data)
    if total <= 1:
        return .0
    counts = dict()
    for x in data:
        # Count the first sighting as 1; starting at 0 skews every frequency.
        counts[x] = counts.get(x, 0) + 1
    rets = .0
    for hits in counts.values():
        p = float(hits) / total
        rets -= p * math.log(p, base[unit])
    return rets
#### Ctypes.Structure ####
class iovec(ctypes.Structure):
    """
    Base-pointer / length pair handed to process_vm_readv and
    process_vm_writev to describe one buffer.
    """
    _fields_ = list(zip(
        ("iov_base", "iov_len"),
        (ctypes.c_void_p, ctypes.c_ulong),
    ))
class user_regs_structX64(ctypes.Structure):
    """
    Register block for the 64-bit case; every member is an unsigned 64-bit
    integer. Layout reference:
    grep "struct user_regs_struct" /usr/include/sys/user.h
    """
    # Field order matters: it defines the memory layout of the structure.
    _fields_ = [
        (reg, ctypes.c_ulonglong)
        for reg in (
            "r15", "r14", "r13", "r12", "rbp", "rbx", "r11", "r10", "r9", "r8",
            "rax", "rcx", "rdx", "rsi", "rdi", "orig_rax", "rip", "cs",
            "eflags", "rsp", "ss", "fs_base", "gs_base", "ds", "es", "fs", "gs",
        )
    ]
class user_regs_struct(ctypes.Structure):
    """
    Register block for the 32-bit case; every member is an unsigned 32-bit
    integer. Layout reference:
    grep "struct user_regs_struct" /usr/include/sys/user.h
    """
    # Field order matters: it defines the memory layout of the structure.
    _fields_ = [
        (reg, ctypes.c_uint32)
        for reg in (
            "ebx", "ecx", "edx", "esi", "edi", "ebp", "eax",
            "xds", "xes", "xfs", "xgs", "orig_eax", "eip",
            "xcs", "eflags", "esp", "xss",
        )
    ]
#### CORE ####
class Chunk():
    """
    One heap chunk decoded from raw bytes.

    The first chunk_range machine words of data are unpacked with the struct
    format xy86 and interpreted as:
        word 0 -> prev_size (masked with self.align)
        word 1 -> size (low three bits cleared, then masked) and flag bits
        word 2 -> fd, word 3 -> bk, word 4 -> fdB, word 5 -> bkB
    The decoded values are mirrored on the instance as SIZE, PREVSIZE, FD, BK,
    FLAGS, PREV_INUSE, IS_MMAPPED and NOT_MAIN_ARENA.
    """
    # Every name listed here exists as an instance attribute after
    # construction; the ones that are not decoded yet stay False.
    cflags = ["TCACHABLE", "FASTBIN", "SMALLBIN", "UNSORTEDBIN", "LARGEBIN", "TOPCHUNK",
              "FLAGS", "PREV_INUSE", "IS_MMAPPED", "NOT_MAIN_ARENA", "SIZE", "PREVSIZE", "FD", "BK"]

    def __repr__(self):
        # __repr__ has to return a string; returning None makes repr() raise.
        return "Chunk[{}](prev_size:0x{:x}, size:0x{:x}, fd:0x{:x}, bk:0x{:x})".format(
            self.index, self.PREVSIZE, self.SIZE, self.FD, self.BK)

    def __str__(self):
        tmp_flags = ""
        for i in ["NOT_MAIN_ARENA", "IS_MMAPPED", "PREV_INUSE"]:
            # The attributes hold the masked bit (4, 2 or 1), so test for
            # non-zero instead of comparing against 1.
            if getattr(self, i):
                tmp_flags += PTR["F2"](i)
            else:
                tmp_flags += PTR["F1"](i)
        return " @{} Chunk[{}] -> [prev_size:{}| size:{}| fd:{}| bk:{}] {}".format(
            fmtL(self.index, self.data_size),
            PTR['H'](fmtR(self.addr, self.data_size)),
            fmtR(self.PREVSIZE, self.data_size),
            fmtR(self.SIZE, self.data_size),
            fmtR(self.FD, self.data_size),
            fmtR(self.BK, self.data_size),
            tmp_flags)

    def __init__(self, idz, addr, xy86, data_size, chunk_range, data):
        """
        idz         -- position of the chunk in the walk (shown by __str__/__repr__)
        addr        -- address of the chunk in the inspected process
        xy86        -- struct format character for one machine word
        data_size   -- size of one machine word in bytes
        chunk_range -- number of words to decode; words 0..5 are required
        data        -- raw bytes starting at the chunk header

        Exits through checkZ with "Invalid size chunk" when fewer than six
        words are requested or data is too short to hold them.
        """
        self.index = idz
        self.addr = addr
        self.xy86 = xy86
        self.data_size = data_size
        # Mask applied to prev_size and size.
        self.align = 2**47-1 if self.data_size == 8 else 2**31-1
        self.chunk_range = chunk_range
        needed = chunk_range * data_size
        # Validate before unpacking; checked afterwards the test could never fail.
        checkZ(lambda: chunk_range < 6 or len(data) < needed, "Invalid size chunk")
        self.chunk = {
            cc: struct.unpack(self.xy86, data[cc * data_size:(cc + 1) * data_size])[0]
            for cc in range(chunk_range)
        }
        self.chunk.update({
            "prev_size": self.chunk[0] & self.align,
            "curr_size": (self.chunk[1] & ~7) & self.align,
            "curr_flag": self.chunk[1] & 7,
            "fd": self.chunk[2], "bk": self.chunk[3],
            "fdB": self.chunk[4], "bkB": self.chunk[5],
        })
        self.__set_flags()

    def __set_flags(self):
        """Mirror the decoded words of self.chunk onto instance attributes."""
        for i in Chunk.cflags:
            # Default every known flag on the instance itself.
            setattr(self, i, False)
        self.SIZE = self.chunk['curr_size']
        self.PREVSIZE = self.chunk['prev_size']
        self.FD = self.chunk['fd']
        self.BK = self.chunk['bk']
        self.FLAGS = self.chunk['curr_flag']
        # Each of these keeps the raw masked bit: 0 or 1, 0 or 2, 0 or 4.
        self.PREV_INUSE = self.FLAGS & 1
        self.IS_MMAPPED = self.FLAGS & 2
        self.NOT_MAIN_ARENA = self.FLAGS & 4
class SGremlin:
    """
    Inspector for the heap of a running process.

    Construction parses /proc/<pid>/maps, derives the word size from the stack
    address, loads libc through ctypes and attaches with ptrace just long
    enough to snapshot the registers. Heap bytes are read afterwards with
    process_vm_readv (see init_main_arena).
    """

    # Request number of PTRACE_SINGLESTEP from <sys/ptrace.h>; kept on the
    # class because the module-level PTRACE_* table has no entry for it.
    _PTRACE_SINGLESTEP = 9

    def __init__(self, pid):
        self.pid = pid
        self.__init_runtime()
        self.__init_ptrace()
        self.__init_register()
        # Registers are captured; let the target run again.
        self.ptrace_detach()

    def __init_ptrace(self):
        """Declare the ptrace prototype, attach and wait for the target to stop."""
        if self.xy86 == "Q":
            self.libc.ptrace.restype = ctypes.c_uint64
            self.libc.ptrace.argtypes = [ctypes.c_uint64, ctypes.c_uint64, ctypes.c_void_p, ctypes.c_void_p]
        elif self.xy86 == "I":
            self.libc.ptrace.restype = ctypes.c_uint32
            self.libc.ptrace.argtypes = [ctypes.c_uint32, ctypes.c_uint32, ctypes.c_voidp, ctypes.c_voidp]
        if self.ptrace_attach() == 0:
            # PTRACE_ATTACH only queues a SIGSTOP; the tracee has to be waited
            # for before register requests can succeed.
            os.waitpid(self.pid, 0)

    def __init_register(self):
        self.ptrace_getregs()

    def __init_runtime(self):
        """
        init_runtime(self)
        Init the mapping address, architecture, element size in stack, libc CDLL struct, address heap, stack.
        """
        self.map_file = '/proc/{}/maps'.format(self.pid)
        self.exe_path = '/proc/{}/exe'.format(self.pid)
        self.parse_maps_file()
        self.setHeapAndArch()
        self.setLibc()
        checkZ(lambda: self.libc is None, "No libc found, wrong arch struct:{}.".format(self.xy86))
        self.executable = os.path.realpath(self.exe_path)
        vm_argtypes = [ctypes.c_uint64, ctypes.c_void_p, ctypes.c_uint64, ctypes.c_void_p, ctypes.c_uint64, ctypes.c_uint64]
        # Both calls share one signature and return ssize_t (-1 on error).
        for vm_call in (self.libc.process_vm_readv, self.libc.process_vm_writev):
            vm_call.argtypes = vm_argtypes
            vm_call.restype = ctypes.c_ssize_t
        # NOTE(review): the register layout is chosen from the target's word
        # size; confirm it also matches what PTRACE_GETREGS writes when this
        # interpreter and the target differ in word size.
        self.regs_struct = user_regs_structX64 if self.data_size == 8 else user_regs_struct

    def parse_maps_file(self):
        """
        parse_maps_file(self)
        Fill self.maps: map name -> list of mapping dicts (addr_start,
        addr_end, size, permissions, offset, device_id, inode, map_name).
        Anonymous mappings are stored under the empty string.
        """
        self.maps = dict()
        deleted_tag = " (deleted)"
        # 'with' closes the file even when a line fails to parse.
        with open(self.map_file, "r") as map_file:
            for line in map_file:
                # At most five splits so a pathname containing spaces stays whole.
                parts = line.strip().split(None, 5)
                if len(parts) < 5:
                    continue
                addr_start, addr_end = (int(x, 16) for x in parts[0].split('-'))
                map_name = parts[5] if len(parts) > 5 else ''
                # Drop the unlinked-file marker so the key is the bare path.
                if map_name.endswith(deleted_tag):
                    map_name = map_name[:-len(deleted_tag)]
                mapping = {
                    'addr_start': addr_start,
                    'addr_end': addr_end,
                    'size': addr_end - addr_start,
                    'permissions': parts[1],
                    'offset': int(parts[2], 16),
                    'device_id': parts[3],
                    'inode': parts[4],
                    'map_name': map_name
                }
                self.maps.setdefault(map_name, []).append(mapping)

    def setHeapAndArch(self):
        """
        getHeapAndArch(self)
        Set, in order, address heap, stack, architecture, element size in stack
        """
        checkZ(lambda: not ('[heap]' in self.maps) or not ('[stack]' in self.maps), "No heap/stack found.")
        self.heap = self.maps['[heap]'][0]
        self.stack = self.maps['[stack]'][0]
        # A stack address starting with 0x7f is taken to mean a 64-bit target.
        tmp1 = ("Q", 8) if hex(self.stack['addr_start']).startswith("0x7f") else ("I", 4)
        self.xy86 = tmp1[0]
        self.data_size = tmp1[1]

    def setLibc(self):
        """
        get_libc(self)
        Set self.libc to a ctypes.CDLL, or to None when nothing can be loaded.

        The architecture-specific path is tried first, then the loader's own
        search for "libc.so.6", so hosts without the multiarch directory
        layout still work. errno capture is enabled for error reporting.
        """
        candidates = {
            "Q": '/lib/x86_64-linux-gnu/libc.so.6',
            "I": '/lib/i386-linux-gnu/libc.so.6',
        }
        self.libc = None
        if self.xy86 not in candidates:
            return
        for path in (candidates[self.xy86], 'libc.so.6'):
            try:
                self.libc = ctypes.CDLL(path, use_errno=True)
                return
            except OSError:
                continue

    def read_process_memory(self, address, size):
        """
        read_process_memory(self, int, int) -> ctypes char buffer
        Return a dump of memory from address till address+size.
        Raises OSError when process_vm_readv reports failure; a short read
        leaves the tail of the buffer zero-filled.
        """
        func = self.libc.process_vm_readv
        bytes_buffer = ctypes.create_string_buffer(size)
        local_iovec = iovec(ctypes.cast(ctypes.byref(bytes_buffer), ctypes.c_void_p), size)
        remote_iovec = iovec(ctypes.c_void_p(address), size)
        bytes_transferred = func(self.pid, ctypes.byref(local_iovec), 1, ctypes.byref(remote_iovec), 1, 0)
        if bytes_transferred < 0:
            # Surface the failure instead of handing back an all-zero buffer.
            err = ctypes.get_errno()
            raise OSError(err, "process_vm_readv failed: {}".format(os.strerror(err)))
        return bytes_buffer

    def write_process_memory(self, address, size, data):
        """
        write_process_memory(self, int, int, bytes) -> int
        Overwrite some memory space from address till address+size.
        Returns the value reported by process_vm_writev. Exits through checkZ
        when data does not fit in size bytes.
        """
        # Abort only when the payload is larger than the destination window.
        checkZ(lambda: len(data) > size, "Data too large.")
        bytes_buffer = ctypes.create_string_buffer(size)
        bytes_buffer.raw = data
        local_iovec = iovec(ctypes.cast(ctypes.byref(bytes_buffer), ctypes.c_void_p), size)
        remote_iovec = iovec(ctypes.c_void_p(address), size)
        bytes_transferred = self.libc.process_vm_writev(self.pid, ctypes.byref(local_iovec), 1, ctypes.byref(remote_iovec), 1, 0)
        return bytes_transferred

    def init_main_arena(self):
        """
        Read the whole [heap] mapping and walk it chunk by chunk, filling
        self.chunk_list with Chunk objects.
        """
        self.heap_range = (self.heap['addr_start'], self.heap['addr_end'])
        self.heap_data = self.read_process_memory(self.heap['addr_start'], self.heap['size'])
        self.chunk_range = 6
        self.chunk_list = []
        i = 0
        idZ = 0
        span = self.data_size * self.chunk_range
        # Stop while a full header still fits in the dump.
        limit = len(self.heap_data) - span
        while i < limit:
            tmp_chunk = Chunk(idZ, i + self.heap['addr_start'], self.xy86, self.data_size, self.chunk_range, self.heap_data[i:i + span])
            self.chunk_list.append(tmp_chunk)
            if tmp_chunk.SIZE == 0:
                # A zero size would never advance the cursor; stop the walk.
                break
            i += tmp_chunk.SIZE
            idZ += 1
        #
        self.__arena_struct()
        self.__tcache_struct()
        self.__check()
        return

    def print_arena(self):
        print(PTR["H1"]("\n### HEAP:"))
        for chunk in self.chunk_list:
            print(chunk)

    def __arena_struct(self):
        #TODO: parse main arena, and identify binlists
        pass

    def __tcache_struct(self):
        #TODO: parse tcache_perthread_struct
        pass

    def __check(self):
        #TODO: check all is correct, e.g. find invalid chunk, prev_size corrupted, etc.
        pass

    def ptrace_attach(self):
        return self.libc.ptrace(PTRACE_ATTACH, self.pid, None, None)

    def ptrace_detach(self):
        return self.libc.ptrace(PTRACE_DETACH, self.pid, None, None)

    def ptrace_getregs(self):
        """Store a fresh register snapshot in self.regs_status."""
        self.regs_status = self.regs_struct()
        self.libc.ptrace(PTRACE_GETREGS, self.pid, None, ctypes.byref(self.regs_status))

    def ptrace_setregs(self):
        """Write self.regs_status back to the target."""
        self.libc.ptrace(PTRACE_SETREGS, self.pid, None, ctypes.byref(self.regs_status))

    def ptrace_singlestep(self):
        """Issue a single-step request for the target."""
        self.libc.ptrace(self._PTRACE_SINGLESTEP, self.pid, 0, 0)

    def print_regs(self):
        print(PTR["H2"]("\n### Register:"))
        for field_name, _ in self.regs_status._fields_:
            print(" {} {}".format(field_name.ljust(10, " "), fmtR(getattr(self.regs_status, field_name), self.data_size+1)))
if __name__ == "__main__" :
    # Only a missing or non-numeric pid is a usage problem.
    try:
        pid = int(sys.argv[1])
    except (IndexError, ValueError):
        print(DESCR)
        checkZ(lambda : True, "#Usage: python3 {} <pid>\n".format(sys.argv[0]))
    try:
        # parse maps, load libc, snapshot the registers and detach
        sgremlin = SGremlin(pid)
        # parse main arena
        sgremlin.init_main_arena()
        # print main arena
        sgremlin.print_arena()
        # print the registers captured by the constructor before it detached
        sgremlin.print_regs()
    except Exception as err:
        # Report the real failure instead of masking it as a usage error;
        # the exit status stays 1.
        checkZ(lambda : True, "{}: {}".format(type(err).__name__, err))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment