Skip to content

Instantly share code, notes, and snippets.

@Donaldduck8
Created February 5, 2024 22:20
Show Gist options
  • Save Donaldduck8/5eaae8a8c4541d088427c5bebfca14c6 to your computer and use it in GitHub Desktop.
Save Donaldduck8/5eaae8a8c4541d088427c5bebfca14c6 to your computer and use it in GitHub Desktop.
String decryptor for an unknown loader written in Go
import gc
import json
import yara
import binascii
import pefile
import capstone
import traceback
import unicorn
import unicorn.x86_const
from capstone import Cs, CS_ARCH_X86, CS_MODE_64, CsInsn
# Location of the sample on the analyst's machine.
sample_p = r"E:\Malware\Projects\go_loader\4f77e46d5b800f9df91dcaa97371e9231933ceb333b361b0c5c3d38fd4eec36a"
# Read the raw bytes through a context manager so the file handle is closed
# immediately instead of being left for the garbage collector.
with open(sample_p, "rb") as sample_file:
    sample_b = sample_file.read()
# Shared 64-bit x86 Capstone disassembler used by every helper in this script.
cs = Cs(CS_ARCH_X86, CS_MODE_64)
# Keep disassembling past bytes that do not decode as instructions.
cs.skipdata = True
# Parsed PE view of the same sample, plus its preferred image base.
sample_pe = pefile.PE(sample_p)
sample_image_base = sample_pe.OPTIONAL_HEADER.ImageBase
def disassemble_blob(cs: Cs, bytes_blob: bytes) -> list[CsInsn]:
    """Disassemble ``bytes_blob`` with ``cs`` and return all instructions.

    The blob is disassembled at base address 0, so each instruction's
    ``address`` is its byte offset inside ``bytes_blob``.
    """
    base_address = 0x0
    decoded = cs.disasm(bytes_blob, base_address)
    return list(decoded)
def get_section(pe: pefile.PE, section_name: str):
    """Return the first section of ``pe`` whose name contains ``section_name``.

    The name is encoded as latin-1 and matched as a substring of the raw
    section name bytes. ``None`` is returned when no section matches.
    """

    def name_matches(section) -> bool:
        # Substring test against the raw section name bytes.
        return bytearray(section_name, encoding="latin-1") in section.Name

    return next(filter(name_matches, pe.sections), None)
def find_runtime_byteslicetostring(data) -> int:
    """Return the offset in ``data`` where the signature-matched function starts.

    A YARA hex signature is matched against ``data``. Starting from the first
    hit, the scan walks backwards to the nearest ``0xCC`` byte and returns the
    offset right after it. If no ``0xCC`` byte precedes the hit, 0 is returned.

    Args:
        data: Raw bytes to search (``main`` passes the section contents).

    Returns:
        Offset into ``data`` of the first byte after the preceding ``0xCC``.

    Raises:
        ValueError: If the signature does not match anywhere in ``data``.
    """
    # Hex signature for the target function body; it ends in an ``E9`` jump
    # followed by four ``CC`` padding bytes.
    yara_rule = """
rule generated_rule
{
strings:
$chunk_1 = {
48 83 F? 01
7? ??
[0-150]
48 85 C?
74 ??
48 83 F? 20
7? ??
[10-150]
E9 ?? ?? ?? ??
CC CC CC CC
}
condition:
any of them
}
"""
    compiled_rule = yara.compile(source=yara_rule)
    matches = compiled_rule.match(data=data)
    if not matches:
        # Fail with a clear message instead of an IndexError on matches[0].
        raise ValueError("YARA signature for runtime byteslicetostring did not match")

    # Hopefully only one match; the first hit is the one used.
    first_string = matches[0].strings[0]
    if hasattr(first_string, "instances"):
        # yara-python 4.3+ returns StringMatch objects with per-hit instances.
        offset = first_string.instances[0].offset
    else:
        # Older yara-python returns (offset, identifier, data) tuples.
        offset = first_string[0]

    # Walk backwards to the 0xCC byte in front of the function. The scan is
    # bounded at 0 so a negative index cannot wrap to the end of ``data``.
    while offset >= 0 and data[offset] != 0xCC:
        offset -= 1
    return offset + 1
def find_xor_strings(mpress_segment: pefile.SectionStructure, instructions: list[CsInsn], runtime_func_rva: int):
    """Yield the index of every ``call`` that targets ``runtime_func_rva``.

    Args:
        mpress_segment: Section the instructions came from (currently unused).
        instructions: Disassembled instructions to scan.
        runtime_func_rva: Call target to look for, compared against the
            hexadecimal operand text of each ``call``.

    Yields:
        Positions in ``instructions`` of the matching ``call`` instructions.
    """
    for i, insn in enumerate(instructions):
        if insn.mnemonic != "call":
            continue
        try:
            called_func_rva = int(insn.op_str, 16)
        except ValueError:
            # Operand is not a plain hex immediate (e.g. a register or memory
            # operand), so the target cannot be compared.
            continue
        if called_func_rva == runtime_func_rva:
            yield i
def decrypt_xor_string(lookback_buffer: list[CsInsn]):
    """Try to recover one XOR-obfuscated string from the code before a call.

    ``lookback_buffer`` is scanned from index 0 upwards. ``main`` passes the
    instructions in reverse program order with the ``call`` at index 0, so
    "walking backwards" in the comments below means increasing the index.

    The scan locates, in order: a ``mov`` whose second operand supplies the
    string length, a ``lea`` whose memory displacement marks the output
    buffer, the instruction that initialises that buffer, and the run of
    ``mov``-family instructions before it. That run is emulated by
    ``extract_string``.

    Returns:
        ``(lookback_buffer[0].address, string)`` when the recovered string is
        printable, otherwise ``(None, None)``.
    """
    # Alright, we are probably looking at a XOR string!
    # Debug output of the four most recent instructions.
    # NOTE(review): raises IndexError if the buffer has fewer than 4 entries.
    print(lookback_buffer[3])
    print(lookback_buffer[2])
    print(lookback_buffer[1])
    print(lookback_buffer[0])
    # Walk backwards to the last mov instruction (string length)
    # NOTE(review): if no "mov" is present the loop simply runs out and the
    # last buffer entry is used; there is no "not found" check.
    for mov_index, mov_insn in enumerate(lookback_buffer):
        if mov_insn.mnemonic == "mov":
            break
    # Re-disassemble the single instruction to read its operands
    # (presumably because the bulk disassembly ran without cs.detail - TODO confirm).
    string_length_insn = disassemble_blob(cs, mov_insn.bytes)[0]
    string_length = string_length_insn.operands[1].imm
    print("Length", hex(string_length))
    # Walk backwards to the first lea instruction (decrypted_string_offset)
    # NOTE(review): same missing "not found" check as the mov loop above.
    for lea_index, lea_insn in enumerate(lookback_buffer[mov_index:], start=mov_index):
        if lea_insn.mnemonic == "lea":
            break
    decrypted_string_disp_insn = disassemble_blob(cs, lea_insn.bytes)[0]
    decrypted_string_disp = decrypted_string_disp_insn.operands[1].mem.disp
    print("Decrypted String Displacement", hex(decrypted_string_disp))
    decrypted_string_init_insn = None
    # Walk backwards until a mov instruction moves 0 into this displacement
    # Alternatively, a movups instruction that moves an xmm register into this displacement
    # mov_index / mov_insn are deliberately rebound here; later code relies on
    # mov_index pointing at the init instruction.
    for mov_index, mov_insn in enumerate(lookback_buffer[lea_index:], start=lea_index):
        if "mov" not in mov_insn.mnemonic:
            continue
        mov_insn_detail = disassemble_blob(cs, mov_insn.bytes)[0]
        destination = mov_insn_detail.operands[0]
        # Only stores to memory are candidates.
        if destination.type != capstone.x86.X86_OP_MEM:
            continue
        destination_disp = destination.mem.disp
        # The store must hit the same displacement as the lea above.
        if destination_disp != decrypted_string_disp:
            continue
        source = mov_insn_detail.operands[1]
        if mov_insn.mnemonic == "movups":
            # movups variant: the source must be a register.
            if source.type != capstone.x86.X86_OP_REG:
                continue
            decrypted_string_init_insn = mov_insn
            break
        else:
            # Plain mov variant: the source must be the immediate 0.
            if source.type != capstone.x86.X86_OP_IMM:
                continue
            source_immediate = source.imm
            if source_immediate == 0:
                decrypted_string_init_insn = mov_insn
                break
    if not decrypted_string_init_insn:
        print("Didn't find init instruction")
        print()
        return None, None
    # There's typically 1-2 instructions of garbage inbetween the decrypted string init and the one time pads, so let's get past those
    # Stops at the first "mov"/"movabs" after the init instruction.
    for trash_index, trash_insn in enumerate(lookback_buffer[mov_index + 1:], start=mov_index + 1):
        if trash_insn.mnemonic not in ["mov", "movabs"]:
            continue
        break
    # Extend the window while the instructions still look like pad setup. Stop at
    # a non-mov instruction, a mov whose last operand is memory, or a store whose
    # base register is not RSP.
    for anything_else_index, anything_else_insn in enumerate(lookback_buffer[trash_index:], start=trash_index):
        if "mov" not in anything_else_insn.mnemonic:
            break
        anything_else_insn_detail = disassemble_blob(cs, anything_else_insn.bytes)[0]
        if anything_else_insn_detail.operands[-1].type == capstone.x86.X86_OP_MEM:
            break
        destination = anything_else_insn_detail.operands[0]
        if destination.type == capstone.x86.X86_OP_MEM:
            if destination.mem.base != capstone.x86.X86_REG_RSP:
                break
    # The relevant stack strings are pushed here: lookback_buffer[mov_index : anything_else_index]
    # Reversed again so the slice is back in program order for emulation.
    stack_string_setup = list(reversed(lookback_buffer[mov_index : anything_else_index]))
    for insn in stack_string_setup:
        print(insn)
    string = extract_string(stack_string_setup, decrypted_string_disp=decrypted_string_disp, string_length=string_length)
    print(string)
    print()
    # Only printable results are reported, keyed by the address of the call.
    if string.isprintable():
        return lookback_buffer[0].address, string
    else:
        return None, None
def extract_string(instructions, decrypted_string_disp, string_length):
    """Emulate the stack-setup instructions and XOR the two pads into a string.

    The instructions are run in a fresh Unicorn x86-64 instance with RSP set
    to the base of a scratch stack. Two ``string_length``-sized buffers are
    then read relative to ``decrypted_string_disp`` and XORed byte by byte.
    The buffers below the displacement are tried first. If that result is not
    printable ASCII, the buffers above it are used instead.

    Args:
        instructions: Instructions to emulate, in program order; each must
            expose a ``bytes`` attribute.
        decrypted_string_disp: Stack displacement of the output buffer.
        string_length: Length in bytes of each pad.

    Returns:
        The XORed characters as a ``str`` (not guaranteed to be printable).

    Raises:
        unicorn.UcError: If writing the code or reading a pad touches memory
            that is not mapped.
    """
    # Initialize Unicorn engine
    mu = unicorn.Uc(unicorn.UC_ARCH_X86, unicorn.UC_MODE_64)

    # Set up memory for the emulation
    TEXT = 0x100000
    TEXT_SIZE = 0x1000
    STACK = 0x200000
    STACK_SIZE = 0x10000
    mu.mem_map(TEXT, TEXT_SIZE)
    mu.mem_map(STACK, STACK_SIZE)

    def _xor_candidate(direction):
        # direction -1 reads the pads below the output buffer, +1 above it.
        pad_1 = mu.mem_read(STACK + (decrypted_string_disp + direction * string_length), string_length)
        pad_2 = mu.mem_read(STACK + (decrypted_string_disp + direction * string_length * 2), string_length)
        return "".join(chr(x ^ y) for x, y in zip(pad_1, pad_2))

    try:
        # Write instructions to memory
        instruction_bytes = b"".join(insn.bytes for insn in instructions)
        mu.mem_write(TEXT, instruction_bytes)

        # Set up RSP
        mu.reg_write(unicorn.x86_const.UC_X86_REG_RSP, STACK)

        try:
            mu.emu_start(TEXT, TEXT + len(instruction_bytes), count=len(instructions))
        except unicorn.UcError as e:
            # Best effort: report the fault and still inspect whatever was
            # written to the stack before it.
            print(f"Emulation error: {e}")
            print(hex(mu.reg_read(unicorn.x86_const.UC_X86_REG_RIP)))

        # Try pads both below and above the decrypted string on the stack
        string = _xor_candidate(-1)
        # Parenthesised on purpose: ``not a and b`` parses as ``(not a) and b``,
        # which skipped the fallback for non-ASCII garbage.
        if not (string.isprintable() and string.isascii()):
            string = _xor_candidate(1)
    finally:
        # Clean up even when a memory access above raised.
        mu.mem_unmap(TEXT, TEXT_SIZE)
        mu.mem_unmap(STACK, STACK_SIZE)
        mu.emu_stop()
        gc.collect()

    return string
def main():
    """Decrypt every XOR string in the ``.MPRESS1`` section and print the results.

    The section is disassembled once, and every ``call`` to the located runtime
    function is treated as a candidate. For each candidate, up to 1024
    preceding instructions are handed to ``decrypt_xor_string``. Results are
    printed as ``address string`` lines, then as a JSON document.

    Raises:
        RuntimeError: If the sample has no ``.MPRESS1`` section.
    """
    mpress_segment = get_section(sample_pe, ".MPRESS1")
    if mpress_segment is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise RuntimeError("Section .MPRESS1 not found in sample")
    mpress_segment_data = mpress_segment.get_data()

    runtime_func_addr = find_runtime_byteslicetostring(mpress_segment_data)
    test_data = mpress_segment_data[runtime_func_addr : runtime_func_addr + 256]
    print(hex(runtime_func_addr))
    print(binascii.hexlify(test_data))

    instructions = disassemble_blob(cs, mpress_segment_data)
    results = {}
    lookback_window = 1024

    # Operand details are needed while individual instructions are re-decoded.
    cs.detail = True
    try:
        for index in find_xor_strings(mpress_segment, instructions, runtime_func_addr):
            # Clamp the start: a negative slice start would count from the end
            # of the list and give an empty window for early calls.
            window_start = max(0, index - lookback_window)
            lookback_buffer = list(reversed(instructions[window_start : index + 1]))
            try:
                address, decrypted_string = decrypt_xor_string(lookback_buffer=lookback_buffer)
            except Exception:
                # One failed candidate must not abort the whole run.
                traceback.print_exc()
                continue
            if decrypted_string is not None:
                results[address] = decrypted_string
    finally:
        cs.detail = False

    for addr2, string in results.items():
        print(hex(addr2), string)

    # Create a JSON dump of the results for StrAnnotate
    # NOTE(review): 0x1000 is presumably the section's virtual address - confirm.
    dump = {"strings": [{"offset": addr3 + 0x1000, "value": value} for addr3, value in results.items()]}
    print(json.dumps(dump))
# Run the decryptor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment