Created
February 5, 2024 22:20
-
-
Save Donaldduck8/5eaae8a8c4541d088427c5bebfca14c6 to your computer and use it in GitHub Desktop.
String decryptor for unknown loader written in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gc | |
import json | |
import yara | |
import binascii | |
import pefile | |
import capstone | |
import traceback | |
import unicorn | |
import unicorn.x86_const | |
from capstone import Cs, CS_ARCH_X86, CS_MODE_64, CsInsn | |
# Path of the sample to analyse (hard-coded; edit before running on another machine).
sample_p = r"E:\Malware\Projects\go_loader\4f77e46d5b800f9df91dcaa97371e9231933ceb333b361b0c5c3d38fd4eec36a"

# Raw bytes of the sample. The context manager closes the file handle right
# after reading instead of leaving it to the garbage collector.
with open(sample_p, "rb") as sample_file:
    sample_b = sample_file.read()

# Shared 64-bit x86 disassembler used by every helper in this script.
# skipdata is enabled so disassembly continues past bytes capstone cannot decode.
cs = Cs(CS_ARCH_X86, CS_MODE_64)
cs.skipdata = True

# Parsed PE view of the same sample and its preferred image base.
sample_pe = pefile.PE(sample_p)
sample_image_base = sample_pe.OPTIONAL_HEADER.ImageBase
def disassemble_blob(cs: Cs, bytes_blob: bytes) -> list[CsInsn]:
    """Disassemble *bytes_blob* with *cs*, starting at address 0.

    The lazy iterator returned by ``cs.disasm`` is fully consumed so callers
    get a list they can index and slice.
    """
    base_address = 0x0
    instruction_iter = cs.disasm(bytes_blob, base_address)
    return list(instruction_iter)
def get_section(pe: pefile.PE, section_name: str):
    """Return the first section of *pe* whose raw name contains *section_name*.

    The name is latin-1 encoded and tested as a substring of ``section.Name``.
    Returns ``None`` when no section matches.
    """
    matching_sections = (
        candidate
        for candidate in pe.sections
        if bytearray(section_name, encoding="latin-1") in candidate.Name
    )
    return next(matching_sections, None)
def find_runtime_byteslicetostring(data) -> int:
    """Return the offset in *data* where the signature-matched function starts.

    A YARA byte pattern is matched against *data*. From the first match the
    function walks backwards to the nearest ``0xCC`` byte and returns the
    offset just after it (presumably the padding before the function --
    confirm against the sample).

    Args:
        data: Indexable bytes-like buffer to scan (``main`` passes section data).

    Returns:
        Offset of the first byte after the closest preceding ``0xCC`` byte,
        or ``0`` if no ``0xCC`` precedes the match.

    Raises:
        ValueError: If the YARA signature does not match *data* at all.
    """
    # Define your YARA rule as a string
    yara_rule = """
    rule generated_rule
    {
        strings:
            $chunk_1 = {
                48 83 F? 01
                7? ??
                [0-150]
                48 85 C?
                74 ??
                48 83 F? 20
                7? ??
                [10-150]
                E9 ?? ?? ?? ??
                CC CC CC CC
            }
        condition:
            any of them
    }
    """
    compiled_rule = yara.compile(source=yara_rule)
    matches = compiled_rule.match(data=data)

    # Fail with a clear message instead of an opaque IndexError.
    if not matches or not matches[0].strings:
        raise ValueError("YARA signature did not match; cannot locate runtime function")

    # Hopefully only one match
    first_string = matches[0].strings[0]
    instances = getattr(first_string, "instances", None)
    if instances is not None:
        # yara-python >= 4.3: StringMatch object holding StringMatchInstance items.
        offset = instances[0].offset
    else:
        # Older yara-python: (offset, identifier, data) tuple.
        offset = first_string[0]

    # Need to keep reading backwards to reach the start of the function.
    # The lower bound stops negative indices from wrapping to the buffer's end.
    while offset >= 0 and data[offset] != 0xCC:
        offset -= 1
    return offset + 1
def find_xor_strings(mpress_segment: pefile.SectionStructure, instructions: list[CsInsn], runtime_func_rva: int):
    """Yield the index of every ``call`` whose target equals *runtime_func_rva*.

    Args:
        mpress_segment: Unused; kept so existing callers keep working.
        instructions: Disassembled instructions to scan, in address order.
        runtime_func_rva: Target address to look for, in the same address
            space the instructions were disassembled in.

    Yields:
        Indices into *instructions* of matching direct calls.
    """
    for i, insn in enumerate(instructions):
        if insn.mnemonic != "call":
            continue
        try:
            called_func_rva = int(insn.op_str, 16)
        except ValueError:
            # Operand is not a plain hex address (e.g. a register or memory
            # operand), so this is not a direct call we can resolve.
            continue
        if called_func_rva == runtime_func_rva:
            yield i
def decrypt_xor_string(lookback_buffer: list[CsInsn]):
    """Try to recover one XOR-encoded string from the code preceding a call.

    *lookback_buffer* holds instructions in reverse order: index 0 is the call
    itself and higher indices are earlier instructions. Walking "backwards"
    through the code therefore means walking forwards through this list.

    Steps:
      1. the nearest ``mov`` is taken as the string-length load;
      2. the next ``lea`` gives the stack displacement of the output buffer;
      3. the instruction that initialises that buffer is located;
      4. the run of ``mov``-family instructions before it is emulated by
         ``extract_string`` to rebuild and XOR the two pads.

    Instructions are re-disassembled through the module-level ``cs`` to read
    operand details, so ``cs.detail`` must be enabled by the caller.

    Args:
        lookback_buffer: Reverse-ordered instructions ending at the call site.

    Returns:
        ``(address_of_call, string)`` on success, otherwise ``(None, None)``.
    """
    if not lookback_buffer:
        return None, None

    # Alright, we are probably looking at a XOR string!
    # Debug output: the (up to) four instructions ending at the call, oldest
    # first. Slicing avoids an IndexError on very short buffers.
    for recent_insn in reversed(lookback_buffer[:4]):
        print(recent_insn)

    # Walk backwards to the last mov instruction (string length)
    for mov_index, mov_insn in enumerate(lookback_buffer):
        if mov_insn.mnemonic == "mov":
            break
    else:
        # No mov at all: do not silently treat the last instruction as one.
        print("Didn't find length instruction")
        print()
        return None, None

    string_length_insn = disassemble_blob(cs, mov_insn.bytes)[0]
    string_length = string_length_insn.operands[1].imm
    print("Length", hex(string_length))

    # Walk backwards to the first lea instruction (decrypted_string_offset)
    for lea_index, lea_insn in enumerate(lookback_buffer[mov_index:], start=mov_index):
        if lea_insn.mnemonic == "lea":
            break
    else:
        print("Didn't find lea instruction")
        print()
        return None, None

    decrypted_string_disp_insn = disassemble_blob(cs, lea_insn.bytes)[0]
    decrypted_string_disp = decrypted_string_disp_insn.operands[1].mem.disp
    print("Decrypted String Displacement", hex(decrypted_string_disp))

    decrypted_string_init_insn = None
    init_index = lea_index

    # Walk backwards until a mov instruction moves 0 into this displacement
    # Alternatively, a movups instruction that moves an xmm register into this displacement
    for init_index, candidate_insn in enumerate(lookback_buffer[lea_index:], start=lea_index):
        if "mov" not in candidate_insn.mnemonic:
            continue

        candidate_detail = disassemble_blob(cs, candidate_insn.bytes)[0]
        destination = candidate_detail.operands[0]
        if destination.type != capstone.x86.X86_OP_MEM:
            continue
        if destination.mem.disp != decrypted_string_disp:
            continue

        source = candidate_detail.operands[1]
        if candidate_insn.mnemonic == "movups":
            if source.type != capstone.x86.X86_OP_REG:
                continue
            decrypted_string_init_insn = candidate_insn
            break

        if source.type != capstone.x86.X86_OP_IMM:
            continue
        if source.imm == 0:
            decrypted_string_init_insn = candidate_insn
            break

    if not decrypted_string_init_insn:
        print("Didn't find init instruction")
        print()
        return None, None

    # There's typically 1-2 instructions of garbage inbetween the decrypted string init and the one time pads, so let's get past those
    trash_index = None
    for trash_index, trash_insn in enumerate(lookback_buffer[init_index + 1:], start=init_index + 1):
        if trash_insn.mnemonic in ("mov", "movabs"):
            break
    if trash_index is None:
        # The init instruction was the oldest one in the buffer; nothing
        # before it can hold the pads.
        return None, None

    # Extend over the run of mov-family instructions that build the pads. Stop
    # at the first non-mov, at a load from memory, or at a store whose base
    # register is not RSP.
    for anything_else_index, anything_else_insn in enumerate(lookback_buffer[trash_index:], start=trash_index):
        if "mov" not in anything_else_insn.mnemonic:
            break

        anything_else_insn_detail = disassemble_blob(cs, anything_else_insn.bytes)[0]
        if anything_else_insn_detail.operands[-1].type == capstone.x86.X86_OP_MEM:
            break

        destination = anything_else_insn_detail.operands[0]
        if destination.type == capstone.x86.X86_OP_MEM:
            if destination.mem.base != capstone.x86.X86_REG_RSP:
                break

    # The relevant stack strings are pushed here: lookback_buffer[init_index : anything_else_index]
    # Reversed again so the instructions are back in execution order.
    stack_string_setup = list(reversed(lookback_buffer[init_index:anything_else_index]))
    for insn in stack_string_setup:
        print(insn)

    string = extract_string(stack_string_setup, decrypted_string_disp=decrypted_string_disp, string_length=string_length)
    print(string)
    print()

    if string.isprintable():
        return lookback_buffer[0].address, string
    return None, None
def extract_string(instructions, decrypted_string_disp, string_length):
    """Emulate *instructions* and XOR the two pads they leave on the stack.

    The instruction bytes are concatenated, run in a fresh Unicorn x86-64
    instance with RSP pointing at a scratch stack, and two ``string_length``
    sized buffers are read back and XORed byte by byte. The pads are first
    looked for directly below ``decrypted_string_disp``; if that result is not
    printable, the buffers above it are tried instead.

    Args:
        instructions: Instructions in execution order; each needs ``.bytes``.
        decrypted_string_disp: RSP-relative displacement of the output buffer.
        string_length: Length in bytes of the string and of each pad.

    Returns:
        The XOR of the two pads as a ``str``, one character per byte. It may
        be garbage; callers are expected to validate it.

    Raises:
        unicorn.UcError: If a pad read falls outside the mapped stack window.
    """
    PAGE_SIZE = 0x1000
    TEXT = 0x100000
    STACK = 0x200000
    STACK_SIZE = 0x10000
    # Extra mapping below RSP so the "pads below the string" probe reads
    # zeroes instead of faulting when the displacement is small.
    STACK_HEADROOM = 0x10000

    instruction_bytes = b"".join(insn.bytes for insn in instructions)
    # Map whole pages and grow the code mapping when the sequence is longer
    # than one page, rather than failing on the write.
    text_size = max(PAGE_SIZE, -(-len(instruction_bytes) // PAGE_SIZE) * PAGE_SIZE)

    # Initialize Unicorn engine
    mu = unicorn.Uc(unicorn.UC_ARCH_X86, unicorn.UC_MODE_64)

    # Set up memory for the emulation
    mu.mem_map(TEXT, text_size)
    mu.mem_map(STACK - STACK_HEADROOM, STACK_HEADROOM + STACK_SIZE)

    def xor_pads(first_offset, second_offset):
        """XOR two string_length-sized stack buffers at RSP-relative offsets."""
        pad_1 = mu.mem_read(STACK + first_offset, string_length)
        pad_2 = mu.mem_read(STACK + second_offset, string_length)
        return "".join(chr(x ^ y) for x, y in zip(pad_1, pad_2))

    try:
        # Set up RSP
        mu.reg_write(unicorn.x86_const.UC_X86_REG_RSP, STACK)

        # With nothing to run, skip emulation: count=0 means "no limit".
        if instruction_bytes:
            # Write instructions to memory
            mu.mem_write(TEXT, instruction_bytes)
            try:
                mu.emu_start(TEXT, TEXT + len(instruction_bytes), count=len(instructions))
            except unicorn.UcError as e:
                # Best effort: keep whatever reached the stack before the fault.
                print(f"Emulation error: {e}")
                print(hex(mu.reg_read(unicorn.x86_const.UC_X86_REG_RIP)))

        # Try pads both below and above the decrypted string on the stack
        string = xor_pads(decrypted_string_disp - string_length,
                          decrypted_string_disp - string_length * 2)

        # The original test was `not isprintable() and isascii()`, which
        # skipped the retry for non-ASCII garbage. Retrying on any
        # non-printable result fixes that and keeps every result that was
        # previously accepted.
        if not string.isprintable():
            string = xor_pads(decrypted_string_disp + string_length,
                              decrypted_string_disp + string_length * 2)
    finally:
        # Clean up even when a pad read raised.
        mu.mem_unmap(TEXT, text_size)
        mu.mem_unmap(STACK - STACK_HEADROOM, STACK_HEADROOM + STACK_SIZE)
        del xor_pads, mu
        gc.collect()

    return string
def main():
    """Decrypt the XOR strings in the ``.MPRESS1`` section and print them.

    The section is disassembled once, every direct call to the signature-
    located runtime function is visited, and up to 1024 preceding
    instructions are handed to ``decrypt_xor_string``. Recovered strings are
    printed with their addresses and then as a JSON document.

    Raises:
        RuntimeError: If the sample has no ``.MPRESS1`` section.
    """
    LOOKBACK_INSTRUCTIONS = 1024

    mpress_segment = get_section(sample_pe, ".MPRESS1")
    if mpress_segment is None:
        raise RuntimeError("Section .MPRESS1 not found in sample")
    mpress_segment_data = mpress_segment.get_data()

    runtime_func_addr = find_runtime_byteslicetostring(mpress_segment_data)

    # Debug output: where the function was found and its first 256 bytes.
    test_data = mpress_segment_data[runtime_func_addr : runtime_func_addr + 256]
    print(hex(runtime_func_addr))
    print(binascii.hexlify(test_data))

    # Bulk disassembly runs without operand details; details are only enabled
    # for the per-instruction re-disassembly in decrypt_xor_string.
    instructions = disassemble_blob(cs, mpress_segment_data)

    results = {}

    cs.detail = True
    try:
        for index in find_xor_strings(mpress_segment, instructions, runtime_func_addr):
            # Clamp the window start: a negative start would be read as an
            # offset from the end and give an empty or wrong window for calls
            # among the first 1024 instructions.
            window_start = max(0, index - LOOKBACK_INSTRUCTIONS)
            lookback_buffer = list(reversed(instructions[window_start : index + 1]))

            try:
                address, decrypted_string = decrypt_xor_string(lookback_buffer=lookback_buffer)
            except Exception:
                # One call site that fails to decode must not stop the others.
                traceback.print_exc()
                continue

            if decrypted_string is not None:
                results[address] = decrypted_string
    finally:
        # Restore the shared disassembler even if the scan is interrupted.
        cs.detail = False

    for addr2, string in results.items():
        print(hex(addr2), string)

    # Create a JSON dump of the results for StrAnnotate
    # NOTE(review): 0x1000 is presumably the section's virtual address -- confirm.
    dump = {"strings": [{"offset": addr3 + 0x1000, "value": value} for addr3, value in results.items()]}
    print(json.dumps(dump))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment