Skip to content

Instantly share code, notes, and snippets.

@heck-gd
Created October 13, 2023 11:51
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save heck-gd/b59b9c48c0bfe12229e363e7f5653e56 to your computer and use it in GitHub Desktop.
Save heck-gd/b59b9c48c0bfe12229e363e7f5653e56 to your computer and use it in GitHub Desktop.
CobaltStrike Volatility Config Extractor
from __future__ import annotations
import re
from itertools import cycle
# Number of 16-byte setting slots read from the unpacked config table.
MAX_SETTINGS = 128
def load_mapping(filename: str) -> dict[int, int]:
    """Build a page lookup table from a textual Volatility memmap listing.

    Only rows starting with ``0x`` are parsed; everything else (headers,
    blank lines) is ignored. Rows are tab-separated and columns 0, 2 and 3
    are read as virtual address, region size and offset into the dump file.

    Returns:
        A dict mapping each 0x1000-byte step of every listed region (by
        virtual address) to the matching offset in the dump file.
    """
    page_size = 0x1000
    table: dict[int, int] = {}
    with open(filename, 'r') as listing:
        for row in listing:
            if not row.startswith("0x"):
                continue
            fields = row.split("\t")
            region_start = int(fields[0], 0)
            region_size = int(fields[2], 0)
            file_offset = int(fields[3], 0)
            # The first page is always recorded, even for a zero-sized row.
            table[region_start] = file_offset
            # Regions larger than one page get an entry per additional page.
            for delta in range(page_size, region_size, page_size):
                table[region_start + delta] = file_offset + delta
    return table
def get_unpacked_config_address(cs_filename: str) -> int:
    """Gets unpacked config heap address from beacon dump.

    Searches the dump for a fixed opcode sequence that begins with
    ``48 8B 0D <disp32>`` (``mov rcx, [rip+disp32]``), resolves the
    RIP-relative displacement to an offset inside the same file, and reads
    the little-endian 64-bit pointer stored at that offset.

    Args:
        cs_filename: Path to the beacon dump; offsets in the file are used
            directly as addresses, so the code and the referenced data slot
            must both be present at their in-memory layout.

    Returns:
        The 64-bit heap pointer read from the referenced data slot.

    Raises:
        Exception: If the opcode pattern is not present, or if the resolved
            pointer slot does not lie completely inside the dump.
    """
    with open(cs_filename, 'rb') as fp:
        data = fp.read()

    # DOTALL matters on binary data: without it "." refuses to match 0x0A,
    # so any displacement containing that byte would never be found.
    match = re.search(
        rb"\x48\x8B\x0D(.{4})\x48\x0F\xBF\xFF\x8B\xD3\x0F\xB7\xF0"
        rb"\x48\xC1\xE7\x04\x66\x89\x1C\x0F",
        data,
        re.DOTALL,
    )
    if not match:
        raise Exception("Code pattern for config reference not found")

    # Compute RIP-relative pointer to data section. The displacement is a
    # signed 32-bit value relative to the end of the 7-byte instruction
    # (3 opcode bytes + 4 displacement bytes).
    displacement = int.from_bytes(match.group(1), 'little', signed=True)
    data_addr = match.start() + 7 + displacement
    if data_addr < 0 or data_addr + 8 > len(data):
        # A short slice would otherwise silently decode to a bogus pointer.
        raise Exception(
            f"Config pointer slot {hex(data_addr)} lies outside the beacon dump")

    # Read qword in data section to get pointer to heap.
    heap_addr = int.from_bytes(data[data_addr:data_addr + 8], 'little')
    return heap_addr
def decrypt(data: bytes, key: bytes) -> bytes:
    """Unmasks XOR with cyclic key.

    Each byte of ``data`` is XORed with the key byte at the same position,
    the key being repeated as often as needed. Applying the function twice
    with the same key returns the original data.

    Args:
        data: Masked bytes.
        key: XOR key; must contain at least one byte.

    Returns:
        The unmasked bytes, same length as ``data``.

    Raises:
        ValueError: If ``key`` is empty. Cycling an empty key yields nothing,
            which would silently turn any input into ``b""``.
    """
    if not key:
        raise ValueError("XOR key must not be empty")
    return bytes(b ^ k for b, k in zip(data, cycle(key)))
def _page(addr: int) -> int:
    """Return ``addr`` with its low 12 bits cleared, limited to 64 bits."""
    page_mask = 0xFFFFFFFF_FFFFFFFF ^ 0xFFF
    return addr & page_mask
def read_and_decrypt_heap(pages: dict[int, int], heap_addr: int, size: int,
                          dmp_filename: str, key: bytes) -> bytes:
    """Reads and unmasks heap data from the dump.

    The virtual range ``[heap_addr, heap_addr + size)`` is translated one
    0x1000-byte page at a time through ``pages``, because virtually adjacent
    pages are not guaranteed to be adjacent in the dump file.

    Args:
        pages: Mapping of page-aligned virtual address to dump file offset.
        heap_addr: Virtual address to start reading at.
        size: Number of bytes requested.
        dmp_filename: Path to the dump file that ``pages`` refers to.
        key: XOR key passed to ``decrypt``.

    Returns:
        The unmasked bytes. The result is shorter than ``size`` when a later
        page of the range is missing from ``pages`` or the dump file ends
        early; reading stops at that point instead of returning bytes that
        belong to an unrelated page.

    Raises:
        Exception: If the page containing ``heap_addr`` is not in ``pages``.
    """
    page_size = 0x1000
    page_addr = _page(heap_addr)
    if page_addr not in pages:
        raise Exception(f"Unable to find the requested page: {hex(page_addr)}")

    chunks = []
    addr = heap_addr
    remaining = size
    with open(dmp_filename, 'rb') as dmp:
        while remaining > 0:
            file_offset = pages.get(_page(addr))
            if file_offset is None:
                # A continuation page is not in the dump: keep what was read.
                break
            in_page = addr & 0xfff
            wanted = min(remaining, page_size - in_page)
            dmp.seek(file_offset + in_page)
            chunk = dmp.read(wanted)
            chunks.append(chunk)
            if len(chunk) < wanted:
                # Dump file ended early.
                break
            addr += wanted
            remaining -= wanted

    # Unmask the concatenation so the cyclic key stays aligned across pages.
    return decrypt(b"".join(chunks), key)
def retrieve_config(pages: dict[int, int], heap_addr: int, dmp_filename: str,
                    key: bytes) -> dict[int, tuple[int, bytes]]:
    """Reads and decodes the unpacked settings table from the dump.

    The table consists of ``MAX_SETTINGS`` slots of 16 bytes each: a
    little-endian 2-byte kind at offset 0 and a little-endian 8-byte value
    at offset 8. Kinds are decoded as follows:

    * 0 - empty slot, skipped.
    * 1 - WORD: low 16 bits of the value, emitted as 2 big-endian bytes.
    * 2 - DWORD: low 32 bits of the value, emitted as 4 big-endian bytes.
    * 3 - the value is an address; 512 bytes are read and unmasked from it
      and cut at the first run of 10 zero bytes, if any.

    Args:
        pages: Mapping of page-aligned virtual address to dump file offset.
        heap_addr: Virtual address of the settings table.
        dmp_filename: Path to the dump file that ``pages`` refers to.
        key: XOR key used to unmask heap data.

    Returns:
        A dict mapping slot index to ``(kind, raw_value_bytes)`` for every
        non-empty slot of a known kind.

    Raises:
        Exception: If one of the first 10 slots has a kind outside 0-3
            (usually a wrong key or address), or if a required page is
            missing from ``pages``.
    """
    entry_size = 16
    config_decrypted = read_and_decrypt_heap(
        pages, heap_addr, MAX_SETTINGS * entry_size, dmp_filename, key)

    # Sanity check the kind field of the first slots. The stride must be the
    # slot size; stepping by single bytes would only re-inspect slot 0.
    for i in range(min(10, MAX_SETTINGS)):
        offset = i * entry_size
        kind = int.from_bytes(config_decrypted[offset:offset+2], 'little')
        if kind not in (0, 1, 2, 3):
            print(config_decrypted.hex())
            raise Exception(f"Config sanity check failed, got kind {hex(kind)} at index {i}")

    config = {}
    for i in range(MAX_SETTINGS):
        offset = i * entry_size
        entry = config_decrypted[offset:offset + entry_size]
        if len(entry) < entry_size:
            # The table was read short; do not decode a partial slot.
            break
        kind = int.from_bytes(entry[:2], 'little')
        if kind == 0:
            # Empty slot.
            continue
        data = int.from_bytes(entry[8:16], 'little')
        if kind == 1:  # WORD
            data_bin = (data & 0xFFFF).to_bytes(2, 'big')
        elif kind == 2:  # DWORD
            data_bin = (data & 0xFFFFFFFF).to_bytes(4, 'big')
        elif kind == 3:
            data_bin = read_and_decrypt_heap(pages, data, 512, dmp_filename, key)
            # Consider value terminated after at least 10 zero chars.
            zero_pos = data_bin.find(b"\x00" * 10)
            if zero_pos > -1:
                data_bin = data_bin[:zero_pos]
        else:
            print(f"[!!!] Encountered unknown kind {kind} at {i}")
            continue
        config[i] = (kind, data_bin)
    return config
if __name__ == "__main__":
    # Placeholders: edit these before running the script.
    memmap_listing = "/path/to/memmap.txt"      # textual memmap output
    memmap_dump = "/path/to/memmap.dmp"         # dump the listing refers to
    beacon_image = "/path/to/unmasked_beacon.dat"
    xor_key = bytes.fromhex("<your key as hex>")

    # Build the page table first, then locate the config pointer.
    page_table = load_mapping(memmap_listing)
    heap_addr = get_unpacked_config_address(beacon_image)
    print(f"Found heap address {hex(heap_addr)}")

    # Decode the settings table and print it.
    print(retrieve_config(page_table, heap_addr, memmap_dump, xor_key))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment