Skip to content

Instantly share code, notes, and snippets.

@n4sm
Last active March 20, 2023 08:54
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save n4sm/8849382bdeab8bae1ef0557c61aa2fcd to your computer and use it in GitHub Desktop.
Save n4sm/8849382bdeab8bae1ef0557c61aa2fcd to your computer and use it in GitHub Desktop.
Just a simple auto-unpacker for ELF binaries, built on Qiling: https://kernemporium.github.io/articles/en/auto_unpacking/m.html
#!/usr/bin/python3
# /*
# * ----------------------------------------------------------------------------
# * "THE BEER-WARE LICENSE" (Revision 42):
# * n4sm wrote this file. As long as you retain this notice you
# * can do whatever you want with this stuff. If we meet some day, and you think
# * this stuff is worth it, you can buy me a beer in return Poul-Henning Kamp
# * ----------------------------------------------------------------------------
# * /
from unicorn import (
UC_PROT_ALL,
UC_PROT_EXEC,
UC_PROT_NONE,
UC_PROT_READ,
UC_PROT_WRITE
)
import sys
import os
import random
from operator import itemgetter
from capstone import *
from capstone.x86 import *
from keystone import *
sys.path.append("../")
# To edit
from qiling import *
from qiling.const import *
from qiling.os.linux.thread import *
from qiling.const import *
from qiling.os.posix.filestruct import *
from qiling.os.filestruct import *
from qiling.os.posix.const_mapping import *
from qiling.exception import *
# Regions recorded by local_hook whose protection includes UC_PROT_EXEC;
# each entry is a list [addr, size, prot].
mem_regions_exec = [] # list of list
# Every region recorded by local_hook, whatever its protection (same entry layout).
mem_regions = []
# Unique disassembled-instruction strings collected by _callback_ (seeded with None).
trace = [None]
# "Cache miss" list: page base addresses that _callback_ found outside mem_regions_exec.
c_m = []
# Instruction addresses already traced by _callback_; it keeps at most 1024 entries.
cache = []
# Write cursor into `cache` once it is full (used modulo 1024 by _callback_).
idx = 0x0
# Counter of instructions traced by _callback_ inside executable mapped regions.
pinst = 0
# Capstone disassembler configured for x86-64.
md = Cs(CS_ARCH_X86, CS_MODE_64)
def is_available(ql, addr, size):
    """
    Report whether ``size`` bytes starting at ``addr`` can be mapped.

    The check is done by actually mapping the range through ``ql.mem.map``
    and, when that succeeds, unmapping it again right away so that no
    mapping is left behind.

    Returns True when the range could be mapped, False when ``ql.mem.map``
    raised an exception.
    """
    try:
        ql.mem.map(addr, size)
    except Exception:
        # Only genuine errors mean "not available"; KeyboardInterrupt and
        # SystemExit are deliberately left to propagate.
        return False
    ql.mem.unmap(addr, size)
    return True
def local_hook(ql, addr, size, prot) -> int:
    """
    Record a mapping in the module-level region tables.

    The entry ``[addr, size, prot]`` is stored in ``mem_regions`` and, when
    ``prot`` has the UC_PROT_EXEC bit set, in ``mem_regions_exec`` as well.
    In both tables an existing entry with the same start address is
    replaced in place; otherwise the entry is appended.

    ``ql`` is accepted but not used. Always returns 0.
    """
    entry = [addr, size, prot]

    if prot & UC_PROT_EXEC:
        print("[+] mapping with PROT_EXEC prot 0x%x - 0x%x" % (addr, addr + size))
        for pos, known in enumerate(mem_regions_exec):
            if known[0] == addr:
                # Same start address: refresh the stored size/protection.
                mem_regions_exec[pos] = entry
                break
        else:
            mem_regions_exec.append(entry)

    for pos, known in enumerate(mem_regions):
        if known[0] == addr:
            mem_regions[pos] = entry
            return 0x0

    mem_regions.append(entry)
    return 0x0
def mmap_c(ql, mmap2_addr, mmap2_length, mmap2_prot, mmap2_flags, mmap2_fd, mmap2_pgoffset):
    """
    Replacement mmap syscall handler that also records mappings via local_hook.

    Behaviour, in order:

    * The length and the requested address are rounded up to a multiple
      of 0x1000.
    * ``mmap2_fd`` / ``mmap2_pgoffset`` are normalised depending on
      ``ql.archtype`` (on MIPS both are read from emulated memory; on
      architectures other than ARM64/X8664 the page offset is multiplied
      by 4096).
    * If ``mmap2_addr`` is non-zero and already mapped, only the protection
      is changed, the region is recorded and the function returns 0 without
      calling ``ql.os.definesyscall_return``.
    * Otherwise a base is chosen (a random free page-aligned address when
      ``mmap2_addr`` is 0, the aligned requested address otherwise), the
      range is mapped, zero-filled, optionally filled from the file behind
      ``mmap2_fd``, given its final protection, recorded through
      local_hook, and the base is handed to ``ql.os.definesyscall_return``.

    Raises whatever ``ql.mem.map`` raises when the chosen range cannot be
    mapped (the map info is printed first).
    """
    mmap2_length_align = ((mmap2_length + 0x1000 - 1) // 0x1000) * 0x1000
    mmap2_addr_align = ((mmap2_addr + 0x1000 - 1) // 0x1000) * 0x1000
    ret = False

    # this is ugly patch, we might need to get value from elf parse,
    # is32bit or is64bit value not by arch
    MAP_ANONYMOUS = 32
    if (ql.archtype == QL_ARCH.ARM64) or (ql.archtype == QL_ARCH.X8664):
        mmap2_fd = ql.unpack64(ql.pack64(mmap2_fd))
    elif ql.archtype == QL_ARCH.MIPS:
        mmap2_fd = ql.unpack32s(ql.mem.read(mmap2_fd, 4))
        mmap2_pgoffset = ql.unpack32(ql.mem.read(mmap2_pgoffset, 4)) * 4096
        MAP_ANONYMOUS = 2048
    else:
        mmap2_fd = ql.unpack32s(ql.pack32(mmap2_fd))
        mmap2_pgoffset = mmap2_pgoffset * 4096

    if mmap2_addr != 0 and ql.mem.is_mapped(mmap2_addr, mmap2_length):
        # The range already exists: only update its protection and bookkeeping.
        ql.dprint(D_INFO, "[+] log mmap2 - mmap2(0x%x, 0x%x, 0x%x, 0x%x, %x, %d)" % (
            mmap2_addr, mmap2_length, mmap2_prot, mmap2_flags, mmap2_fd, mmap2_pgoffset))
        ql.dprint(D_INFO, "[+] log mmap2 - mmap2(0x%x, 0x%x, %s, %s, %x, %d)" % (
            mmap2_addr, mmap2_length, mmap_prot_mapping(mmap2_prot), mmap_flag_mapping(mmap2_flags), mmap2_fd, mmap2_pgoffset))
        ql.dprint(D_INFO, "[+] log mmap2 - return addr : " + hex(0x0))
        ql.uc.mem_protect(mmap2_addr_align, mmap2_length_align, mmap2_prot)
        local_hook(ql, mmap2_addr, mmap2_length_align, mmap2_prot)
        ql.mem.add_mapinfo(mmap2_addr, mmap2_addr + mmap2_length_align, mem_p = mmap2_prot, mem_info = "[mapped]")
        return 0x0

    if mmap2_addr == 0:
        # No address hint: draw random page-aligned bases until one is free.
        while not ret:
            random_base = random.randint(0x0, 0x00007fffffffffff)
            random_base_align = ((random_base + 0x1000 - 1) // 0x1000) * 0x1000
            ret = is_available(ql, random_base_align, mmap2_length_align)
    else:
        random_base_align = mmap2_addr_align

    ql.dprint(D_INFO, "[+] log mmap2 - mmap2(0x%x, 0x%x, 0x%x, 0x%x, %x, %d)" % (
        mmap2_addr, mmap2_length, mmap2_prot, mmap2_flags, mmap2_fd, mmap2_pgoffset))
    ql.dprint(D_INFO, "[+] log mmap2 - mmap2(0x%x, 0x%x, %s, %s, %x, %d)" % (
        mmap2_addr, mmap2_length, mmap_prot_mapping(mmap2_prot), mmap_flag_mapping(mmap2_flags), mmap2_fd, mmap2_pgoffset))
    ql.dprint(D_INFO, "[+] log mmap2 - return addr : " + hex(random_base_align))
    ql.dprint(D_INFO, "[+] log mmap2 - addr range : " + hex(random_base_align) + ' - ' + hex(
        random_base_align + mmap2_length_align))

    try:
        ql.mem.map(random_base_align, mmap2_length_align)
    except Exception:
        # Show the current layout to help diagnose the failed mapping.
        ql.mem.show_mapinfo()
        raise

    # Open the range up completely while it is being initialised.
    ql.uc.mem_protect(random_base_align, mmap2_length_align, UC_PROT_ALL)
    # To avoid an invalid memory write
    ql.mem.write(random_base_align, b'\x00' * mmap2_length_align)

    # File-backed mapping: copy the file content into the new range.
    # The lower bound keeps a negative fd from indexing file_des from its end.
    if ((mmap2_flags & MAP_ANONYMOUS) == 0) and 0 <= mmap2_fd < 256 and ql.os.file_des[mmap2_fd] != 0:
        ql.os.file_des[mmap2_fd].lseek(mmap2_pgoffset)
        data = ql.os.file_des[mmap2_fd].read(mmap2_length)
        mem_info = str(ql.os.file_des[mmap2_fd].name)
        ql.dprint(D_INFO, "[+] log mem write : " + hex(len(data)))
        ql.dprint(D_INFO, "[+] log mem mmap2 : " + mem_info)
        ql.mem.add_mapinfo(random_base_align, random_base_align + (len(data)), mem_p = UC_PROT_ALL, mem_info = mem_info)
        ql.mem.write(random_base_align, data)

    ql.nprint("mmap2(0x%x, 0x%x, 0x%x, 0x%x, %d, %d) = 0x%x" % (mmap2_addr, mmap2_length, mmap2_prot, mmap2_flags, mmap2_fd, mmap2_pgoffset, random_base_align))
    regreturn = random_base_align
    ql.dprint(D_INFO, "[+] mmap2_block is at 0x%x with a length of 0x%x" % (regreturn, mmap2_length_align))

    # Initialisation is done: apply the protection the guest asked for.
    ql.uc.mem_protect(random_base_align, mmap2_length_align, mmap2_prot)
    ql.mem.add_mapinfo(random_base_align, random_base_align + mmap2_length_align, mem_p = mmap2_prot, mem_info="[mapped]")
    ql.mem.show_mapinfo()
    local_hook(ql, random_base_align, mmap2_length_align, mmap2_prot)
    ql.os.definesyscall_return(regreturn)
def ql_syscall_mprotect(ql, mprotect_start, mprotect_len, mprotect_prot, *args, **kw):
    """
    Replacement mprotect syscall handler that records the change via local_hook.

    Logs the call, rounds both the start address and the length up to a
    multiple of 0x1000, applies the protection with ``ql.uc.mem_protect``,
    updates the map info, records the region through local_hook and
    reports 0 through ``ql.os.definesyscall_return``.
    """
    regreturn = 0

    ql.nprint("mprotect(0x%x, 0x%x, 0x%x) = %d" % (mprotect_start, mprotect_len, mprotect_prot, regreturn))
    debug_line = "[+] mprotect(0x%x, 0x%x, %s) = %d" % (
        mprotect_start, mprotect_len, mmap_prot_mapping(mprotect_prot), regreturn)
    ql.dprint(D_INFO, debug_line)

    # Round up to the next 0x1000 boundary (same result as the // and * form).
    page_mask = 0x1000 - 1
    start_page = (mprotect_start + page_mask) & ~page_mask
    span = (mprotect_len + page_mask) & ~page_mask
    end_page = start_page + span

    ql.uc.mem_protect(start_page, span, mprotect_prot)
    ql.mem.add_mapinfo(start_page, end_page, mem_p=mprotect_prot, mem_info="[mprotect]")
    local_hook(ql, start_page, span, mprotect_prot)
    ql.os.definesyscall_return(regreturn)
def setup_self_proc(exe_name : str, exe_path : str, rootfs : str) -> None:
    """
    Copy the target executable to ``<rootfs>/proc/self/exe``.

    Parameters:
        exe_name: accepted for interface compatibility; not used.
        exe_path: path of the executable to copy.
        rootfs:   root directory of the emulated file system.

    The source is read completely before the destination is opened, so a
    missing or unreadable ``exe_path`` no longer leaves a truncated
    ``proc/self/exe`` behind. The ``proc/self`` directory is created when
    it does not exist yet.

    Raises OSError (e.g. FileNotFoundError) when the source cannot be read
    or the destination cannot be written.
    """
    target = rootfs + "/proc/self/exe"

    with open(exe_path, "rb") as exe_file:
        data_exe = exe_file.read()

    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, "wb") as file_created:
        file_created.write(data_exe)
# def check_jmp(ql, addr, size):
# ins = ql.mem.read(addr, size)
# reg_val = 0x0
# md = Cs(CS_ARCH_X86, CS_MODE_64)
# md.detail = True
# for i in md.disasm(ins, addr):
# for g in i.groups:
# if g & CS_GRP_JUMP != 0 or g & CS_GRP_CALL != 0:
# for op in i.operands:
# if op.type == X86_OP_REG:
# t = op.type
# reg_val = ql.uc.reg_read(op.value.reg)
# for area in mem_regions_exec:
# if reg_val >= area[0] and reg_val <= area[0] + area[1]:
# print("[*] Indirect jmp in a mmaped area : %s %s; %s=0x%x" % (i.mnemonic, i.op_str, i.reg_name(t), reg_val))
def is_in_range(addr, regions=None) -> bool:
    """
    Check whether the page containing ``addr`` lies inside an executable
    memory area mapped by the process.

    Parameters:
        addr:    address to test; it is rounded down to its 4 KiB page base.
        regions: iterable of entries whose first two items are the start
                 address and the size of a region. Defaults to the
                 module-level ``mem_regions_exec`` table.

    A region covers ``[start, start + size)``: the address ``start + size``
    is the first byte after the region and is therefore not part of it.
    """
    if regions is None:
        regions = mem_regions_exec
    page = (addr >> 12) << 12
    return any(area[0] <= page < area[0] + area[1] for area in regions)
def dump(ql, addr, out_path="bin.u"):
    """
    Dump the ELF image that contains ``addr`` from emulated memory to a file.

    Parameters:
        ql:       emulator object; ``ql.mem.search`` and ``ql.mem.read``
                  are used.
        addr:     address of an instruction inside the unpacked image.
        out_path: file the image is written to (default ``"bin.u"``).

    Emulated memory is searched for the ELF magic and the closest match
    located strictly below ``addr`` is taken as the start of the image.
    The dumped length is the section header table offset plus the size of
    the section header table, both read from the header at the ELF64
    offsets 40 (e_shoff), 58 (e_shentsize) and 60 (e_shnum).

    When no ELF magic is found below ``addr`` a message is printed and
    nothing is written.
    """
    candidates = [base for base in ql.mem.search(b"\x7fELF") if base < addr]
    if not candidates:
        print("[-] No ELF header found below 0x%x, nothing dumped" % addr)
        return

    # The highest match below addr is the one nearest to it.
    addr_dmp = max(candidates)

    shoff = int.from_bytes(ql.mem.read(addr_dmp + 40, 0x8), "little")
    print("[+] Section header offset : 0x%x" % shoff)

    shentsize = int.from_bytes(ql.mem.read(addr_dmp + 58, 0x2), "little")
    shnum = int.from_bytes(ql.mem.read(addr_dmp + 60, 0x2), "little")
    len_section_headers = shentsize * shnum
    print("[+] length of the section headers : 0x%x" % len_section_headers)

    len_dmp = shoff + len_section_headers
    print("[+] Final binary dumped : 0x%x - 0x%x" % (addr_dmp, addr_dmp + len_dmp))

    dmp = ql.mem.read(addr_dmp, len_dmp)
    with open(out_path, "wb") as file_unpacked:
        file_unpacked.write(dmp)
def _callback_(ql, addr, size):
    """
    Code hook: trace instructions executed inside executable mapped regions.

    Addresses whose page is in the ``c_m`` miss list, or that are already
    in ``cache``, are ignored. A page that is not inside an executable
    region recorded in ``mem_regions_exec`` is added to ``c_m`` and
    ignored. Every other address is remembered in ``cache`` (at most 1024
    entries), counted in ``pinst``, disassembled, printed and added to
    ``trace`` when not present yet. The very first traced instruction also
    triggers ``dump``.

    Always returns 0.
    """
    global idx, pinst

    # Base address of the page to which addr belongs.
    page_base = (addr >> 12) << 12

    # Already known, either as a miss or as a traced address: nothing to do.
    if page_base in c_m or addr in cache:
        return 0

    # Not in an executable mapped area: remember the page as a miss.
    if not is_in_range(page_base):
        print("{} append to the cache".format(hex(page_base)))
        c_m.append(page_base)
        return 0

    # Cache of traced addresses, so repeated loop bodies are reported once.
    if len(cache) >= 1024:
        cache[idx % 1024] = addr
        idx += 1
    else:
        cache.append(addr)

    # Number of unpacked instructions executed so far.
    pinst += 1
    if pinst == 0x1:
        dump(ql, addr)

    # Raw bytes of the instruction, then its disassembly.
    raw = ql.mem.read(addr, size)
    for insn in md.disasm(raw, addr):
        line = "{} : {} {}".format(hex(insn.address), insn.mnemonic, insn.op_str)
        print(line)
        if line not in trace:
            trace.append(line)
    return 0
if __name__ == "__main__":
    # Parameters to edit: emulated command line and root file system.
    target_argv = ["rootfs/x8664_linux/bin/leetness"]
    emu_rootfs = "rootfs/x8664_linux/"
    _ql_ = Qiling(target_argv, emu_rootfs, output="debug")

    # Parameters to edit as well: host path of the binary exposed as
    # /proc/self/exe inside the root file system.
    setup_self_proc(
        "leetness",
        "/media/nasm/7044d811-e1cd-4997-97d5-c08072ce9497/Downloads/qiling/examples/rootfs/x8664_linux/bin/leetness",
        "rootfs/x8664_linux",
    )

    # Replace the handlers of syscalls 0x9 and 0xa, then trace every instruction.
    for syscall_nr, handler in ((0x9, mmap_c), (0xa, ql_syscall_mprotect)):
        _ql_.set_syscall(syscall_nr, handler)
    _ql_.hook_code(_callback_)
    _ql_.run()
@H4niz
Copy link

H4niz commented Jan 8, 2021

Dear sir,

I got the following error, but I cannot find any solution on Google.

File "sg_engine.py", line 173, in mmap_c if ((mmap2_flags & MAP_ANONYMOUS) == 0) and mmap2_fd < 256 and ql.os.file_des[mmap2_fd] != 0: AttributeError: 'QlOsLinux' object has no attribute 'file_des'

I don't know where to find the attribute 'file_des'. Do you have any ideas?

My context:

Python 3.6
unicodecsv==0.14.1
unicorn==1.0.2
qiling==1.2

@n4sm
Copy link
Author

n4sm commented Jan 8, 2021

Your Qiling version is different from mine. It seems that they have removed the file_des attribute from QlOsLinux; I'll check it out.

@H4niz
Copy link

H4niz commented Jan 9, 2021

Thanks, I am looking forward to your solution!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment