Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Parse CobaltStrike beacon metadata
import binascii
import hashlib
import io
import struct

import M2Crypto
import requests
# PEM armor templates: the single `{}` placeholder receives the key body,
# which ends up on its own line between the BEGIN and END markers.
PRIVATE_KEY_TEMPLATE = (
    "-----BEGIN PRIVATE KEY-----\n"
    "{}\n"
    "-----END PRIVATE KEY-----"
)
PUBLIC_KEY_TEMPLATE = (
    "-----BEGIN PUBLIC KEY-----\n"
    "{}\n"
    "-----END PUBLIC KEY-----"
)
class Metadata(object):
    """
    Class to represent a beacon Metadata object.

    When both ``data`` and ``private_key`` are supplied, the constructor
    RSA-decrypts ``data`` and parses the result into the attributes below.
    Otherwise the attributes keep their empty defaults and the caller may
    drive ``rsa_decrypt()`` / ``unpack()`` manually.

    The code runs on both Python 2 and Python 3. On Python 3 the textual
    fields are decoded from bytes with latin-1 (a lossless 1:1 byte mapping);
    on Python 2 they stay native byte strings.

    :param data: encrypted metadata blob; must be exactly 128 bytes when given.
    :param private_key: body of the PEM private key (without the armor
        lines), inserted into ``PRIVATE_KEY_TEMPLATE``.
    :param public_key: stored on the instance; not used by this class.
    :param cs_version: selects the layout parsed by ``unpack()``; values
        below 4 use the tab-separated layout, otherwise the binary header
        layout is used.
    :raises AttributeError: if ``data`` is non-empty and not 128 bytes long.
    """

    # Marker expected in the first four bytes of the decrypted metadata.
    MAGIC = b'\x00\x00\xBE\xEF'
    # Required length, in bytes, of the encrypted metadata blob.
    ENCRYPTED_SIZE = 128

    def __init__(self, data="", private_key="", public_key="", cs_version=4):
        self.cs_version = cs_version
        self.data = data
        self.public_key = public_key
        self.private_key = private_key
        self.port = 0
        self.ciphertext = ""
        self.charset = ""
        self.charset_oem = ""
        self.ver = ""
        self.intz = ""
        self.comp = ""
        self.user = ""
        self.pid = ""
        self.bid = ""
        self.barch = ""
        # Set by unpack() for cs_version >= 4; initialised here so the
        # attribute always exists.
        self.proc = ""
        # Byte strings (identical to "" on Python 2) so that hex-dumping an
        # unparsed instance also works on Python 3.
        self.raw_aes_keys = b""
        self.aes_key = b""
        self.hmac_key = b""
        self.is64 = False
        self.high_integrity = False

        if data and len(data) != self.ENCRYPTED_SIZE:
            raise AttributeError('Metadata should be 128 bytes')

        if data and private_key:
            self.rsa_decrypt()
            self.unpack()

    @staticmethod
    def _text(raw):
        """Return ``raw`` as a native string.

        Bytes are decoded with latin-1 on Python 3; on Python 2, where
        ``bytes is str``, the value is returned unchanged.
        """
        if isinstance(raw, bytes) and not isinstance(raw, str):
            return raw.decode('latin-1')
        return raw

    @classmethod
    def _hex(cls, raw):
        """Return the hexadecimal representation of ``raw`` as a native string."""
        return cls._text(binascii.hexlify(raw))

    def calculate_aes(self):
        """Derive ``aes_key`` and ``hmac_key`` from ``raw_aes_keys``.

        The SHA-256 digest of ``raw_aes_keys`` is split in two: the first
        16 bytes become the AES key and the remaining 16 the HMAC key.
        """
        digest = hashlib.sha256(self.raw_aes_keys).digest()
        self.aes_key = digest[0:16]
        self.hmac_key = digest[16:]

    def rsa_decrypt(self):
        """Decrypt ``self.data`` with the private key and wrap it in a stream.

        After decryption the first four bytes must equal ``MAGIC``. Bytes
        4..8 are skipped without being interpreted, and everything after
        them replaces ``self.data`` as a seekable binary stream for the
        ``read*`` helpers and ``unpack()``.

        :raises AssertionError: if the decrypted data does not start with the
            expected magic marker.
        """
        pem = PRIVATE_KEY_TEMPLATE.format(self._text(self.private_key))
        if not isinstance(pem, bytes):
            # M2Crypto expects the PEM text as bytes on Python 3.
            pem = pem.encode('ascii')
        pkey = M2Crypto.RSA.load_key_string(pem)
        plaintext = pkey.private_decrypt(self.data, M2Crypto.RSA.pkcs1_padding)
        if plaintext[0:4] != self.MAGIC:
            # Raised explicitly instead of via `assert` so the check is not
            # stripped under `python -O`; the exception type is kept for
            # callers that already handle it.
            raise AssertionError('decrypted metadata does not start with 0x0000BEEF')
        self.data = io.BytesIO(plaintext[8:])

    def readInt(self, byteorder='>'):
        """Read a 4-byte unsigned integer from the stream.

        :param byteorder: ``struct`` byte-order prefix (default big-endian).
        """
        fmt = byteorder + 'L'
        return struct.unpack(fmt, self.data.read(struct.calcsize(fmt)))[0]

    def readShort(self, byteorder='>'):
        """Read a 2-byte unsigned integer from the stream.

        :param byteorder: ``struct`` byte-order prefix (default big-endian).
        """
        fmt = byteorder + 'H'
        return struct.unpack(fmt, self.data.read(struct.calcsize(fmt)))[0]

    def readByte(self):
        """Read a single signed byte from the stream."""
        fmt = 'b'
        return struct.unpack(fmt, self.data.read(struct.calcsize(fmt)))[0]

    def flag(self, b, s):
        """Return True when every bit of mask ``s`` is set in ``b``."""
        return (b & s) == s

    def print_config(self):
        """Print the parsed metadata fields to standard output."""
        # Single-argument print(...) behaves identically on Python 2 and 3.
        print("raw AES key: %s" % self._hex(self.raw_aes_keys[0:8]))
        print("raw HMAC key: %s" % self._hex(self.raw_aes_keys[8:]))
        print("AES key: %s" % self._hex(self.aes_key))
        print("HMAC key: %s" % self._hex(self.hmac_key))
        print("ver: %s" % self.ver)
        print("host: %s" % self.intz)
        print("computer: %s" % self.comp)
        print("user: %s" % self.user)
        print("pid: %s" % self.pid)
        print("id: %s" % self.bid)
        print("barch: %s" % self.barch)
        print("is64: %s" % self.is64)
        if self.cs_version > 3:
            print("charset: %s" % self.charset)
            print("port: %s" % self.port)
            print("process: %s" % self.proc)

    def unpack(self):
        """Parse the decrypted metadata stream into instance attributes.

        ``self.data`` must be a seekable binary stream (as set up by
        ``rsa_decrypt()``). The first 16 bytes are the raw key material; the
        rest is parsed according to ``cs_version``:

        * ``cs_version < 4``: tab-separated fields in the order bid, pid,
          ver, intz, comp, user, is64, followed by an architecture field
          where ``'1'`` means x64.
        * otherwise: little-endian charset and OEM charset (2 bytes each),
          big-endian bid and pid (4 bytes each), big-endian port (2 bytes),
          one flag byte, then five tab-separated fields: ver, intz, comp,
          user, proc.

        :raises IndexError: ``cs_version < 4`` layout with fewer than eight
            fields.
        :raises ValueError: binary layout whose trailing text does not hold
            exactly five tab-separated fields.
        :raises struct.error: binary layout with a truncated header.
        """
        self.data.seek(0)
        self.raw_aes_keys = self.data.read(16)
        self.calculate_aes()

        if self.cs_version < 4:
            config = [self._text(field) for field in self.data.read().split(b'\t')]
            self.bid = config[0]
            self.pid = config[1]
            self.ver = config[2]
            self.intz = config[3]
            self.comp = config[4]
            self.user = config[5]
            self.is64 = config[6]
            if config[7] == '1':
                self.barch = 'x64'
            else:
                self.barch = 'x86'
            return

        self.charset = self.readShort('<')
        self.charset_oem = self.readShort('<')
        self.bid = self.readInt()
        self.pid = self.readInt()
        self.port = self.readShort()

        flags = self.readByte()
        if self.flag(flags, 1):
            # Bit 1 set: architecture and pid are blanked
            # (is64 is recomputed from bit 4 just below).
            self.barch = ""
            self.pid = ""
            self.is64 = ""
        elif self.flag(flags, 2):
            self.barch = "x64"
        else:
            self.barch = "x86"
        self.is64 = int(self.flag(flags, 4))
        self.high_integrity = self.flag(flags, 8)

        fields = [self._text(field) for field in self.data.read().split(b'\t')]
        self.ver, self.intz, self.comp, self.user, self.proc = fields
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment