Skip to content

Instantly share code, notes, and snippets.

@herrcore
Last active April 15, 2024 02:06
Show Gist options
  • Save herrcore/0649d85a6838972db5da71bed6ed676b to your computer and use it in GitHub Desktop.
Save herrcore/0649d85a6838972db5da71bed6ed676b to your computer and use it in GitHub Desktop.
Lumma Stealer Deobfuscation (IDA Python)
# import idautils
import idc
import ida_bytes
import ida_ua
import ida_funcs
import ida_idp
from idautils import DecodeInstruction
import struct
jump_instructions = [
'jmp', 'ja', 'jae', 'jb', 'jbe', 'jc', 'jcxz', 'jecxz', 'jrcxz', 'je',
'jg', 'jge', 'jl', 'jle', 'jna', 'jnae', 'jnb', 'jnbe', 'jnc', 'jne',
'jng', 'jnge', 'jnl', 'jnle', 'jno', 'jnp', 'jns', 'jnz', 'jo', 'jp',
'jpe', 'jpo', 'js', 'jz'
]
def scan_binary(start_addr, end_addr, file_data):
mem_mov_addr = None
mem_mov_reg = None
mem_mov_indexed_addr = None
mem_mov_indexed_reg = None
mem_mov_indexed_reg_index = None
curr_addr = start_addr
while curr_addr <= end_addr:
# Create an insn_t object at the current address
insn = ida_ua.insn_t()
ida_ua.decode_insn(insn, curr_addr)
# Get the disassembly of the current instruction
disasm = ida_ua.print_insn_mnem(insn.ea)
if disasm is None:
print(f"{hex(curr_addr)}: Skipping non code")
else:
#insn = DecodeInstruction(curr_addr)
#print(insn)
print(f"{hex(curr_addr)}: {disasm}")
if 'jmp' in disasm:
if insn.Op1.type == ida_ua.o_reg:
reg = ida_idp.get_reg_name(insn.Op1.reg, 4)
print(f"\tJump to reg {reg}")
# Get the closest mem move into the jump register
if mem_mov_reg == reg and mem_mov_indexed_reg == reg:
if mem_mov_addr < mem_mov_indexed_addr:
print(f"\tClosest mem move {hex(mem_mov_indexed_addr)}")
new_addr_0 = None
new_addr_1 = None
unicorn_reg = get_unicorn_reg_name(reg)
index_reg = get_unicorn_reg_name(mem_mov_indexed_reg_index)
new_addr_0 = emulate_jmp_table_condition(file_data, mem_mov_indexed_addr, curr_addr, unicorn_reg, index_reg, 0)
new_addr_1 = emulate_jmp_table_condition(file_data, mem_mov_indexed_addr, curr_addr, unicorn_reg, index_reg, 1)
if new_addr_0 is None:
print(f"\tERROR emulation failed for new_addr_0")
elif new_addr_1 is None:
print(f"\tERROR emulation failed for new_addr_1")
else:
print(f"\tEmulated jump [{mem_mov_indexed_reg_index} = 0]to {hex(new_addr_0)}")
print(f"\tEmulated jump [{mem_mov_indexed_reg_index} = 1]to {hex(new_addr_1)}")
ida_bytes.set_cmt(curr_addr, f"0={hex(new_addr_0)}\n1={hex(new_addr_1)}", 0)
patch_indexed(mem_mov_indexed_addr, curr_addr, reg, mem_mov_indexed_reg_index, new_addr_1, new_addr_0)
else:
print(f"\tClosest mem move {hex(mem_mov_addr)}")
new_addr = None
unicorn_reg = get_unicorn_reg_name(reg)
new_addr = emulate_jmp_table(file_data, mem_mov_addr, curr_addr, unicorn_reg)
if new_addr is None:
print(f"\tERROR emulation failed")
else:
print(f"\tEmulated jump to {hex(new_addr)}")
ida_bytes.set_cmt(curr_addr, hex(new_addr), 0)
patch_direct(mem_mov_addr,curr_addr,new_addr)
elif mem_mov_reg == reg:
print(f"\tClosest mem move {hex(mem_mov_addr)}")
new_addr = None
unicorn_reg = get_unicorn_reg_name(reg)
new_addr = emulate_jmp_table(file_data, mem_mov_addr, curr_addr, unicorn_reg)
if new_addr is None:
print(f"\tERROR emulation failed")
else:
print(f"\tEmulated jump to {hex(new_addr)}")
ida_bytes.set_cmt(curr_addr, hex(new_addr), 0)
patch_direct(mem_mov_addr,curr_addr,new_addr)
elif mem_mov_indexed_reg == reg:
print(f"\tClosest mem move {hex(mem_mov_indexed_addr)}")
new_addr_0 = None
new_addr_1 = None
unicorn_reg = get_unicorn_reg_name(reg)
index_reg = get_unicorn_reg_name(mem_mov_indexed_reg_index)
new_addr_0 = emulate_jmp_table_condition(file_data, mem_mov_indexed_addr, curr_addr, unicorn_reg, index_reg, 0)
new_addr_1 = emulate_jmp_table_condition(file_data, mem_mov_indexed_addr, curr_addr, unicorn_reg, index_reg, 1)
if new_addr_0 is None:
print(f"\tERROR emulation failed for new_addr_0")
elif new_addr_1 is None:
print(f"\tERROR emulation failed for new_addr_1")
else:
print(f"\tEmulated jump [{mem_mov_indexed_reg_index} = 0]to {hex(new_addr_0)}")
print(f"\tEmulated jump [{mem_mov_indexed_reg_index} = 1]to {hex(new_addr_1)}")
ida_bytes.set_cmt(curr_addr, f"0={hex(new_addr_0)}\n1={hex(new_addr_1)}", 0)
patch_indexed(mem_mov_indexed_addr, curr_addr, reg, mem_mov_indexed_reg_index, new_addr_1, new_addr_0)
else:
if mem_mov_indexed_addr is not None and mem_mov_addr is not None:
if mem_mov_addr > mem_mov_indexed_addr:
print(f"\tFurther mem move {hex(mem_mov_indexed_addr)}")
new_addr_0 = None
new_addr_1 = None
unicorn_reg = get_unicorn_reg_name(reg)
index_reg = get_unicorn_reg_name(mem_mov_indexed_reg_index)
new_addr_0 = emulate_jmp_table_condition(file_data, mem_mov_indexed_addr, curr_addr, unicorn_reg, index_reg, 0)
new_addr_1 = emulate_jmp_table_condition(file_data, mem_mov_indexed_addr, curr_addr, unicorn_reg, index_reg, 1)
if new_addr_0 is None:
print(f"\tERROR emulation failed for new_addr_0")
elif new_addr_1 is None:
print(f"\tERROR emulation failed for new_addr_1")
else:
print(f"\tEmulated jump [{mem_mov_indexed_reg_index} = 0]to {hex(new_addr_0)}")
print(f"\tEmulated jump [{mem_mov_indexed_reg_index} = 1]to {hex(new_addr_1)}")
ida_bytes.set_cmt(curr_addr, f"0={hex(new_addr_0)}\n1={hex(new_addr_1)}", 0)
patch_indexed(mem_mov_indexed_addr, curr_addr, reg, mem_mov_indexed_reg_index, new_addr_1, new_addr_0)
else:
print(f"\tFurther mem move {hex(mem_mov_addr)}")
new_addr = None
unicorn_reg = get_unicorn_reg_name(reg)
new_addr = emulate_jmp_table(file_data, mem_mov_addr, curr_addr, unicorn_reg)
if new_addr is None:
print(f"\tERROR emulation failed")
else:
print(f"\tEmulated jump to {hex(new_addr)}")
ida_bytes.set_cmt(curr_addr, hex(new_addr), 0)
patch_direct(mem_mov_addr,curr_addr,new_addr)
elif mem_mov_addr is not None:
print(f"\tFurther mem move {hex(mem_mov_addr)}")
new_addr = None
unicorn_reg = get_unicorn_reg_name(reg)
new_addr = emulate_jmp_table(file_data, mem_mov_addr, curr_addr, unicorn_reg)
if new_addr is None:
print(f"\tERROR emulation failed")
else:
print(f"\tEmulated jump to {hex(new_addr)}")
ida_bytes.set_cmt(curr_addr, hex(new_addr), 0)
patch_direct(mem_mov_addr,curr_addr,new_addr)
elif mem_mov_indexed_addr is not None:
print(f"\tFurther mem move {hex(mem_mov_indexed_addr)}")
new_addr_0 = None
new_addr_1 = None
unicorn_reg = get_unicorn_reg_name(reg)
index_reg = get_unicorn_reg_name(mem_mov_indexed_reg_index)
new_addr_0 = emulate_jmp_table_condition(file_data, mem_mov_indexed_addr, curr_addr, unicorn_reg, index_reg, 0)
new_addr_1 = emulate_jmp_table_condition(file_data, mem_mov_indexed_addr, curr_addr, unicorn_reg, index_reg, 1)
if new_addr_0 is None:
print(f"\tERROR emulation failed for new_addr_0")
elif new_addr_1 is None:
print(f"\tERROR emulation failed for new_addr_1")
else:
print(f"\tEmulated jump [{mem_mov_indexed_reg_index} = 0]to {hex(new_addr_0)}")
print(f"\tEmulated jump [{mem_mov_indexed_reg_index} = 1]to {hex(new_addr_1)}")
ida_bytes.set_cmt(curr_addr, f"0={hex(new_addr_0)}\n1={hex(new_addr_1)}", 0)
patch_indexed(mem_mov_indexed_addr, curr_addr, reg, mem_mov_indexed_reg_index, new_addr_1, new_addr_0)
# Re disassemble the patched bytes
else:
print(f"\tNo mem move found")
# Reset the memory tracking for any jmp
mem_mov_addr = None
mem_mov_reg = None
mem_mov_indexed_addr = None
mem_mov_indexed_reg = None
mem_mov_indexed_reg_index = None
elif 'mov' in disasm and insn.Op2.type == ida_ua.o_mem:
reg = ida_idp.get_reg_name(insn.Op1.reg, 4)
jump_table_addr = insn.Op2.addr
if insn.Op2.specflag1 == 0:
# Direct move
print(f"\tMove from memory directly {hex(jump_table_addr)}")
mem_mov_addr = curr_addr
mem_mov_reg = reg
else:
# Indexed move
index_reg = (insn.Op2.specflag2 >> 3) & 0x7
index_reg_name = ida_idp.get_reg_name(index_reg, 4)
print(f"\tMove from indexed memory {hex(jump_table_addr)} index register {index_reg_name}")
mem_mov_indexed_addr = curr_addr
mem_mov_indexed_reg = reg
mem_mov_indexed_reg_index = index_reg_name
elif 'call' in disasm or disasm in jump_instructions:
# Reset the memory tracking for any control flow
mem_mov_addr = None
mem_mov_reg = None
mem_mov_indexed_addr = None
mem_mov_indexed_reg = None
mem_mov_indexed_reg_index = None
# Get next address
curr_addr = ida_bytes.next_head(curr_addr, end_addr)
import unicorn
from unicorn import *
from unicorn.x86_const import *
import struct
import pefile
def get_unicorn_reg_name(reg):
if reg == "eax":
return UC_X86_REG_EAX
elif reg == "ebx":
return UC_X86_REG_EBX
elif reg == "ecx":
return UC_X86_REG_ECX
elif reg == "edx":
return UC_X86_REG_EDX
elif reg == "esi":
return UC_X86_REG_ESI
elif reg == "edi":
return UC_X86_REG_EDI
else:
return None
def map_pe_sections(file_data):
# Parse the PE file
pe = pefile.PE(data=file_data)
# Initialize the Unicorn engine
mu = Uc(UC_ARCH_X86, UC_MODE_32)
# PE base
pe_base = pe.OPTIONAL_HEADER.ImageBase
# Map each section into the Unicorn engine
for section in pe.sections:
base_address = section.VirtualAddress + pe_base
section_size = max(section.SizeOfRawData, section.Misc_VirtualSize)
# Map the section into the Unicorn engine's memory
mu.mem_map(base_address, section_size)
# Write the section's data into the Unicorn engine's memory
mu.mem_write(base_address, section.get_data())
# Add a stack for the Unicorn engine
stack_base = 0x1000000
stack_size = 1024 * 1024
mu.mem_map(stack_base, stack_size)
mu.reg_write(UC_X86_REG_ESP, (stack_base + stack_size) // 2)
mu.reg_write(UC_X86_REG_EBP, (stack_base + stack_size) // 2)
return mu
def hook_unmapped_mem(uc, access, address, size, value, user_data):
print("HOOK: Fix unmapped memory at address: 0x{0:X}".format(address))
# Map the memory page on-the-fly
page_start = address & ~(0x1000 - 1) # Round down to nearest multiple of 0x1000
uc.mem_map(page_start, 0x1000) # Map 1 page of memory
return True # Continue execution
def emulate_jmp_table(file_data, start, end, jmp_reg):
print(f"Emulation start:{hex(start)} end:{hex(end)} jmp_reg{jmp_reg}")
# try:
mu = map_pe_sections(file_data)
mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED, hook_unmapped_mem)
mu.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED, hook_unmapped_mem)
# Start emulation
mu.emu_start(start, end)
# read the value of eax register
reg_value = mu.reg_read(jmp_reg)
# except Exception as e:
# print(e)
# reg_value = None
return reg_value
def emulate_jmp_table_condition(file_data, start, end, jmp_reg, condition_reg, condition_value):
mu = map_pe_sections(file_data)
# Set the condition register to the desired value
mu.reg_write(condition_reg, condition_value)
mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED, hook_unmapped_mem)
mu.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED, hook_unmapped_mem)
# Start emulation
mu.emu_start(start, end)
# read the value of eax register
reg_value = mu.reg_read(jmp_reg)
return reg_value
def get_preserve_bytes(start, end):
# Starting at end go backwards until we find an inc instruction or the start
curr_addr = end
preserve_bytes = b""
while curr_addr >= start:
insn = ida_ua.insn_t()
ida_ua.decode_insn(insn, curr_addr)
disasm = ida_ua.print_insn_mnem(curr_addr)
if disasm is None:
break
if 'inc' in disasm:
# Preserve from instruction after int to start of end
bytes_start = ida_bytes.next_head(curr_addr, end)
if bytes_start == idc.BADADDR:
break
preserve_bytes = ida_bytes.get_bytes(bytes_start, end - bytes_start)
break
curr_addr = ida_bytes.prev_head(curr_addr, start)
return preserve_bytes
def build_near_jump(address, destination):
# Calculate the relative offset
offset = destination - address - 5
# Pack the offset into a little-endian byte string
jmp_bytes = struct.pack("<i", offset)
# Build the near jump instruction
jmp_instr = b"\xE9" + jmp_bytes
return jmp_instr
def build_near_je(address, destination):
# Calculate the relative offset
offset = destination - address - 6
# Pack the offset into a little-endian byte string
jmp_bytes = struct.pack("<i", offset)
# Build the near jump instruction
jmp_instr = b"\x0F\x84" + jmp_bytes
return jmp_instr
def patch_direct(start, end, destination):
# Get the bytes to preserve
preserve_bytes = get_preserve_bytes(start, end)
jmp_address = start + len(preserve_bytes)
# Build the near jump instruction
jmp_instr = build_near_jump(jmp_address, destination)
# Fill the rest of the instruction with NOPs
# We add 2 for the jmp reg at the end
nops = b"\x90" * (end - jmp_address - len(jmp_instr) + 2)
patch_bytes = preserve_bytes + jmp_instr + nops
if len(patch_bytes) != end - start + 2:
print(f"ERROR: Patch bytes length {len(patch_bytes)} does not match {end - start + 2}")
else:
# Apply the patch
ida_bytes.patch_bytes(start, patch_bytes)
reg_mov = {
"eax_eax": b'\x89\xC0',
"eax_ebx": b'\x89\xD8',
"eax_ecx": b'\x89\xC8',
"eax_edx": b'\x89\xD0',
"ebx_eax": b'\x89\xC3',
"ebx_ebx": b'\x89\xDB',
"ebx_ecx": b'\x89\xCB',
"ebx_edx": b'\x89\xD3',
"ecx_eax": b'\x89\xC1',
"ecx_ebx": b'\x89\xD9',
"ecx_ecx": b'\x89\xC9',
"ecx_edx": b'\x89\xD1',
"edx_eax": b'\x89\xC2',
"edx_ebx": b'\x89\xDA',
"edx_ecx": b'\x89\xCA',
"edx_edx": b'\x89\xD2',
}
reg_cmp = {
"eax" : b'\x3C\x01',
"ebx" : b'\x80\xFB\x01',
"ecx" : b'\x80\xF9\x01',
"edx" : b'\x80\xFA\x01',
}
def patch_indexed(start, end, reg, index_reg, dest_1, dest_0):
patch_byes = b""
# Get the safe reg to save condition
reg_mov_bytes = reg_mov[f"{reg}_{index_reg}"]
patch_byes += reg_mov_bytes
# Get the bytes to preserve
preserve_bytes = get_preserve_bytes(start, end)
patch_byes += preserve_bytes
# Get reg cmp bytes
cmp_bytes = reg_cmp[reg]
patch_byes += cmp_bytes
je_address = start + len(patch_byes)
# Build the near je instruction
je_instr = build_near_je(je_address, dest_1)
patch_byes += je_instr
# Build the near jmp instruction
jmp_address = start + len(patch_byes)
jmp_instr = build_near_jump(jmp_address, dest_0)
patch_byes += jmp_instr
# Fill the rest of the instruction with NOPs
# We add 2 for the jmp reg at the end
nops = b"\x90" * (end - jmp_address - len(jmp_instr) + 2)
patch_byes += nops
if len(patch_byes) != end - start + 2:
print(f"ERROR: Patch bytes length {len(patch_byes)} does not match {end - start + 2}")
else:
# Apply the patch
ida_bytes.patch_bytes(start, patch_byes)
# 0x434a30 we need to pick the further one
file_data = open("Z:\\tmp\\sample.bin", "rb").read()
#file_data = open("/tmp/sample.bin", "rb").read()
scan_binary(0x00433240, 0x00435149, file_data)
scan_binary(0x00435870, 0x00435A00 , file_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment