Skip to content

Instantly share code, notes, and snippets.

@aquynh
Forked from peternguyen93/shellcode_maker.py
Created September 19, 2016 08:12
Show Gist options
  • Save aquynh/b71c2051f4352c417e54b5ca96a7bc22 to your computer and use it in GitHub Desktop.
Save aquynh/b71c2051f4352c417e54b5ca96a7bc22 to your computer and use it in GitHub Desktop.
from keystone import *
from capstone import *
from unicorn import *
from unicorn.x86_const import *
from struct import *
from termcolor import *
import os
import sys
import socket
# Guest open() flag values accepted by the emulator, mapped to the host
# constant that is handed to os.open().  Any flags value that is not a key
# here is rejected by the sys_open handler.
OPEN_FLAGS = {
    0: os.O_RDONLY,
    1: os.O_WRONLY,
    2: os.O_RDWR,
}

# Descriptors the emulated code is allowed to use.  The syscall handlers pass
# the guest's descriptor number straight to os.read()/os.write(), so the
# entries are host descriptor numbers; it starts with stdin, stdout, stderr.
# The numbers are written out instead of calling sys.stdin.fileno() because
# that call raises when the stream has been replaced by an object that has no
# real descriptor (for example under a test runner that captures output).
FD = [0, 1, 2]
class Asm_Compiler:
    """Assemble source text into machine code with Keystone.

    Only arch 'x86' with mode 'x32' or 'x64' is accepted.
    """

    def __init__(self, arch, mode):
        """Translate the textual arch/mode names into Keystone constants.

        Raises:
            ValueError: if ``arch`` is not 'x86' or ``mode`` is neither
                'x32' nor 'x64'.  (ValueError is an Exception, so callers
                catching Exception keep working.)
        """
        if arch != 'x86':
            raise ValueError('Unsupported your architecture %s' % arch)
        if mode == 'x32':
            ks_mode = KS_MODE_32
        elif mode == 'x64':
            ks_mode = KS_MODE_64
        else:
            raise ValueError('Unsupported your CPU mode %s' % mode)
        self.arch = KS_ARCH_X86
        self.mode = ks_mode

    @staticmethod
    def _to_bytes(encoding):
        """Turn an iterable of byte values (0-255) into a byte string.

        ``None`` or an empty iterable gives an empty byte string.  Going
        through bytearray produces real bytes on Python 3 and a plain str
        on Python 2, which is what the emulator's mem_write expects.
        """
        return bytes(bytearray(encoding or ()))

    def compile(self, asm_code):
        """Assemble ``asm_code`` and return the machine code as bytes.

        If Keystone raises KsError the error is printed as "ERROR: ..." and
        an empty byte string is returned instead of propagating.
        """
        try:
            ks = Ks(self.arch, self.mode)
            encoding, _count = ks.asm(asm_code)
        except KsError as e:
            print("ERROR: %s" % e)
            return self._to_bytes(None)
        return self._to_bytes(encoding)
def m_str_read(uc, addr, max_len=4096):
    """Read a NUL-terminated string from emulator memory.

    Args:
        uc: object exposing ``mem_read(address, size)`` that returns a
            bytes-like value (the emulator instance).
        addr: guest address of the first character.
        max_len: upper bound on the number of characters read, so an
            unterminated buffer cannot make the loop run away.

    Returns:
        The characters before the first NUL byte, as a native ``str``
        (each byte mapped to one character via latin-1 on Python 3).

    Any exception raised by ``uc.mem_read`` propagates to the caller.
    """
    raw = bytearray()
    while len(raw) < max_len:
        # One byte at a time: a larger read could cross into unmapped
        # memory even though the string itself ends earlier.
        byte = bytes(uc.mem_read(addr + len(raw), 1))
        if byte in (b'', b'\x00'):
            break
        raw += byte
    data = bytes(raw)
    # On Python 2 ``bytes`` is ``str`` already; on Python 3 decode so callers
    # can format the result with "%s" and pass it to os.open().
    return data if isinstance(data, str) else data.decode('latin-1')
def str_array(uc, addr, ptr_size=4):
    """Read a NULL-terminated array of string pointers (argv/envp style).

    Args:
        uc: emulator instance providing ``mem_read(address, size)``.
        addr: guest address of the first pointer; 0 means "no array".
        ptr_size: width of each pointer in bytes, 4 (little-endian 32-bit,
            the default) or 8 (little-endian 64-bit).

    Returns:
        A list with one string per non-NULL pointer, in array order.  If the
        emulator raises UcError while reading (for example on unmapped
        memory) the strings collected so far are returned.

    Raises:
        ValueError: if ``ptr_size`` is neither 4 nor 8.
    """
    if ptr_size == 4:
        fmt = '<I'
    elif ptr_size == 8:
        fmt = '<Q'
    else:
        raise ValueError('Unsupported pointer size %r' % (ptr_size,))

    strings = []
    if addr == 0:
        # A NULL argv/envp is legal for the caller; nothing to read.
        return strings
    try:
        offset = 0
        while True:
            # Keep the array base in ``addr`` untouched; each slot lives at
            # base + offset.
            raw = bytes(uc.mem_read(addr + offset, ptr_size))
            ptr = unpack(fmt, raw)[0]
            if ptr == 0:
                break
            strings.append(m_str_read(uc, ptr))
            offset += ptr_size
    except UcError:
        # Best effort: report whatever could be decoded.
        pass
    return strings
# handle x32 hook interrupt
#
# Each helper below implements one system call.  It receives the emulator and
# the raw EBX/ECX/EDX values and returns the number to place in EAX:
# -1 when the request is rejected up front (unknown descriptor or flags),
# -2 when the host operating system call fails.

def _x32_sys_read(uc, fd, buf, size):
    """read(fd, buf, size): copy host input into guest memory at ``buf``."""
    print("read(%d,%s,%d)" % (fd,hex(buf),size))
    if fd not in FD:
        return -1
    try:
        data = os.read(fd, size)
    except (IOError, OSError):
        return -2
    if data:
        uc.mem_write(buf, data)
    # Report what was actually read, which may be less than requested.
    return len(data)


def _x32_sys_write(uc, fd, buf, size):
    """write(fd, buf, size): send guest memory at ``buf`` to a host fd."""
    print("write(%d,%s,%d)" % (fd,hex(buf),size))
    if fd not in FD:
        return -1
    # bytes() so os.write gets the raw memory content, not a textual repr.
    data = bytes(uc.mem_read(buf, size))
    try:
        return os.write(fd, data)
    except (IOError, OSError):
        return -2


def _x32_sys_open(uc, filename_addr, flags, mode):
    """open(filename, flags, mode): open a host file for the guest.

    Only the flag values listed in OPEN_FLAGS are accepted; ``mode`` is
    printed but not passed to the host.
    """
    filename = m_str_read(uc,filename_addr)
    print("open(\"%s\",%d,%d)" % (filename,flags,mode))
    if flags not in OPEN_FLAGS:
        return -1
    try:
        new_fd = os.open(filename,OPEN_FLAGS[flags])
    except OSError:
        return -2
    FD.append(new_fd)
    return new_fd


def _x32_sys_close(uc, fd, _ecx, _edx):
    """close(fd): close a descriptor the guest owns; returns 0 on success.

    NOTE(review): this closes the host descriptor even for 0/1/2, exactly as
    before; confirm that letting the guest close the host's stdio is wanted.
    """
    if fd not in FD:
        return -1
    try:
        os.close(fd)
    except OSError:
        return -2
    FD.remove(fd) # remove this fd
    return 0


def _x32_sys_execve(uc, path, _argv, _env):
    """execve(path, argv, envp): never executed, only reported; returns -1."""
    s_path = m_str_read(uc,path)
    s_argv = str(str_array(uc,_argv))
    s_env = str(str_array(uc,_env))
    print(colored("[!] sys_execve(\"%s\",%s,%s) detected !!!" % (s_path,s_argv,s_env),"red"))
    return -1


def _x32_sys_dup2(uc, fd1, fd2, _edx):
    """dup2(fd1, fd2): duplicate a known descriptor onto ``fd2``."""
    if fd1 not in FD:
        return -1
    try:
        os.dup2(fd1,fd2)
    except OSError:
        return -2
    if fd2 not in FD:
        # Track the new descriptor once so a later close() removes it fully.
        FD.append(fd2)
    return fd2


def x32_hook_int(uc, intno, user_data):
    """Interrupt hook emulating a few Linux i386 ``int 0x80`` system calls.

    The call number is read from EAX and the arguments from EBX, ECX, EDX.
    Handled numbers: 1 exit (stops the emulation), 3 read, 4 write, 5 open,
    6 close, 11 execve (reported, not executed) and 63 dup2.  For every
    handled call except exit the result is written back to EAX.  Any other
    number is reported and EAX is left untouched.  Interrupts other than
    0x80 are ignored.

    Args:
        uc: emulator instance the hook was registered on.
        intno: interrupt number that fired.
        user_data: unused.
    """
    if intno != 0x80:
        return

    eax = uc.reg_read(UC_X86_REG_EAX)
    ebx = uc.reg_read(UC_X86_REG_EBX)
    ecx = uc.reg_read(UC_X86_REG_ECX)
    edx = uc.reg_read(UC_X86_REG_EDX)

    if eax == 1: # sys_exit
        print("call SYS_EXIT")
        uc.emu_stop()
        return

    handlers = {
        3: _x32_sys_read,
        4: _x32_sys_write,
        5: _x32_sys_open,
        6: _x32_sys_close,
        11: _x32_sys_execve,
        63: _x32_sys_dup2,
    }
    handler = handlers.get(eax)
    if handler is None:
        print(colored("Your sys_call %d is not supported" % eax,"red"))
        return

    uc.reg_write(UC_X86_REG_EAX, handler(uc, ebx, ecx, edx))
def x64_hook_int(uc, intno, user_data):
    """Interrupt hook for 64-bit emulation; no system call is implemented.

    Instead of swallowing the interrupt silently, report it so the user can
    tell that the emulated code asked for something this tool cannot do.
    Registers and memory are left untouched and emulation continues.

    Args:
        uc: emulator instance the hook was registered on (unused).
        intno: interrupt number that fired.
        user_data: unused.
    """
    print("x64 interrupt 0x%x is not supported" % intno)
class Debugger():
    """Run machine code inside a Unicorn x86 emulator.

    Only arch 'x86' with mode 'x32' or 'x64' is accepted.
    """

    def __init__(self, arch, mode):
        """Translate the textual arch/mode names into Unicorn constants.

        Raises:
            ValueError: if ``arch`` is not 'x86' or ``mode`` is neither
                'x32' nor 'x64'.  (ValueError is an Exception, so callers
                catching Exception keep working.)
        """
        if arch != 'x86':
            raise ValueError('Unsupported your architecture %s' % arch)
        if mode == 'x32':
            uc_mode = UC_MODE_32
        elif mode == 'x64':
            uc_mode = UC_MODE_64
        else:
            raise ValueError('Unsupported your CPU mode %s' % mode)
        self.arch = UC_ARCH_X86
        self.mode = uc_mode

    @staticmethod
    def _as_bytes(code):
        """Return ``code`` as a byte string.

        bytes are returned unchanged, a bytearray is copied, and text is
        encoded one character per byte (latin-1) so strings built from
        chr() values round-trip.
        """
        if isinstance(code, bytes):
            return code
        if isinstance(code, bytearray):
            return bytes(code)
        return code.encode('latin-1')

    def run(self, code, address=0x1337000, mem_size=2 * 1024 * 1024):
        """Map memory, load ``code`` at ``address`` and emulate it.

        The stack pointer is placed in the middle of the mapped region and
        the interrupt hook matching the CPU mode is installed.  Emulation
        runs from ``address`` to ``address + len(code)``.  A UcError raised
        while setting up or emulating is printed as "ERROR: ..." rather
        than propagated.

        Args:
            code: machine code (bytes, bytearray or text of byte values).
            address: guest address where the region is mapped and the code
                is loaded.
            mem_size: size in bytes of the mapped region.
                NOTE(review): Unicorn is expected to require page-aligned
                address and size; confirm before using other values.

        Raises:
            ValueError: if ``self.mode`` is not one of the two supported
                Unicorn modes (only possible if it was changed after
                construction).
        """
        code = self._as_bytes(code)
        if self.mode == UC_MODE_32:
            banner = ">>> Emulate i386 code"
            stack_reg = UC_X86_REG_ESP
            int_hook = x32_hook_int
        elif self.mode == UC_MODE_64:
            banner = ">>> Emulate x86_64 code"
            stack_reg = UC_X86_REG_RSP
            int_hook = x64_hook_int
        else:
            # UcError takes an error number, not a message, so a plain
            # ValueError is used for this configuration problem.
            raise ValueError('Unsupported Architecture')

        print(colored(banner,"cyan"))
        try:
            mu = Uc(self.arch, self.mode)
            # map the memory for this emulation
            mu.mem_map(address, mem_size)
            # write machine code to be emulated to memory
            mu.mem_write(address, code)
            # initialize stack in the middle of the mapped region
            mu.reg_write(stack_reg, address + mem_size // 2)
            # handle interrupt ourself
            mu.hook_add(UC_HOOK_INTR, int_hook)
            # emulate machine code in infinite time
            mu.emu_start(address, address + len(code))
        except UcError as e:
            print("ERROR: %s" % e)
        print(colored("\n>>> Emulation done","cyan"))
def main(argv):
    """Assemble the file named by ``argv[1]`` and emulate it as 32-bit x86.

    The string "./passwd" plus a NUL byte is appended after the assembled
    code, so it sits in guest memory directly behind the instructions.

    Args:
        argv: command-line arguments, program name first.

    Returns:
        0 after the emulation ran, 1 if assembling produced no code,
        2 if no input file was given.
    """
    if len(argv) < 2:
        prog = argv[0] if argv else 'shellcode_maker.py'
        sys.stderr.write("usage: %s <file.asm>\n" % prog)
        return 2

    compiler = Asm_Compiler('x86','x32')
    dbg = Debugger('x86','x32')

    with open(argv[1],'r') as f:
        content = f.read()

    code = compiler.compile(content)
    if not code:
        # compile() already printed the assembler error; emulating only the
        # appended data string would be meaningless.
        return 1
    if not isinstance(code, bytes):
        # Older compile() implementations return text; one char per byte.
        code = code.encode('latin-1')
    dbg.run(code + b'./passwd\x00')
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment