Skip to content

Instantly share code, notes, and snippets.

@biemster
Forked from williballenthin/macOS_keychain.py
Created September 6, 2022 19:33
Show Gist options
  • Save biemster/3f8fca94a457f9ae830cdfadd5a6730c to your computer and use it in GitHub Desktop.
Save biemster/3f8fca94a457f9ae830cdfadd5a6730c to your computer and use it in GitHub Desktop.
bling.py - extract keys from macOS keychains.
#!/usr/bin/env python3
'''
bling.py - extract keys from macOS keychains.
installation:
pip install pytz hexdump vivisect-vstruct-wb tabulate argparse pycryptodome
usage:
python bling.py /path/to/keychain-db <password> ./path/to/output/directory
references:
- https://repo.zenk-security.com/Forensic/Keychain%20Analysis%20with%20Mac%20OS%20X%20Memory%20Forensics.pdf
- https://github.com/libyal/dtformats/blob/master/documentation/MacOS%20keychain%20database%20file%20format.asciidoc
author: Willi Ballenthin
email: william.ballenthin@fireeye.com
license: Apache 2.0
'''
# TODO: detect invalid password
import os
import os.path
import sys
import copy
import string
import hashlib
import logging
import binascii
import datetime
import itertools
from pprint import pprint
import pytz
import hexdump
import vstruct
from vstruct.primitives import *
import tabulate
import argparse
# from pycryptodome
from Crypto.Cipher import DES3
from Crypto.Util.Padding import unpad
import Crypto.Protocol.KDF
# module-level logger shared by every class and function in this script
logger = logging.getLogger('osx.bling')
class v_greedy_bytes(v_bytes):
    '''
    byte-array primitive that swallows everything from the parse offset
    up to the end of the buffer it is handed.
    '''

    def vsParse(self, fbytes, offset=0):
        '''
        keep `fbytes[offset:]` as the value and report the end of the buffer
        as the next parse offset.
        '''
        remainder = fbytes[offset:]
        self._vs_value = remainder
        return len(fbytes)
class RECORD_HEADER(vstruct.VStruct):
    '''
    generic record header: six big-endian uint32 fields, one uint32 offset
    per schema attribute, then the blob bytes and the attribute bytes.
    '''

    def __init__(self, attrs):
        '''
        Args:
          attrs: the attribute descriptions of the table schema; only their
            count is used, to size the attribute offset array.
        '''
        super().__init__()
        attr_count = len(attrs)

        self.RecordSize = v_uint32(bigend=True)
        self.RecordNumber = v_uint32(bigend=True)
        self.unk1 = v_uint32(bigend=True)
        self.unk2 = v_uint32(bigend=True)
        self.BlobSize = v_uint32(bigend=True)
        self.zero = v_uint32(bigend=True)

        # the offset array begins at 0x18, right after the six fields above
        offset_fields = [v_uint32(bigend=True) for _ in range(attr_count)]
        self.AttributeOffsets = vstruct.VArray(offset_fields)

        # blob data follows the offset array; its real length is only known
        # once BlobSize has been parsed (see `pcb_BlobSize`).
        self.blob_data_offset = 0x18 + (4 * attr_count)
        self.BlobData = v_bytes(size=0)

        # attribute data follows the blob; starts out equal to the blob offset
        self.attribute_data_offset = self.blob_data_offset
        self.AttributeData = v_bytes(size=0)

    def pcb_BlobSize(self):
        '''
        parse callback for `BlobSize`: resize the two variable-length fields
        and recompute where the attribute data begins.
        '''
        blob_size = int(self.BlobSize)
        self['BlobData'].vsSetLength(blob_size)

        attr_size = int(self.RecordSize) - self.blob_data_offset - blob_size
        self['AttributeData'].vsSetLength(attr_size)

        self.attribute_data_offset = self.blob_data_offset + blob_size
class Record:
    '''
    a single parsed table row: generic header, schema-declared attributes,
    and the table-specific blob.
    '''

    def __init__(self, schema, buf):
        '''
        Args:
          schema: table description as returned by
            `Database.get_table_schema()`; its 'attrs' list and 'RelationID'
            entry are read here.
          buf (bytes): buffer to parse for a record.
        '''
        # record layout, top to bottom:
        #
        #   rec size | rec index | unk1 | unk2 | blob size | 0x0
        #   attribute offsets   (one uint32 per attribute declared in schema)
        #   blob data           (blob size bytes, parsed into `self.blob`)
        #   attribute data      (runs up to rec size)
        self.buf = buf
        attr_descs = schema['attrs']

        # generic header: record size, record number, offsets, blob bytes.
        self.header = RECORD_HEADER(attr_descs)
        self.header.vsParse(buf)

        self.attrs = {}
        for index, desc in enumerate(attr_descs):
            stored_offset = int(self.header.AttributeOffsets[index])
            if stored_offset == 0:
                # zero marks an empty attribute; nothing to parse.
                continue

            # stored offsets are shifted up by one so that zero can mean
            # "empty"; the real offset is relative to the record start.
            start = stored_offset - 1
            attr_bytes = buf[start:]

            parser = ATTRIBUTE_PARSERS[int(desc['AttributeFormat'])]
            value = parser()
            value.vsParse(attr_bytes)
            self.attrs[str(desc['AttributeName'])] = value

        # the blob parser is selected by the id of the owning table.
        blob_parser = BLOB_PARSERS[int(schema['RelationID'])]
        self.blob = blob_parser()
        self.blob.vsParse(self.header.BlobData)
# table (relation) identifiers; used as keys of BLOB_PARSERS and as the
# `enum=` of TABLE_HEADER.TableId.
CSSM_DL_DB = v_enum()
# Schema Management
CSSM_DL_DB.SCHEMA_INFO = 0x00000000 # Schema information
CSSM_DL_DB.SCHEMA_INDEXES = 0x00000001 # Schema indexes
CSSM_DL_DB.SCHEMA_ATTRIBUTES = 0x00000002 # Schema attributes
CSSM_DL_DB.SCHEMA_PARSING_MODULE = 0x00000003 # Schema parsing module
# Open Group Application
CSSM_DL_DB.RECORD_ANY = 0x0000000A # Temporary table type.
CSSM_DL_DB.RECORD_CERT = 0x0000000B # Certificates
CSSM_DL_DB.RECORD_CRL = 0x0000000C # Certificate Revocation List
CSSM_DL_DB.RECORD_POLICY = 0x0000000D # Policy
CSSM_DL_DB.RECORD_GENERIC = 0x0000000E # Generic information
CSSM_DL_DB.RECORD_PUBLIC_KEY = 0x0000000F # Public key
CSSM_DL_DB.RECORD_PRIVATE_KEY = 0x00000010 # Private key
CSSM_DL_DB.RECORD_SYMMETRIC_KEY = 0x00000011 # Symmetric key
CSSM_DL_DB.RECORD_ALL_KEY = 0x00000012 # Temporary table type
# Industry at Large Applications
CSSM_DL_DB.RECORD_GENERIC_PASSWORD = 0x80000000 # User credential
CSSM_DL_DB.RECORD_INTERNET_PASSWORD = 0x80000001 # User credential on the Internet in particular
CSSM_DL_DB.RECORD_APPLESHARE_PASSWORD = 0x80000002 # (Deprecated)
CSSM_DL_DB.RECORD_USER_TRUST = 0x80000003 # User-defined certificates
CSSM_DL_DB.RECORD_X509_CRL = 0x80000004 # X.509 Certificate Revocation List
CSSM_DL_DB.RECORD_UNLOCK_REFERRAL = 0x80000005 # Unlock referral
CSSM_DL_DB.RECORD_EXTENDED_ATTRIBUTE = 0x80000006 # Extended attribute for database management
CSSM_DL_DB.RECORD_X509_CERTIFICATE = 0x80001000 # X.509 Certificates
CSSM_DL_DB.RECORD_METADATA = 0x80008000 # Metadata information
class EMPTY_BLOB(vstruct.VStruct):
    '''
    blob parser with no fields, used for tables registered in BLOB_PARSERS
    that carry no blob payload of interest.
    '''

    def __init__(self):
        super().__init__()

    def decrypt(self, keychain):
        '''
        nothing to decrypt: always returns an empty dict.
        '''
        return dict()
class COMMON_BLOB(vstruct.VStruct):
    '''
    two big-endian uint32 fields shared by the key and database blobs:
    a magic value and a blob version.
    '''

    def __init__(self):
        super().__init__()
        self.Magic = v_uint32(bigend=True)
        self.BlobVersion = v_uint32(bigend=True)

    def pcb_Magic(self):
        '''
        parse callback for `Magic`.

        Raises:
          ValueError: if the magic is not 0xfade0711.
        '''
        if self.Magic == 0xfade0711:
            return
        raise ValueError('invalid COMMON_BLOB magic')
class DB_PARAMETERS(vstruct.VStruct):
    '''
    database parameters embedded in DB_BLOB; both fields are parsed as
    big-endian 32-bit values.
    '''

    def __init__(self):
        super().__init__()
        # annotated upstream as: uint32
        self.IdleTimeout = v_uint32(bigend=True)
        # annotated upstream as: uint8 (still parsed as a 32-bit field here)
        self.LockOnSleep = v_uint32(bigend=True)
class DB_BLOB(vstruct.VStruct):
    '''
    blob of the metadata table record: carries the salt, IV and the
    encrypted database key needed to unlock the rest of the keychain.
    '''

    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.CommonBlob = COMMON_BLOB()
        self.StartCryptoBlob = v_uint32(bigend=True)
        self.TotalLength = v_uint32(bigend=True)
        self.RandomSignature = v_bytes(size=0x10)
        self.Sequence = v_uint32(bigend=True)
        self.Params = DB_PARAMETERS()
        self.Salt = v_bytes(size=0x14)
        self.IV = v_bytes(size=8)
        self.BlobSignature = v_bytes(size=0x14)
        self.unk2 = vstruct.VArray([v_uint32(bigend=True) for _ in range(7)])
        self.EncryptedDBKey = v_bytes(size=0x30)

    def decrypt(self, keychain):
        '''
        derive the master key from the keychain password and use it to
        unwrap the database key.

        Args:
          keychain: object exposing a `password` attribute (str or bytes).

        Returns:
          dict with keys 'master_key', 'db_key' and 'plaintext'
          ('plaintext' is the same value as 'db_key').

        Raises:
          ValueError: from `unpad` when the decrypted key is not correctly
            padded (presumably the symptom of a wrong password).
        '''
        password = keychain.password
        if isinstance(password, str):
            # PyCryptodome would encode a str as Latin-1, which derives a
            # different key for (or rejects) non-ASCII passwords. encode
            # explicitly instead; ASCII passwords are unaffected.
            # NOTE(review): assumes the keychain password bytes are UTF-8.
            password = password.encode('utf-8')

        # magic: number of rounds = 1000
        # magic: key size = 24
        master_key = Crypto.Protocol.KDF.PBKDF2(password, self.Salt, count=1000, dkLen=24)
        des3 = DES3.new(master_key, DES3.MODE_CBC, self.IV)
        # pkcs#7 padding, 3DES block size (8 bytes)
        # magic: size of key = 24 bytes
        db_key = unpad(des3.decrypt(self.EncryptedDBKey), 8)[:24]
        return {
            'master_key': master_key,
            'db_key': db_key,
            'plaintext': db_key,
        }
class SSGP(vstruct.VStruct):
    '''
    4-byte 'ssgp' magic followed by a 16-byte label.
    '''

    def __init__(self):
        super().__init__()
        self.Magic = v_bytes(size=4)
        self.Label = v_bytes(size=0x10)

    def pcb_Magic(self):
        '''
        parse callback for `Magic`.

        Raises:
          ValueError: if the magic bytes are not b'ssgp'.
        '''
        if self.Magic == b'ssgp':
            return
        raise ValueError('invalid SSGP header')
def parse_ssgp_label(label):
    '''
    interpret `label.data` as an SSGP structure and return its 16-byte
    label, which serves as the key id.
    '''
    parsed = SSGP()
    parsed.vsParse(label.data)
    key_id = parsed.Label
    return key_id
class SYMMETRIC_KEY_BLOB(vstruct.VStruct):
    '''
    blob of a symmetric key record: common header, crypto offsets, IV,
    padding up to StartCryptoBlob, then the wrapped key.
    '''

    def __init__(self):
        super().__init__()
        self.CommonBlob = COMMON_BLOB()
        self.StartCryptoBlob = v_uint32(bigend=True)
        self.TotalLength = v_uint32(bigend=True)
        self.IV = v_bytes(size=8)
        # both resized by the parse callbacks below
        self.Padding = v_bytes(size=0)
        self.EncryptedKey = v_bytes(size=0)

    def pcb_StartCryptoBlob(self):
        '''padding fills the gap between offset 0x18 and StartCryptoBlob.'''
        gap = int(self.StartCryptoBlob) - 0x18
        self['Padding'].vsSetLength(gap)

    def pcb_TotalLength(self):
        '''the encrypted key runs from StartCryptoBlob to TotalLength.'''
        key_size = int(self.TotalLength) - int(self.StartCryptoBlob)
        self['EncryptedKey'].vsSetLength(key_size)

    def decrypt(self, keychain):
        '''
        unwrap the key with two 3DES-CBC passes keyed by `keychain.db_key`.

        Returns:
          dict with key 'plaintext': the unwrapped key without its 4-byte
          zero header.

        Raises:
          ValueError: if the final plaintext is not 0x1C bytes long or does
            not start with four zero bytes (or if `unpad` rejects padding).
        '''
        # first pass uses a fixed IV
        fixed_iv = binascii.unhexlify('4adda22c79e82105')
        outer = DES3.new(keychain.db_key, DES3.MODE_CBC, fixed_iv)
        stage1 = unpad(outer.decrypt(self.EncryptedKey), 8)

        # second pass uses the blob IV over the first 32 bytes, reversed
        inner = DES3.new(keychain.db_key, DES3.MODE_CBC, self.IV)
        reversed_head = stage1[:0x20][::-1]
        stage2 = unpad(inner.decrypt(reversed_head), 8)

        # expected plaintext: 4 zero bytes followed by 24 key bytes, e.g.
        #
        # 00000000: 00 00 00 00 C1 3D 0F F9  CB AC 6D AC D6 40 3A 98
        # 00000010: 4B 3C 5C F4 E8 12 F0 3E  CB 31 83 6C
        if len(stage2) != 0x1C:
            raise ValueError('unexpected plaintext length')
        if stage2[:4] != b'\x00\x00\x00\x00':
            raise ValueError('unexpected plaintext header')

        return {
            'plaintext': stage2[4:],
        }
class GENERIC_PASSWORD_BLOB(vstruct.VStruct):
    '''
    blob of a generic password record: SSGP header, IV, then the
    encrypted secret taking up the rest of the blob.
    '''

    def __init__(self):
        super().__init__()
        self.SSGP = SSGP()
        self.IV = v_bytes(size=0x8)
        self.EncryptedKey = v_greedy_bytes()

    def decrypt(self, keychain):
        '''
        decrypt the secret with the symmetric key named by the SSGP label.

        Returns:
          dict with key 'plaintext', or an empty dict when the blob holds
          no ciphertext.
        '''
        label = self.SSGP.Label
        sym_key = keychain.get_symmetric_key(label)

        if not self.EncryptedKey:
            # the blob can stop right after the IV, e.g. BlobSize 0x1C only
            # leaves room for:
            #   SSGP magic
            #   SSGP label
            #   IV
            # some entries for `Microsoft Office Identities Cache 3` look
            # like this.
            # TODO: figure out how to interpret this.
            return {}

        cipher = DES3.new(sym_key, DES3.MODE_CBC, self.IV)
        secret = unpad(cipher.decrypt(self.EncryptedKey), 8)
        return {
            'plaintext': secret
        }
class INTERNET_PASSWORD_BLOB(vstruct.VStruct):
    '''
    blob of an internet password record: SSGP header, IV, then the
    encrypted secret taking up the rest of the blob.
    '''

    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.SSGP = SSGP()
        self.IV = v_bytes(size=0x8)
        self.EncryptedKey = v_greedy_bytes()

    def decrypt(self, keychain):
        '''
        decrypt the secret with the symmetric key named by the SSGP label.

        Returns:
          dict with key 'plaintext', or an empty dict when the blob holds
          no ciphertext.
        '''
        keyid = self.SSGP.Label
        key = keychain.get_symmetric_key(keyid)

        if not self.EncryptedKey:
            # same guard as GENERIC_PASSWORD_BLOB.decrypt: without it,
            # unpad() raises on the empty buffer and a single empty entry
            # aborts processing of the whole table.
            return {}

        des3 = DES3.new(key, DES3.MODE_CBC, self.IV)
        plaintext = unpad(des3.decrypt(self.EncryptedKey), 8)
        return {
            'plaintext': plaintext
        }
class PUBLIC_KEY_BLOB(vstruct.VStruct):
    '''
    blob of a public key record: common header, crypto offsets, padding
    up to StartCryptoBlob, then the key bytes (stored unencrypted).
    '''

    def __init__(self):
        super().__init__()
        self.CommonBlob = COMMON_BLOB()
        self.StartCryptoBlob = v_uint32(bigend=True)
        self.TotalLength = v_uint32(bigend=True)
        # both resized by the parse callbacks below
        self.Padding = v_bytes(size=0)
        self.PublicKey = v_bytes(size=0)

    def pcb_StartCryptoBlob(self):
        '''padding fills the gap between the fixed fields and StartCryptoBlob.'''
        # 0x10 = sizeof(CommonBlob + StartCryptoBlob + TotalLength)
        gap = int(self.StartCryptoBlob) - 0x10
        self['Padding'].vsSetLength(gap)

    def pcb_TotalLength(self):
        '''the key runs from StartCryptoBlob to TotalLength.'''
        key_size = int(self.TotalLength) - int(self.StartCryptoBlob)
        self['PublicKey'].vsSetLength(key_size)

    def decrypt(self, keychain):
        '''
        no decryption is performed: returns the key bytes under 'plaintext'.
        '''
        return dict(plaintext=self.PublicKey)
class PRIVATE_KEY_BLOB(vstruct.VStruct):
    '''
    blob of a private key record: common header, crypto offsets, IV,
    padding up to StartCryptoBlob, then the wrapped key.
    '''

    def __init__(self):
        super().__init__()
        self.CommonBlob = COMMON_BLOB()
        self.StartCryptoBlob = v_uint32(bigend=True)
        self.TotalLength = v_uint32(bigend=True)
        self.IV = v_bytes(size=8)
        # both resized by the parse callbacks below
        self.Padding = v_bytes(size=0)
        self.EncryptedKey = v_bytes(size=0)

    def pcb_StartCryptoBlob(self):
        '''padding fills the gap between offset 0x18 and StartCryptoBlob.'''
        gap = int(self.StartCryptoBlob) - 0x18
        self['Padding'].vsSetLength(gap)

    def pcb_TotalLength(self):
        '''the encrypted key runs from StartCryptoBlob to TotalLength.'''
        key_size = int(self.TotalLength) - int(self.StartCryptoBlob)
        self['EncryptedKey'].vsSetLength(key_size)

    def decrypt(self, keychain):
        '''
        unwrap the key with two 3DES-CBC passes keyed by `keychain.db_key`.

        Returns:
          dict with key 'plaintext': the result of the second pass.
        '''
        # first pass uses a fixed IV
        fixed_iv = binascii.unhexlify('4adda22c79e82105')
        outer = DES3.new(keychain.db_key, DES3.MODE_CBC, fixed_iv)
        stage1 = unpad(outer.decrypt(self.EncryptedKey), 8)

        # second pass uses the blob IV over the entire first-pass output,
        # byte-reversed
        inner = DES3.new(keychain.db_key, DES3.MODE_CBC, self.IV)
        stage2 = unpad(inner.decrypt(stage1[::-1]), 8)

        return {
            'plaintext': stage2,
        }
class X509_CERTIFICATE_BLOB(vstruct.VStruct):
    '''
    blob of an X.509 certificate record: the whole blob is the certificate.
    '''

    def __init__(self):
        super().__init__()
        self.Certificate = v_greedy_bytes()

    def decrypt(self, keychain):
        '''
        no decryption is performed: returns the certificate bytes under
        'plaintext'.
        '''
        return dict(plaintext=self.Certificate)
# maps a table id (CSSM_DL_DB) to the class used to parse that table's record
# blobs; `Record.__init__` instantiates the mapped value by calling it.
# entries set to NotImplemented are tables with no blob parser yet.
BLOB_PARSERS = {
# schema structure is stored in attributes.
CSSM_DL_DB.SCHEMA_INFO: EMPTY_BLOB,
CSSM_DL_DB.SCHEMA_INDEXES: EMPTY_BLOB,
CSSM_DL_DB.SCHEMA_ATTRIBUTES: EMPTY_BLOB,
CSSM_DL_DB.SCHEMA_PARSING_MODULE: EMPTY_BLOB,
CSSM_DL_DB.RECORD_ANY: NotImplemented,
CSSM_DL_DB.RECORD_CERT: NotImplemented,
CSSM_DL_DB.RECORD_CRL: NotImplemented,
CSSM_DL_DB.RECORD_POLICY: NotImplemented,
CSSM_DL_DB.RECORD_GENERIC: NotImplemented,
CSSM_DL_DB.RECORD_PUBLIC_KEY: PUBLIC_KEY_BLOB,
CSSM_DL_DB.RECORD_PRIVATE_KEY: PRIVATE_KEY_BLOB,
CSSM_DL_DB.RECORD_SYMMETRIC_KEY: SYMMETRIC_KEY_BLOB,
CSSM_DL_DB.RECORD_ALL_KEY: NotImplemented,
CSSM_DL_DB.RECORD_GENERIC_PASSWORD: GENERIC_PASSWORD_BLOB,
CSSM_DL_DB.RECORD_INTERNET_PASSWORD: INTERNET_PASSWORD_BLOB,
CSSM_DL_DB.RECORD_APPLESHARE_PASSWORD: NotImplemented,
CSSM_DL_DB.RECORD_USER_TRUST: NotImplemented,
CSSM_DL_DB.RECORD_X509_CRL: NotImplemented,
CSSM_DL_DB.RECORD_UNLOCK_REFERRAL: NotImplemented,
CSSM_DL_DB.RECORD_EXTENDED_ATTRIBUTE: NotImplemented,
CSSM_DL_DB.RECORD_X509_CERTIFICATE: X509_CERTIFICATE_BLOB,
CSSM_DL_DB.RECORD_METADATA: DB_BLOB,
}
# attribute value formats; used as keys of ATTRIBUTE_PARSERS and as the
# 'AttributeFormat' entries of table schemas.
CSSM_DB_ATTRIBUTE_FORMAT= v_enum()
CSSM_DB_ATTRIBUTE_FORMAT.STRING = 0
CSSM_DB_ATTRIBUTE_FORMAT.SINT32 = 1
CSSM_DB_ATTRIBUTE_FORMAT.UINT32 = 2
CSSM_DB_ATTRIBUTE_FORMAT.BIG_NUM = 3
CSSM_DB_ATTRIBUTE_FORMAT.REAL = 4
CSSM_DB_ATTRIBUTE_FORMAT.TIME_DATE = 5
CSSM_DB_ATTRIBUTE_FORMAT.BLOB = 6
CSSM_DB_ATTRIBUTE_FORMAT.MULTI_UINT32 = 7
CSSM_DB_ATTRIBUTE_FORMAT.COMPLEX = 8
class STRING(vstruct.VStruct):
    '''
    length-prefixed string attribute: big-endian uint32 length, then that
    many bytes of string data.
    '''

    def __init__(self):
        super().__init__()
        self.length = v_uint32(bigend=True)
        self.data = v_str()

    def pcb_length(self):
        '''parse callback for `length`: size the data field accordingly.'''
        size = int(self.length)
        self['data'].vsSetLength(size)

    def __str__(self):
        '''the string data with trailing NUL characters removed.'''
        text = str(self.data)
        return text.rstrip('\x00')

    def __repr__(self):
        '''repr of the string data, right-stripped of NUL characters.'''
        text = repr(self.data)
        return text.rstrip('\x00')
# factories for the fixed-size attribute formats; each call returns a fresh
# big-endian vstruct primitive. they are invoked through ATTRIBUTE_PARSERS.
SINT32 = lambda: v_int32(bigend=True)
UINT32 = lambda: v_uint32(bigend=True)
# no parser is provided for this format
BIG_NUM = NotImplemented
REAL = lambda: v_double(bigend=True)
class TIME_DATE(vstruct.VStruct):
    '''
    16-byte timestamp attribute holding ASCII digits `YYYYMMDDhhmmss`
    followed by b'Z\\x00'.

    anything that cannot be interpreted that way is represented by
    `datetime.datetime.min`.
    '''

    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.data = v_bytes(size=0x10)
        # holder for the parsed timestamp; seeded so that `ts` and
        # `__repr__` are usable even before any data has been parsed.
        self._ts = {'it': datetime.datetime.min}

    @property
    def ts(self):
        '''the parsed `datetime.datetime` (UTC-aware when parsing succeeded).'''
        return self._ts['it']

    def pcb_data(self):
        '''
        parse callback for `data`: decode the digits into a datetime.
        '''
        raw = self.data
        try:
            stamp = datetime.datetime(
                int(raw[0:4]),    # year
                int(raw[4:6]),    # month
                int(raw[6:8]),    # day
                int(raw[8:10]),   # hour
                int(raw[10:12]),  # minute
                int(raw[12:14]),  # second
                tzinfo=pytz.utc)
        except ValueError:
            # raised for non-numeric fields and for out-of-range values
            # (e.g. month 13); neither should abort parsing of the record.
            stamp = datetime.datetime.min

        if raw[14:16] != b'Z\x00':
            # only the UTC designator is understood
            stamp = datetime.datetime.min

        self._ts['it'] = stamp

    def __repr__(self):
        # drop tzinfo before formatting: isoformat() of an aware datetime
        # already ends in '+00:00', and appending 'Z' to that would yield a
        # malformed '...+00:00Z'.
        return self.ts.replace(tzinfo=None).isoformat('T') + 'Z'
def is_ascii(s):
    '''
    report whether every element of `s` is a member of `string.printable`.

    on Python 3 the elements are expected to be integers (as produced by
    iterating `bytes`) and are converted with `chr` first.
    '''
    if sys.version_info[0] >= 3:
        chars = (chr(code) for code in s)
    else:
        chars = iter(s)
    return all(ch in string.printable for ch in chars)
def is_printable(buf):
    '''
    report whether `buf` decodes as UTF-8 and the part before the first
    NUL character is ASCII made up only of printable characters.
    '''
    try:
        text = buf.decode('utf-8')
        head, _, _ = text.partition('\x00')
        candidate = head.encode('ascii')
    except (UnicodeDecodeError, UnicodeEncodeError):
        return False
    return is_ascii(candidate)
class BLOB(vstruct.VStruct):
    '''
    length-prefixed binary attribute: big-endian uint32 length, then that
    many raw bytes.
    '''

    def __init__(self):
        super().__init__()
        self.length = v_uint32(bigend=True)
        self.data = v_bytes()

    def pcb_length(self):
        '''parse callback for `length`: size the data field accordingly.'''
        size = int(self.length)
        self['data'].vsSetLength(size)

    def __str__(self):
        '''
        printable data is rendered as text up to the first NUL character;
        anything else is rendered as 'hex:' followed by the hex digits.
        '''
        raw = self.data
        if not is_printable(raw):
            return 'hex:' + binascii.hexlify(raw).decode('ascii')
        text, _, _ = raw.decode('utf-8').partition('\x00')
        return text

    def __repr__(self):
        return str(self)
# no parsers are provided for these two formats
MULTI_UINT32 = NotImplemented
COMPLEX = NotImplemented
# maps an attribute format (CSSM_DB_ATTRIBUTE_FORMAT) to the callable that
# creates a parser for it; `Record.__init__` calls the mapped value with no
# arguments and then `vsParse`s the attribute bytes with the result.
ATTRIBUTE_PARSERS = {
CSSM_DB_ATTRIBUTE_FORMAT.STRING: STRING,
CSSM_DB_ATTRIBUTE_FORMAT.SINT32: SINT32,
CSSM_DB_ATTRIBUTE_FORMAT.UINT32: UINT32,
CSSM_DB_ATTRIBUTE_FORMAT.BIG_NUM: BIG_NUM,
CSSM_DB_ATTRIBUTE_FORMAT.REAL: REAL,
CSSM_DB_ATTRIBUTE_FORMAT.TIME_DATE: TIME_DATE,
CSSM_DB_ATTRIBUTE_FORMAT.BLOB: BLOB,
CSSM_DB_ATTRIBUTE_FORMAT.MULTI_UINT32: MULTI_UINT32,
CSSM_DB_ATTRIBUTE_FORMAT.COMPLEX: COMPLEX,
}
# key classes; `render_cell` uses this enum to name 'KeyClass' attribute values.
# each value is written as an index plus a common 0x0F base.
CSSM_KEYCLASS = v_enum()
CSSM_KEYCLASS.PUBLIC_KEY = 0x00+0x0F
CSSM_KEYCLASS.PRIVATE_KEY = 0x01+0x0F
CSSM_KEYCLASS.SESSION_KEY = 0x02+0x0F
CSSM_KEYCLASS.SECRET_PART = 0x03+0x0F
CSSM_KEYCLASS.OTHER = 0xFFFFFFFF
# algorithm identifiers, numbered consecutively from 0; `render_cell` uses this
# enum to name 'KeyType' attribute values.
CSSM_ALGID = v_enum()
CSSM_ALGID.NONE = 0
CSSM_ALGID.CUSTOM = 1
CSSM_ALGID.DH = 2
CSSM_ALGID.PH = 3
CSSM_ALGID.KEA = 4
CSSM_ALGID.MD2 = 5
CSSM_ALGID.MD4 = 6
CSSM_ALGID.MD5 = 7
CSSM_ALGID.SHA1 = 8
CSSM_ALGID.NHASH = 9
CSSM_ALGID.HAVAL = 10
CSSM_ALGID.RIPEMD = 11
CSSM_ALGID.IBCHASH = 12
CSSM_ALGID.RIPEMAC = 13
CSSM_ALGID.DES = 14
CSSM_ALGID.DESX = 15
CSSM_ALGID.RDES = 16
CSSM_ALGID.THREEDES_3KEY_EDE = 17
CSSM_ALGID.THREEDES_2KEY_EDE = 18
CSSM_ALGID.THREEDES_1KEY_EEE = 19
CSSM_ALGID.THREEDES_3KEY_EEE = 20
CSSM_ALGID.THREEDES_2KEY_EEE = 21
CSSM_ALGID.IDEA = 22
CSSM_ALGID.RC2 = 23
CSSM_ALGID.RC5 = 24
CSSM_ALGID.RC4 = 25
CSSM_ALGID.SEAL = 26
CSSM_ALGID.CAST = 27
CSSM_ALGID.BLOWFISH = 28
CSSM_ALGID.SKIPJACK = 29
CSSM_ALGID.LUCIFER = 30
CSSM_ALGID.MADRYGA = 31
CSSM_ALGID.FEAL = 32
CSSM_ALGID.REDOC = 33
CSSM_ALGID.REDOC3 = 34
CSSM_ALGID.LOKI = 35
CSSM_ALGID.KHUFU = 36
CSSM_ALGID.KHAFRE = 37
CSSM_ALGID.MMB = 38
CSSM_ALGID.GOST = 39
CSSM_ALGID.SAFER = 40
CSSM_ALGID.CRAB = 41
CSSM_ALGID.RSA = 42
CSSM_ALGID.DSA = 43
CSSM_ALGID.MD5WithRSA = 44
CSSM_ALGID.MD2WithRSA = 45
CSSM_ALGID.ElGamal = 46
CSSM_ALGID.MD2Random = 47
CSSM_ALGID.MD5Random = 48
CSSM_ALGID.SHARandom = 49
CSSM_ALGID.DESRandom = 50
CSSM_ALGID.SHA1WithRSA = 51
CSSM_ALGID.CDMF = 52
CSSM_ALGID.CAST3 = 53
CSSM_ALGID.CAST5 = 54
CSSM_ALGID.GenericSecret = 55
CSSM_ALGID.ConcatBaseAndKey = 56
CSSM_ALGID.ConcatKeyAndBase = 57
CSSM_ALGID.ConcatBaseAndData = 58
CSSM_ALGID.ConcatDataAndBase = 59
CSSM_ALGID.XORBaseAndData = 60
CSSM_ALGID.ExtractFromKey = 61
CSSM_ALGID.SSL3PreMasterGen = 62
CSSM_ALGID.SSL3MasterDerive = 63
CSSM_ALGID.SSL3KeyAndMacDerive = 64
CSSM_ALGID.SSL3MD5_MAC = 65
CSSM_ALGID.SSL3SHA1_MAC = 66
CSSM_ALGID.PKCS5_PBKDF1_MD5 = 67
CSSM_ALGID.PKCS5_PBKDF1_MD2 = 68
CSSM_ALGID.PKCS5_PBKDF1_SHA1 = 69
CSSM_ALGID.WrapLynks = 70
CSSM_ALGID.WrapSET_OAEP = 71
CSSM_ALGID.BATON = 72
CSSM_ALGID.ECDSA = 73
CSSM_ALGID.MAYFLY = 74
CSSM_ALGID.JUNIPER = 75
CSSM_ALGID.FASTHASH = 76
CSSM_ALGID.THREEDES = 77
CSSM_ALGID.SSL3MD5 = 78
CSSM_ALGID.SSL3SHA1 = 79
CSSM_ALGID.FortezzaTimestamp = 80
CSSM_ALGID.SHA1WithDSA = 81
CSSM_ALGID.SHA1WithECDSA = 82
CSSM_ALGID.DSA_BSAFE = 83
CSSM_ALGID.ECDH = 84
CSSM_ALGID.ECMQV = 85
CSSM_ALGID.PKCS12_SHA1_PBE = 86
CSSM_ALGID.ECNRA = 87
CSSM_ALGID.SHA1WithECNRA = 88
CSSM_ALGID.ECES = 89
CSSM_ALGID.ECAES = 90
CSSM_ALGID.SHA1HMAC = 91
CSSM_ALGID.FIPS186Random = 92
CSSM_ALGID.ECC = 93
CSSM_ALGID.MQV = 94
CSSM_ALGID.NRA = 95
CSSM_ALGID.IntelPlatformRandom = 96
CSSM_ALGID.UTC = 97
CSSM_ALGID.HAVAL3 = 98
CSSM_ALGID.HAVAL4 = 99
CSSM_ALGID.HAVAL5 = 100
CSSM_ALGID.TIGER = 101
CSSM_ALGID.MD5HMAC = 102
CSSM_ALGID.PKCS5_PBKDF2 = 103
CSSM_ALGID.RUNNING_COUNTER = 104
class TABLE_HEADER(vstruct.VStruct):
    '''
    table header: seven big-endian uint32 fields followed by one uint32
    record offset per row slot.
    '''

    def __init__(self):
        super().__init__()
        self.TableSize = v_uint32(bigend=True)
        self.TableId = v_uint32(bigend=True, enum=CSSM_DL_DB)
        # count of offset entries whose LSB is clear, i.e. valid offsets
        # (compare with `TotalRowCount`)
        self.AllocatedRowCount = v_uint32(bigend=True)
        self.Records = v_uint32(bigend=True)
        self.IndexesOffset = v_uint32(bigend=True)
        self.FreeListHead = v_uint32(bigend=True)
        # count of all offset entries, valid or not
        # (compare with `AllocatedRowCount`)
        self.TotalRowCount = v_uint32(bigend=True)
        # populated by `pcb_TotalRowCount` once the count is known
        self.RecordOffsets = vstruct.VArray()

    def pcb_TotalRowCount(self):
        '''
        parse callback for `TotalRowCount`: add one uint32 element to
        `RecordOffsets` per row slot.
        '''
        remaining = self.TotalRowCount
        while remaining > 0:
            self.RecordOffsets.vsAddElement(v_uint32(bigend=True))
            remaining -= 1
class Table:
    '''
    a single table of the keychain database: a parsed header plus lazy
    access to the records it points at.
    '''

    def __init__(self, db, buf):
        '''
        Args:
          db (Database): the database that owns this table.
          buf (bytes): the data to parse for this table.
        '''
        self.db = db
        self.buf = buf
        self.header = TABLE_HEADER()
        self.header.vsParse(buf)

    def get_records(self):
        '''
        yield a `Record` for every allocated, non-empty row of this table.

        record offsets are relative to the start of the table buffer.
        records that fail to parse with a ValueError are logged and skipped.
        '''
        table_id = int(self.header.TableId)
        logger.debug('get_records for %s, %d rows total, %d rows allocated',
                     CSSM_DL_DB.vsReverseMapping(table_id),
                     self.header.TotalRowCount,
                     self.header.AllocatedRowCount)

        schema = self.db.get_table_schema(table_id)

        for i in range(self.header.TotalRowCount):
            record_offset = int(self.header.RecordOffsets[i])

            if record_offset & 0b1:
                # if LSB is set, then record is invalid/unallocated
                continue

            if record_offset == 0x0:
                continue

            # the record starts with its own big-endian uint32 size.
            # unpack_from reads in place from bytes and memoryview alike,
            # unlike calling .tobytes() on a slice, which only a
            # memoryview supports.
            (record_length,) = struct.unpack_from('>I', self.buf, record_offset)
            if record_length == 0x0:
                continue

            # always hand the record parser a plain bytes object, whatever
            # the type of the backing buffer.
            record_buf = bytes(self.buf[record_offset:record_offset + record_length])

            try:
                record = Record(schema, record_buf)
            except ValueError as e:
                logger.warning('failed to parse record (table: %s, record: %s): %s', self.header.TableId, i, e)
                continue

            yield record
class APPL_DB_SCHEMA(vstruct.VStruct):
    '''
    schema section header: size, table count, then one big-endian uint32
    table offset per table.
    '''

    def __init__(self):
        super().__init__()
        self.SchemaSize = v_uint32(bigend=True)
        self.TableCount = v_uint32(bigend=True)
        # populated by `pcb_TableCount` once the count is known
        self.TableOffsets = vstruct.VArray()

    def pcb_TableCount(self):
        '''
        parse callback for `TableCount`: add one uint32 element to
        `TableOffsets` per table.
        '''
        remaining = self.TableCount
        while remaining > 0:
            self.TableOffsets.vsAddElement(v_uint32(bigend=True))
            remaining -= 1
# via: http://mirror.informatimago.com/next/developer.apple.com/documentation/Security/Reference/keychainservices/Reference/reference.html
#
# item attribute tags. each value is the four-character code read as a
# big-endian uint32. `Database.get_table_schema` uses the reverse mapping to
# name schema attributes that have no 'AttributeName' of their own.
SecItemAttr = v_enum()
SecItemAttr.CreationDate = struct.unpack('>I', struct.pack('>4s', b'cdat'))[0]
SecItemAttr.ModDate = struct.unpack('>I', struct.pack('>4s', b'mdat'))[0]
SecItemAttr.Description = struct.unpack('>I', struct.pack('>4s', b'desc'))[0]
SecItemAttr.Comment = struct.unpack('>I', struct.pack('>4s', b'icmt'))[0]
SecItemAttr.Creator = struct.unpack('>I', struct.pack('>4s', b'crtr'))[0]
SecItemAttr.Type = struct.unpack('>I', struct.pack('>4s', b'type'))[0]
SecItemAttr.ScriptCode = struct.unpack('>I', struct.pack('>4s', b'scrp'))[0]
SecItemAttr.Label = struct.unpack('>I', struct.pack('>4s', b'labl'))[0]
SecItemAttr.Invisible = struct.unpack('>I', struct.pack('>4s', b'invi'))[0]
SecItemAttr.Negative = struct.unpack('>I', struct.pack('>4s', b'nega'))[0]
SecItemAttr.CustomIcon = struct.unpack('>I', struct.pack('>4s', b'cusi'))[0]
SecItemAttr.Account = struct.unpack('>I', struct.pack('>4s', b'acct'))[0]
SecItemAttr.Service = struct.unpack('>I', struct.pack('>4s', b'svce'))[0]
SecItemAttr.Generic = struct.unpack('>I', struct.pack('>4s', b'gena'))[0]
SecItemAttr.SecurityDomain = struct.unpack('>I', struct.pack('>4s', b'sdmn'))[0]
SecItemAttr.Server = struct.unpack('>I', struct.pack('>4s', b'srvr'))[0]
SecItemAttr.AuthenticationType = struct.unpack('>I', struct.pack('>4s', b'atyp'))[0]
SecItemAttr.Port = struct.unpack('>I', struct.pack('>4s', b'port'))[0]
SecItemAttr.Path = struct.unpack('>I', struct.pack('>4s', b'path'))[0]
SecItemAttr.Volume = struct.unpack('>I', struct.pack('>4s', b'vlme'))[0]
SecItemAttr.Address = struct.unpack('>I', struct.pack('>4s', b'addr'))[0]
SecItemAttr.Signature = struct.unpack('>I', struct.pack('>4s', b'ssig'))[0]
SecItemAttr.Protocol = struct.unpack('>I', struct.pack('>4s', b'ptcl'))[0]
SecItemAttr.CertificateType = struct.unpack('>I', struct.pack('>4s', b'ctyp'))[0]
SecItemAttr.CertificateEncoding = struct.unpack('>I', struct.pack('>4s', b'cenc'))[0]
SecItemAttr.CrlType = struct.unpack('>I', struct.pack('>4s', b'crtp'))[0]
SecItemAttr.CrlEncoding = struct.unpack('>I', struct.pack('>4s', b'crnc'))[0]
SecItemAttr.Alias = struct.unpack('>I', struct.pack('>4s', b'alis'))[0]
# certificate types; `render_cell` uses this enum to name 'CertType' attribute values.
CSSM_CERT = v_enum()
CSSM_CERT.UNKNOWN = 0x00
CSSM_CERT.X_509v1 = 0x01
CSSM_CERT.X_509v2 = 0x02
CSSM_CERT.X_509v3 = 0x03
CSSM_CERT.PGP = 0x04
CSSM_CERT.SPKI = 0x05
CSSM_CERT.SDSIv1 = 0x06
CSSM_CERT.Intel = 0x08
CSSM_CERT.X_509_ATTRIBUTE = 0x09
CSSM_CERT.X9_ATTRIBUTE = 0x0A
CSSM_CERT.ACL_ENTRY = 0x0C
CSSM_CERT.MULTIPLE = 0x7FFE
CSSM_CERT.LAST = 0x7FFF
CSSM_CERT.CUSTOM = 0x8000
# certificate encodings; `render_cell` uses this enum to name 'CertEncoding'
# attribute values.
CSSM_CERT_ENCODING = v_enum()
CSSM_CERT_ENCODING.UNKNOWN = 0x00
CSSM_CERT_ENCODING.CUSTOM = 0x01
CSSM_CERT_ENCODING.BER = 0x02
CSSM_CERT_ENCODING.DER = 0x03
CSSM_CERT_ENCODING.NDR = 0x04
CSSM_CERT_ENCODING.SEXPR = 0x05
CSSM_CERT_ENCODING.PGP = 0x06
CSSM_CERT_ENCODING.MULTIPLE = 0x7FFE
CSSM_CERT_ENCODING.LAST = 0x7FFF
# authentication types. unlike the other four-character-code enums in this
# file, the values are kept as 4-byte strings rather than integers:
# `render_cell` looks them up with the attribute's `.data`.
SecAuthenticationType = v_enum()
SecAuthenticationType.NTLM = b'ntlm'
SecAuthenticationType.MSN = b'msna'
SecAuthenticationType.DPA = b'dpaa'
SecAuthenticationType.RPA = b'rpaa'
SecAuthenticationType.HTTPBasic = b'http'
SecAuthenticationType.HTTPDigest = b'httd'
SecAuthenticationType.HTMLForm = b'form'
SecAuthenticationType.Default = b'dflt'
SecAuthenticationType.Any = b'\x00\x00\x00\x00'
# protocol types. each value is the four-character code read as a big-endian
# uint32; `render_cell` uses this enum to name 'Protocol' attribute values.
SecProtocolType = v_enum()
SecProtocolType.FTP = struct.unpack('>I', struct.pack('>4s', b'ftp '))[0]
SecProtocolType.FTPAccount = struct.unpack('>I', struct.pack('>4s', b'ftpa'))[0]
SecProtocolType.HTTP = struct.unpack('>I', struct.pack('>4s', b'http'))[0]
SecProtocolType.IRC = struct.unpack('>I', struct.pack('>4s', b'irc '))[0]
SecProtocolType.NNTP = struct.unpack('>I', struct.pack('>4s', b'nntp'))[0]
SecProtocolType.POP3 = struct.unpack('>I', struct.pack('>4s', b'pop3'))[0]
SecProtocolType.SMTP = struct.unpack('>I', struct.pack('>4s', b'smtp'))[0]
SecProtocolType.SOCKS = struct.unpack('>I', struct.pack('>4s', b'sox '))[0]
SecProtocolType.IMAP = struct.unpack('>I', struct.pack('>4s', b'imap'))[0]
SecProtocolType.LDAP = struct.unpack('>I', struct.pack('>4s', b'ldap'))[0]
SecProtocolType.AppleTalk = struct.unpack('>I', struct.pack('>4s', b'atlk'))[0]
SecProtocolType.AFP = struct.unpack('>I', struct.pack('>4s', b'afp '))[0]
SecProtocolType.Telnet = struct.unpack('>I', struct.pack('>4s', b'teln'))[0]
SecProtocolType.SSH = struct.unpack('>I', struct.pack('>4s', b'ssh '))[0]
SecProtocolType.FTPS = struct.unpack('>I', struct.pack('>4s', b'ftps'))[0]
SecProtocolType.HTTPS = struct.unpack('>I', struct.pack('>4s', b'htps'))[0]
SecProtocolType.HTTPProxy = struct.unpack('>I', struct.pack('>4s', b'htpx'))[0]
SecProtocolType.HTTPSProxy = struct.unpack('>I', struct.pack('>4s', b'htsx'))[0]
SecProtocolType.FTPProxy = struct.unpack('>I', struct.pack('>4s', b'ftpx'))[0]
SecProtocolType.CIFS = struct.unpack('>I', struct.pack('>4s', b'cifs'))[0]
SecProtocolType.SMB = struct.unpack('>I', struct.pack('>4s', b'smb '))[0]
SecProtocolType.RTSP = struct.unpack('>I', struct.pack('>4s', b'rtsp'))[0]
SecProtocolType.RTSPProxy = struct.unpack('>I', struct.pack('>4s', b'rtsx'))[0]
SecProtocolType.DAAP = struct.unpack('>I', struct.pack('>4s', b'daap'))[0]
SecProtocolType.EPPC = struct.unpack('>I', struct.pack('>4s', b'eppc'))[0]
SecProtocolType.IPP = struct.unpack('>I', struct.pack('>4s', b'ipp '))[0]
SecProtocolType.NNTPS = struct.unpack('>I', struct.pack('>4s', b'ntps'))[0]
SecProtocolType.LDAPS = struct.unpack('>I', struct.pack('>4s', b'ldps'))[0]
SecProtocolType.TelnetS = struct.unpack('>I', struct.pack('>4s', b'tels'))[0]
SecProtocolType.IMAPS = struct.unpack('>I', struct.pack('>4s', b'imps'))[0]
SecProtocolType.IRCS = struct.unpack('>I', struct.pack('>4s', b'ircs'))[0]
SecProtocolType.POP3S = struct.unpack('>I', struct.pack('>4s', b'pops'))[0]
SecProtocolType.CVSpserver = struct.unpack('>I', struct.pack('>4s', b'cvsp'))[0]
# 'svn ' gets its own name: assigning it to CVSpserver a second time would
# overwrite the 'cvsp' value above and label Subversion entries as CVS.
SecProtocolType.SVN = struct.unpack('>I', struct.pack('>4s', b'svn '))[0]
SecProtocolType.AdiumMessenger = struct.unpack('>I', struct.pack('>4s', b'AdIM'))[0]
SecProtocolType.Any = struct.unpack('>I', struct.pack('>4s', b'\x00\x00\x00\x00'))[0]
class Database:
    '''
    the schema section of a keychain file: a set of tables, addressed by
    table id, whose layout is described by the schema tables themselves.
    '''

    def __init__(self, buf):
        '''
        Args:
          buf: the keychain data starting at the schema section; table
            offsets are relative to the start of this buffer.

        Raises:
          ValueError: if two tables share the same table id.
        '''
        self.buf = buf
        self.schema = APPL_DB_SCHEMA()
        self.schema.vsParse(buf)

        # table id -> schema dict, filled on demand by `get_table_schema`.
        self._schema_cache = {}

        tables = [
            self._get_table_by_index(i)
            for i in range(self.schema.TableCount)
        ]

        self.tables = {}
        for table in tables:
            table_id = table.header.TableId
            if table_id in self.tables:
                raise ValueError("duplicate tables with id: " + hex(table_id))
            self.tables[table_id] = table

    def _get_table_by_index(self, index):
        '''parse the table whose offset is stored at `index` in the schema.'''
        table_offset = self.schema.TableOffsets[index]
        table_buf = self.buf[table_offset:]
        return Table(self, table_buf)

    def select(self, table, record_index=None, limit=sys.maxsize):
        '''
        yield up to `limit` records of the table with id `table`.

        `record_index` is accepted but not used.

        Raises:
          KeyError: if there is no table with that id.
        '''
        table = self.tables[table]
        for r in itertools.islice(table.get_records(), min(limit, table.header.TotalRowCount)):
            yield r

    @staticmethod
    def _bootstrap_schema(relation_id, relation_name, columns):
        '''
        build a hardcoded schema dict from (name, format) column pairs;
        attribute ids are the positions of the columns.
        '''
        return {
            'RelationID': relation_id,
            'RelationName': relation_name,
            'attrs': [
                {
                    'AttributeFormat': attr_format,
                    'AttributeID': attr_id,
                    'AttributeName': attr_name,
                    'AttributeNameFormat': CSSM_DB_ATTRIBUTE_FORMAT.STRING,
                    'RelationID': relation_id,
                }
                for attr_id, (attr_name, attr_format) in enumerate(columns)
            ],
        }

    def _load_table_schema(self, table):
        '''
        compute the schema of `table` without consulting the cache.

        Raises:
          KeyError: if the schema info table has no entry for `table`.
        '''
        # the schema is self-describing, so you can inspect the schema itself.
        # we provide hardcoded definitions of these tables to bootstrap the schema.
        if table == CSSM_DL_DB.SCHEMA_INFO:
            return self._bootstrap_schema(
                CSSM_DL_DB.SCHEMA_INFO,
                'CSSM_DL_DB_SCHEMA_INFO',
                [
                    ('RelationID', CSSM_DB_ATTRIBUTE_FORMAT.UINT32),
                    ('RelationName', CSSM_DB_ATTRIBUTE_FORMAT.STRING),
                ])

        elif table == CSSM_DL_DB.SCHEMA_ATTRIBUTES:
            return self._bootstrap_schema(
                CSSM_DL_DB.SCHEMA_ATTRIBUTES,
                'CSSM_DL_DB_SCHEMA_ATTRIBUTES',
                [
                    ('RelationID', CSSM_DB_ATTRIBUTE_FORMAT.UINT32),
                    ('AttributeID', CSSM_DB_ATTRIBUTE_FORMAT.UINT32),
                    ('AttributeNameFormat', CSSM_DB_ATTRIBUTE_FORMAT.UINT32),
                    ('AttributeName', CSSM_DB_ATTRIBUTE_FORMAT.STRING),
                    ('AttributeNameID', CSSM_DB_ATTRIBUTE_FORMAT.BLOB),
                    ('AttributeFormat', CSSM_DB_ATTRIBUTE_FORMAT.UINT32),
                ])

        for r in self.select(CSSM_DL_DB.SCHEMA_INFO):
            if int(r.attrs['RelationID']) != table:
                continue

            attrs = []
            for a in self.select(CSSM_DL_DB.SCHEMA_ATTRIBUTES):
                if int(a.attrs['RelationID']) != int(r.attrs['RelationID']):
                    continue

                if 'AttributeName' not in a.attrs:
                    # unnamed attribute: fall back to the well-known tag name
                    a.attrs['AttributeName'] = SecItemAttr.vsReverseMapping(int(a.attrs['AttributeID']), default='Unknown')
                else:
                    a.attrs['AttributeName'] = str(a.attrs['AttributeName'])

                attrs.append(a.attrs)

            ret = copy.copy(r.attrs)
            ret['attrs'] = attrs
            return ret

        raise KeyError('failed to find table')

    def get_table_schema(self, table):
        '''
        return the schema dict of the table with id `table`: the table's
        schema-info attributes plus an 'attrs' list describing each column.

        the result is cached per table id, because computing it means
        re-parsing every record of the schema tables; callers must treat the
        returned dict as read-only.

        Raises:
          KeyError: if the table is not described by the schema.
        '''
        logger.debug('fetching schema for 0x%x', table)

        cache_key = int(table)
        if cache_key not in self._schema_cache:
            self._schema_cache[cache_key] = self._load_table_schema(table)
        return self._schema_cache[cache_key]
class APPL_DB_HEADER(vstruct.VStruct):
    '''
    keychain file header: 'kych' signature, format version, header size
    and the offsets of the schema and auth sections. all integers are
    big-endian.
    '''

    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.Signature = v_bytes(size=4)
        # parsed big-endian like every other integer in the header, so the
        # supported format version reads as major 1.
        self.MajorVersion = v_uint16(bigend=True)
        self.MinorVersion = v_uint16(bigend=True)
        self.HeaderSize = v_uint32(bigend=True)
        self.SchemaOffset = v_uint32(bigend=True)
        self.AuthOffset = v_uint32(bigend=True)

    def pcb_Signature(self):
        '''
        parse callback for `Signature`.

        Raises:
          ValueError: if the signature is not b'kych'.
        '''
        if self.Signature != b'kych':
            raise ValueError('invalid header signature')

    def pcb_MajorVersion(self):
        '''
        parse callback for `MajorVersion`.

        parse callbacks are looked up by field name, so the check has to be
        attached to a field that exists; a callback named after a
        non-existent `Version` field would never run.

        Raises:
          ValueError: if the major version is not 1.
        '''
        if self.MajorVersion != 1:
            raise ValueError('unsupported version')
class Keychain:
    '''
    an unlocked keychain: parses the file, derives the master and database
    keys from the password, and unwraps every symmetric key up front.
    '''

    def __init__(self, buf, password):
        '''
        Args:
          buf: the contents of the keychain file.
          password: the keychain password.
        '''
        self.buf = buf
        self.password = password

        self.header = APPL_DB_HEADER()
        self.header.vsParse(buf)

        schema_start = self.header.SchemaOffset
        self.db = Database(buf[schema_start:])

        # the metadata blob yields both top-level keys
        unlocked = self.get_master_keys()
        self.master_key = unlocked['master_key']
        self.db_key = unlocked['db_key']

        # SSGP label -> unwrapped symmetric key
        by_label = {}
        for entry in self.get_symmetric_keys():
            label = parse_ssgp_label(entry['attrs']['Label'])
            by_label[label] = entry['plaintext']
        self.symmetric_keys = by_label

    def get_decrypted_rows(self, table):
        '''
        yield one dict per record of `table`: a copy of the record's
        attributes under 'attrs', merged with whatever the record blob's
        `decrypt` returns.
        '''
        for record in self.db.select(table):
            row = dict(attrs=copy.copy(record.attrs))
            row.update(record.blob.decrypt(self))
            yield row

    def get_master_keys(self):
        '''return the first decrypted row of the metadata table.'''
        # only the first metadata row is consulted.
        metadata_rows = self.get_decrypted_rows(CSSM_DL_DB.RECORD_METADATA)
        return next(metadata_rows)

    def get_symmetric_keys(self):
        '''yield the decrypted rows of the symmetric key table.'''
        yield from self.get_decrypted_rows(CSSM_DL_DB.RECORD_SYMMETRIC_KEY)

    def get_symmetric_key(self, keyid):
        '''
        return the unwrapped symmetric key stored under SSGP label `keyid`.

        Raises:
          KeyError: if no such key was found.
        '''
        return self.symmetric_keys[keyid]
# names of the attributes that `render_cell` renders as a boolean
# ('true'/'false') instead of a number.
# this list was collected empirically, not from any database metadata.
BOOL_ATTRIBUTES = {
'Permanent',
'Private',
'Modifiable',
'Sensitive',
'AlwaysSensitive',
'Extractable',
'NeverExtractable',
'Encrypt',
'Decrypt',
'Derive',
'Sign',
'Verify',
'SignRecover',
'VerifyRecover',
'Wrap',
'Unwrap',
'Invisible',
}
def render_cell(attr_name, attr_value):
    '''
    format one attribute value for the report.

    Args:
      attr_name (str): the attribute's name; selects the rendering.
      attr_value: the parsed attribute, or '' when the record lacks it.

    Returns:
      '' for a missing value, 'true'/'false' for boolean attributes, the
      enum name for the known enumerated attributes (as returned by
      `vsReverseMapping`), a hex string for other numbers, and `str()` of
      the value for everything else.
    '''
    if attr_value == '':
        return ''

    if attr_name in BOOL_ATTRIBUTES:
        return 'true' if bool(int(attr_value)) else 'false'

    # attributes whose integer value is looked up in an enum
    integer_enums = {
        'KeyClass': CSSM_KEYCLASS,
        'KeyType': CSSM_ALGID,
        'CertType': CSSM_CERT,
        'CertEncoding': CSSM_CERT_ENCODING,
        'Protocol': SecProtocolType,
    }
    enum = integer_enums.get(attr_name)
    if enum is not None:
        return enum.vsReverseMapping(int(attr_value))

    if attr_name == 'AuthenticationType':
        # this enum is keyed by the raw bytes, not by an integer
        return SecAuthenticationType.vsReverseMapping(attr_value.data)

    # ports read better in decimal, so they skip the hex rendering
    if attr_name != 'Port' and isinstance(attr_value, v_number):
        return hex(attr_value).rstrip('L')

    return str(attr_value)
def render_plaintext(outdir, plaintext):
    '''
    format a decrypted value for the report.

    printable values shorter than 64 bytes are returned as text. anything
    else is written to `<outdir>/binary/<md5 of the value>` and a
    `file://binary/<md5>` reference is returned instead.
    '''
    fits_inline = is_printable(plaintext) and len(plaintext) < 64
    if fits_inline:
        return plaintext.decode('ascii')

    digest = hashlib.md5(plaintext).hexdigest()
    outpath = os.path.join(outdir, 'binary', digest)
    logger.debug('writing binary blob to file %s', outpath)
    with open(outpath, 'wb') as blob_file:
        blob_file.write(plaintext)

    return 'file://' + os.path.join('binary', digest)
def render_table(keychain, table, outdir):
    '''
    render every record of one table as a text table.

    Args:
      keychain (Keychain): the unlocked keychain.
      table: id of the table to render.
      outdir (str): report directory; binary plaintexts are written below it.

    Returns:
      str: one row per record and one column per schema attribute, plus a
      trailing 'plaintext' column when at least one record has a plaintext.
    '''
    table_name = CSSM_DL_DB.vsReverseMapping(int(table))
    logger.debug('rendering table %s', table_name)

    schema = keychain.db.get_table_schema(table)
    column_names = [attr['AttributeName'] for attr in schema['attrs']]

    # decrypt once and reuse the result: both the plaintext-column check
    # and the rendering loop need the rows, and decryption is the
    # expensive part.
    decrypted_rows = list(keychain.get_decrypted_rows(table))
    has_plaintext = any('plaintext' in row for row in decrypted_rows)

    rows = []
    for i, row in enumerate(decrypted_rows):
        logger.debug('table %s row %d', table_name, i)
        cells = [render_cell(name, row['attrs'].get(name, ''))
                 for name in column_names]
        if has_plaintext:
            cells.append(render_plaintext(outdir, row.get('plaintext', b'')))
        rows.append(cells)

    headers = list(column_names)
    if has_plaintext:
        headers.append('plaintext')

    return tabulate.tabulate(
        rows,
        headers=headers,
    )
def write_keychain_report(keychain, outdir):
    '''
    write `<outdir>/report.txt` with one rendered section per table, and
    create `<outdir>/binary` for plaintexts that do not fit in the report.

    a table that cannot be rendered is logged as a warning and skipped; its
    section header may already have been written at that point.

    Args:
      keychain (Keychain): the unlocked keychain.
      outdir (str): existing directory to write the report into.
    '''
    binary_dir = os.path.join(outdir, 'binary')
    logger.info('writing binary blobs into directory %s', binary_dir)
    # exist_ok avoids the race between checking for and creating the directory
    os.makedirs(binary_dir, exist_ok=True)

    report_path = os.path.join(outdir, 'report.txt')
    logger.info('writing report into file %s', report_path)
    with open(report_path, 'wb') as f:
        for tableid in sorted(keychain.db.tables.keys()):
            table_name = CSSM_DL_DB.vsReverseMapping(int(tableid))
            try:
                f.write(('%s TABLE %s %s\n' % ('#' * 20, table_name, '#' * 20)).encode('utf-8'))
                f.write(render_table(keychain, tableid, outdir).encode('utf-8'))
                f.write('\n'.encode('utf-8'))
            except TypeError:
                # calling a NotImplemented parser placeholder raises TypeError
                logger.warning('table not supported: %s (submit to Willi for testing)', table_name)
            except Exception as e:
                # best effort: one broken table must not prevent the
                # remaining tables from being reported.
                logger.warning('failed to render table %s: %s', table_name, e)
    return None
def main(argv=None):
    '''
    command-line entry point: parse arguments, unlock the keychain and
    write the report.

    Args:
      argv (list of str): arguments to parse; defaults to `sys.argv[1:]`.

    Returns:
      int: 0 on completion.
    '''
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="extract keys from macOS keychains")
    parser.add_argument("keychain", type=str,
                        help="Path to input keychain file")
    parser.add_argument("password", type=str,
                        help="Keychain password")
    parser.add_argument("output_directory", type=str,
                        help="Path into which to write binary data")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable debug logging")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disable all output but errors")
    args = parser.parse_args(args=argv)

    # --verbose wins over --quiet when both are given
    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.ERROR
    else:
        level = logging.INFO
    logging.basicConfig(level=level)
    logging.getLogger().setLevel(level)

    outdir = args.output_directory
    if not os.path.exists(outdir):
        os.makedirs(outdir)

    with open(args.keychain, 'rb') as keychain_file:
        contents = memoryview(keychain_file.read())

    unlocked = Keychain(contents, args.password)
    write_keychain_report(unlocked, outdir)

    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment