Skip to content

Instantly share code, notes, and snippets.

@calladoum-elastic
Last active October 30, 2023 23:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calladoum-elastic/3b733b023c237a6017b399d4c4f18d27 to your computer and use it in GitHub Desktop.
Save calladoum-elastic/3b733b023c237a6017b399d4c4f18d27 to your computer and use it in GitHub Desktop.
"""
Description:
From a user-mode memory dump, use BochsCPU to rebuild a functioning runtime context to execute a specific function (in our
case `cryptbase!SystemFunction036`).
Authors:
Christophe Alladoum, Elastic
Reference:
TODO Link to the blog post
"""
import ctypes
import logging
import os
import pathlib
import sys
import enum
import capstone
import udmp_parser
import bochscpu
import bochscpu.cpu
import bochscpu.memory
import bochscpu.utils
kernel32 = ctypes.windll.kernel32
kernel32.GetModuleHandleW.argtypes = [ctypes.c_wchar_p]
kernel32.GetModuleHandleW.restype = ctypes.c_void_p
kernel32.GetProcAddress.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
kernel32.GetProcAddress.restype = ctypes.c_void_p
kernel32.LoadLibraryW.argtypes = [
ctypes.c_wchar_p,
]
kernel32.LoadLibraryW.restype = ctypes.c_void_p
PAGE_SIZE = bochscpu.utils.PAGE_SIZE
PA_START_ADDRESS = 0x100_0000
PML4_ADDRESS = 0x10_0000
MEM_FREE = 0x00010000
PAGE_NOACCESS = 0x01
end_address = -1
class Permission(enum.IntEnum):
CODE = 0
RW = 1
cs = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64)
def hexdump(
source: bytes, length: int = 0x10, separator: str = ".", base: int = 0x00
) -> str:
result: list[str] = []
align: int = length + 2
def chunk2hexstr(chunk: bytes):
return " ".join(map(lambda x: f"{x:02X}", chunk))
def chunk2ascii(chunk: bytes):
return "".join([chr(b) if 0x20 <= b < 0x7F else separator for b in chunk])
for i in range(0, len(source), length):
chunk = bytearray(source[i : i + length])
hexa = chunk2hexstr(chunk)
text = chunk2ascii(chunk)
result.append(f"{base + i:#0{align}x} {hexa:<{3 * length}} {text}")
return os.linesep.join(result)
def missing_page_cb(gpa):
raise Exception(f"missing_page_cb({gpa=:#x})")
def exception_cb(
sess: bochscpu.Session,
cpu_id: int,
vector: int,
error_code: int,
):
excpt = bochscpu.cpu.ExceptionType(vector)
match excpt:
case bochscpu.cpu.ExceptionType.BreakPoint:
logging.debug("breakpoint hit")
case bochscpu.cpu.ExceptionType.PageFault:
logging.debug(
f"pagefault on VA={sess.cpu.cr2:#016x} at IP={sess.cpu.rip:#016x}"
)
case _:
logging.debug(
f"cpu#{cpu_id} received exception({excpt=}, {error_code=:d}) "
)
sess.stop()
def before_execution_cb(sess: bochscpu.Session, cpu_id: int, _: int):
# state = sess.cpu.state
# raw = bytes(bochscpu.memory.virt_read(PML4_ADDRESS, state.rip, 16))
# insn = next(cs.disasm(raw, state.rip))
# logging.debug(
# f"[CPU#{cpu_id}] PC={state.rip:#x} {insn.bytes.hex()} - {insn.mnemonic} {insn.op_str}"
# )
global end_address
if sess.cpu.rip == end_address:
logging.debug(f"Stopping execution at {end_address:#X}")
sess.stop()
def convert_region_protection(protect: int) -> int:
match protect:
case 0x02: # PAGE_READONLY
return Permission.RW
case 0x04: # PAGE_READWRITE
return Permission.RW
case 0x08: # PAGE_WRITECOPY
return Permission.RW
case 0x10: # PAGE_EXECUTE
return Permission.CODE
case 0x20: # PAGE_EXECUTE_READ
return Permission.CODE
case 0x40: # PAGE_EXECUTE_READWRITE
return Permission.CODE
case 0x80: # PAGE_EXECUTE_WRITECOPY
return Permission.RW
# logging.warning(f"Unknown {protect=:#x})")
return -1
def switch_to_thread(state: bochscpu.State, thread: udmp_parser.Thread):
assert isinstance(thread.Context, udmp_parser.Context64)
#
# AMD Vol2 - A.1 System Software MSRs
#
FSBase = 0xC000_0100
GSBase = 0xC000_0101
KernelGSBase = 0xC000_0102
logging.debug(f"Switching context to {thread}")
_cs = bochscpu.Segment()
_cs.base = 0
_cs.limit = 0xFFFF_FFFF
_cs.selector = thread.Context.SegCs
_cs.attr = 0x22FB
_cs.present = True
_ds = bochscpu.Segment()
_ds.base = 0
_ds.limit = 0xFFFF_FFFF
_ds.selector = thread.Context.SegDs
_ds.attr = 0xCF3
_ds.present = True
_es = bochscpu.Segment()
_es.base = 0
_es.limit = 0xFFFF_FFFF
_es.selector = thread.Context.SegEs
_es.attr = 0xCF3
_ss = bochscpu.Segment()
_ss.base = 0
_ss.limit = 0xFFFF_FFFF
_ss.selector = thread.Context.SegSs
_ss.attr = 0xCF3
# AMD Vol2 - 4.5.3
# > In 64-bit mode, FS-segment and GS-segment overrides are not checked for limit or attributes. Instead,
# > the processor checks that all virtual-address references are in canonical form
_fs = bochscpu.Segment()
_fs.base = 0
_fs.limit = 0xFFFF_FFFF
_fs.selector = thread.Context.SegFs
_fs.present = True
_fs.attr = 0xCF3
_gs = bochscpu.Segment()
_gs.base = thread.Teb
_gs.limit = 0x0000_0FFF
_gs.selector = thread.Context.SegGs
_gs.present = True
_gs.attr = 0x4F3
state.ss = _ss
state.cs = _cs
state.ds = _ds
state.es = _es
state.fs = _fs
state.gs = _gs
state.rip = thread.Context.Rip
state.rsp = thread.Context.Rsp
return
def call_function(
sess: bochscpu.Session,
start_address: int,
end_addr: int,
args: list[int],
) -> None:
global end_address
state = sess.cpu.state
state.rip = start_address
if len(args) >= 1:
state.rcx = args[0]
if len(args) >= 2:
state.rdx = args[1]
if len(args) >= 3:
state.r8 = args[2]
if len(args) >= 4:
state.r9 = args[3]
# logging.debug("Preparing hooks")
hook = bochscpu.Hook()
hook.exception = exception_cb
hook.before_execution = before_execution_cb
end_address = end_addr
logging.debug("Preparing emulation environment")
sess.cpu.state = state
logging.debug("Dumping initial register state")
if logging.getLogger().isEnabledFor(logging.DEBUG):
bochscpu.utils.dump_registers(sess.cpu.state)
logging.debug("Start emulation")
sess.run(
[
hook,
]
)
logging.debug("Dumping final register state")
if logging.getLogger().isEnabledFor(logging.DEBUG):
bochscpu.utils.dump_registers(sess.cpu.state)
return
def resolve_function(symbol: str) -> int:
dll, func = symbol.split("!", 1)
if not dll.lower().endswith(".dll"):
dll += ".dll"
logging.debug(f"Looking up {func} in {dll}")
handle = kernel32.LoadLibraryW(dll)
if not handle:
raise RuntimeError(f"Failed to get a handle to '{dll}'")
address: int = kernel32.GetProcAddress(handle, func.encode())
if not address:
raise RuntimeError(f"Failed to resolve '{symbol}'")
logging.info(f"Resolved '{symbol:s}' -> {address:#x}")
return address
def emulate(dmp):
dmp_path = pathlib.Path(dmp)
logging.info(f"Parsing {dmp_path}")
dmp = udmp_parser.UserDumpParser()
assert dmp.Parse(dmp_path)
logging.info(f"Successfully parsed {dmp_path}")
sess = bochscpu.Session()
sess.missing_page_handler = missing_page_cb
logging.debug("Preparing page table")
pt = bochscpu.memory.PageMapLevel4Table()
pa = PA_START_ADDRESS
pgnb = 0
for _, region in dmp.Memory().items():
# logging.debug(f"mapping {region=}")
if region.State == MEM_FREE or region.Protect == PAGE_NOACCESS:
continue
start, end = region.BaseAddress, region.BaseAddress + region.RegionSize
for va in range(start, end, PAGE_SIZE):
flags = convert_region_protection(region.Protect)
if flags < 0:
continue
pt.insert(va, pa, flags)
assert pt.translate(va) == pa
hva = bochscpu.memory.allocate_host_page()
bochscpu.memory.page_insert(pa, hva)
print(f"\bmapped {va=:#x} to {pa=:#x} with {flags=}\r", end="")
pa += PAGE_SIZE
pgnb += 1
logging.debug(f"{pgnb} pages inserted")
buffer_hva = bochscpu.memory.allocate_host_page()
buffer_pa = 0x4100_0000
buffer_va = 0x41_0000_0000
pt.insert(buffer_va, buffer_pa, Permission.RW)
bochscpu.memory.page_insert(buffer_pa, buffer_hva)
stack_hva = bochscpu.memory.allocate_host_page()
stack_pa = 0x4200_0000
stack_va = 0x42_0000_0000
pt.insert(stack_va, stack_pa, Permission.RW)
bochscpu.memory.page_insert(stack_pa, stack_hva)
logging.debug(f"Committing {pgnb} pages")
layout = pt.commit(PML4_ADDRESS)
for hva, gpa in layout:
bochscpu.memory.page_insert(gpa, hva)
evaled_hva = bochscpu.memory.phy_translate(gpa)
assert evaled_hva == hva, f"{evaled_hva=:#x} == {hva=:#x}"
# print(f"mapped {gpa=:#x} to {hva=:#x}\r", end="")
# bochscpu.utils.dump_page_table(PML4_ADDRESS)
logging.debug("Copy memory content")
for _, region in dmp.Memory().items():
if region.State == MEM_FREE or region.AllocationProtect == PAGE_NOACCESS:
continue
start, end = region.BaseAddress, region.BaseAddress + region.RegionSize
content = dmp.ReadMemory(start, end)
assert content is not None
content = bytes(content)
bochscpu.memory.virt_write(PML4_ADDRESS, start, bytes(content))
del content
logging.debug("Preparing CPU state")
state = bochscpu.State()
bochscpu.cpu.set_long_mode(state)
logging.debug("Enabling MMX (SSE/AVX) instructions")
cr0 = bochscpu.utils.cpu.CR0(state.cr0)
cr4 = bochscpu.utils.cpu.CR4(state.cr4)
xcr0 = bochscpu.utils.cpu.XCR0(state.xcr0)
# See AMD Vol2 - 11.3
cr0.MP = True
cr0.EM = False
cr4.OSFXSR = True
cr4.OSXSAVE = True
# See AMD Vol2 - 11.5.2
xcr0.x87 = True
xcr0.SSE = True
xcr0.YMM = True
logging.debug(f"Setting {cr0=:}")
logging.debug(f"Setting {cr4=:}")
logging.debug(f"Setting {xcr0=:}")
state.cr0 = int(cr0)
state.cr4 = int(cr4)
state.xcr0 = int(xcr0)
state.lstar = 0x41414141_41410000 # force a crash on syscalls as a way to stop execution
logging.debug(f"Setting PML4 to {PML4_ADDRESS:#x}")
state.cr3 = PML4_ADDRESS
threads = dmp.Threads()
tids = list(threads.keys())
switch_to_thread(state, threads[tids[0]])
sess.cpu.state = state
fn_sym = "cryptbase!SystemFunction036"
logging.debug(f"Resolving '{fn_sym}'")
fn_start = resolve_function(fn_sym)
fn_end = fn_start + 0x1C
logging.info(f"{fn_sym} found at {fn_start:#x}")
for _ in range(10):
call_function(sess, fn_start, fn_end, [buffer_va, 16])
data = bytes(bochscpu.memory.virt_read(PML4_ADDRESS, buffer_va, 0x10))
print(hexdump(data))
bochscpu.memory.release_host_page(stack_pa)
bochscpu.memory.release_host_page(buffer_pa)
if __name__ == "__main__":
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
emulate(sys.argv[1])
"""
Description:
Retrieve session key for NotPetya, given a specific memory dump
Authors:
Salim Bitam, Elastic
Christophe Alladoum, Elastic
Reference:
TODO Link to the blog post
"""
import argparse
import struct
import udmp_parser
import logging
import sys
import pathlib
from typing import Optional
from binascii import hexlify
logging.getLogger().setLevel(logging.INFO)
def p32(a) -> bytes:
return struct.pack("<I", a)
def u32(a) -> int:
return struct.unpack("<I", a)[0]
def read_memory(dmp: udmp_parser.UserDumpParser, addr: int, size: int) -> bytearray:
data = dmp.ReadMemory(addr, size)
assert data
return bytearray(data)
def previous_frame(dmp: udmp_parser.UserDumpParser, current_ebp: int, depth: int):
while depth > 0:
previous_ebp_addr = read_memory(
dmp, addr=current_ebp, size=4
) # change to right size
previous_ebp = u32(previous_ebp_addr[0:4])
current_ebp = previous_ebp
depth -= 1
return previous_ebp
def parse_dump(minidumpfile: str, thread_id: int) -> Optional[bytearray]:
dmp = udmp_parser.UserDumpParser()
assert dmp.Parse(minidumpfile)
#
# Find the right thread
#
threads: list[udmp_parser.Thread_t] = dmp.Threads()
thread: Optional[udmp_parser.Thread_t] = None
if thread_id not in threads:
logging.error(f"Cannot find thread {thread_id}")
return
thread = threads[thread_id]
assert thread is not None
logging.info(f"Found {thread=}")
#
# Go to previous frame
#
ebp = previous_frame(dmp, current_ebp=thread.Context.Rbp, depth=2)
#
# Retrieve the session context
#
custom_struct_addr = read_memory(dmp, addr=ebp + 3 * 4, size=4)
custom_struct_addr = u32(custom_struct_addr[0:4])
custom_struct = read_memory(dmp, addr=custom_struct_addr, size=0x20)
logging.info(f"session context is at {custom_struct_addr:#x}")
#
# Get the `hcryptkey` at offset 0x14
#
hcryptkey_struct_addr = u32(custom_struct[0x14:0x18])
hcryptkey_struct = read_memory(dmp, addr=hcryptkey_struct_addr, size=128)
logging.info(f"hCryptContext is at {hcryptkey_struct_addr:#x}")
#
# Decode the AES structure pointer
#
magic_s_addr = u32(hcryptkey_struct[0x2C:0x30]) ^ 0xE35A172C
magic_s_struct = read_memory(dmp, addr=magic_s_addr, size=128)
key_data_s_struct_addr = u32(magic_s_struct[0x00:0x04])
key_data_s_struct = read_memory(dmp, addr=key_data_s_struct_addr, size=128)
logging.info(f"AES structure is at {key_data_s_struct_addr:#x}")
#
# Finally extract the AES-CBC key
#
aes_key_addr = u32(key_data_s_struct[0x10:0x14])
aes_key = read_memory(dmp, addr=aes_key_addr, size=16)
return aes_key
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="NotPetya dump extractor/decryptor based on minidumps (onweek POC)"
)
parser.add_argument(
"minidumpfile", help="path to the minidump file", type=pathlib.Path
)
parser.add_argument(
"--thread-id",
dest="thread_id",
help="Thread ID that triggered the alert",
type=lambda x: int(x, 0),
)
args = parser.parse_args()
aes_key = parse_dump(args.minidumpfile, args.thread_id)
if not aes_key:
logging.error("Failed to retrieve the key")
sys.exit(1)
logging.info(f"AES key: {hexlify(aes_key).decode()}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment