Skip to content

Instantly share code, notes, and snippets.

@mildsunrise
Last active February 18, 2024 07:52
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save mildsunrise/585dba677395f1f0a9413c5cbc1c8f2e to your computer and use it in GitHub Desktop.
Save mildsunrise/585dba677395f1f0a9413c5cbc1c8f2e to your computer and use it in GitHub Desktop.
Logic to work with Android KeyMaster blobs and Vold
#!/usr/bin/env python3
'''
keymaster blob logic.
Offers:
- low-level blob encoding and decoding
- loading softkeymaster blobs
- performing cryptographic operations (emulating KeyMaster) on a loaded blob
- CLI tool for parsing softkeymaster blobs and performing operations with them
Still very limited on functionality, primarily for parsing.
(for forensics; not cryptographically secure)
based on:
- system/keymaster @ 44ed723c4b4e
- hardware/libhardware @ aed1b5671f6b
- include/hardware/keymaster_defs.h
- include/hardware/hw_auth_token.h
'''
from collections import defaultdict
from typing import Any, NamedTuple, Optional, BinaryIO, Callable, Tuple, Type, TypeVar, Union, get_args
from binascii import hexlify, unhexlify
from io import BytesIO
from struct import pack, unpack
from enum import Enum, unique
from dataclasses import dataclass
import cryptography.hazmat.primitives.ciphers.algorithms
import cryptography.hazmat.primitives.ciphers.modes
from cryptography.hazmat.primitives import hashes, hmac, ciphers, padding
from datetime import datetime, timezone
import enums
import sys
def trim_docstring(text: str):
if not text: return ''
lines = text.expandtabs().splitlines()
first, *lines = ( line.rstrip() for line in lines )
# dedent all lines except the first
calc_indent = lambda line: len(line) - len(line.lstrip())
indent = min((calc_indent(line) for line in lines if line), default=0)
lines = [ line[indent:] for line in lines ]
# remove blank starting / ending lines
trimmed = [ first.lstrip() ] + lines
while trimmed and not trimmed[-1]: trimmed.pop()
while trimmed and not trimmed[0]: trimmed.pop(0)
return '\n'.join(trimmed)
# BASIC INFRASTRUCTURE
# --------------------
class DecodeError(Exception):
pass
T = TypeVar('T')
def decode(x: bytes, fn: Callable[[BinaryIO], T]) -> T:
st = BytesIO(x)
result = fn(st)
if excess := st.read():
raise DecodeError(f'{len(excess)} excess bytes at end')
return result
def encode(fn: Callable[[BinaryIO], None]) -> bytes:
st = BytesIO()
fn(st)
return st.getvalue()
def read(st: BinaryIO, n: int):
x = bytes()
while n > 0:
chunk = st.read(n)
assert chunk != None
if not chunk:
raise DecodeError('unexpected EOF')
x += chunk; n -= len(chunk)
return x
def read_uint32(st: BinaryIO) -> int:
return unpack('<L', read(st, 4))[0]
def write_uint32(x: int, st: BinaryIO):
st.write(pack('<L', x))
def read_uint64(st: BinaryIO) -> int:
return unpack('<Q', read(st, 8))[0]
def write_uint64(x: int, st: BinaryIO):
st.write(pack('<Q', x))
def read_buffer(st: BinaryIO) -> bytes:
return read(st, read_uint32(st))
def write_buffer(x: bytes, st: BinaryIO):
write_uint32(len(x), st)
st.write(x)
# COMMON DATA STRUCTURES (defined at the interface side)
# ----------------------
@unique
class TagType(Enum):
repeating_base: Optional['TagType']
def __init__(self, _):
self.repeating_base = \
self.__class__[self.name[:-4]] if self.name.endswith('_REP') else None
ENUM = 1
ENUM_REP = 2 # Repeatable enumeration value
UINT = 3
UINT_REP = 4 # Repeatable integer value
ULONG = 5
DATE = 6
BOOL = 7
BIGNUM = 8
BYTES = 9
ULONG_REP = 10 # Repeatable long value
TypeInfo = Tuple[TagType, Optional[Type[Enum]]]
class TagEnforceability(Enum):
# Tags that must be semantically enforced by hardware and software implementations.
ENFORCED = 1
# Tags that should be semantically enforced by hardware if possible and will otherwise be
# enforced by software (keystore).
HARDWARE_ENFORCED = 2
# Semantically unenforceable tags, either because they have no specific meaning or because
# they're informational only.
NONE = 3
@unique
class Tag(Enum):
def __new__(cls, value, *kargs):
obj = object.__new__(cls)
obj._value_ = value
return obj
type: TypeInfo
def __init__(self, _, type: Union[TagType, TypeInfo], docs=None):
self.type = type if isinstance(type, tuple) else (type, None)
self.tag = self.value | (self.type[0].value << 28)
self.enforceability = \
TagEnforceability.ENFORCED if self.value < 400 else \
TagEnforceability.HARDWARE_ENFORCED if self.value < 700 else \
TagEnforceability.NONE
self.__doc__ = trim_docstring(docs)
@classmethod
def read(cls, st: BinaryIO):
tag = read_uint32(st)
try:
self = cls(tag & ~(~0 << 28))
except ValueError as exc:
raise DecodeError() from exc
if self.tag != tag:
raise DecodeError('invalid tag type')
return self
def write(self, st: BinaryIO):
st.write(self.tag)
# Tags that must be semantically enforced by hardware and software implementations.
## Crypto parameters
PURPOSE = 1, (TagType.ENUM_REP, enums.Purpose)
ALGORITHM = 2, (TagType.ENUM, enums.Algorithm)
KEY_SIZE = 3, TagType.UINT, '''Key size in bits.'''
BLOCK_MODE = 4, (TagType.ENUM_REP, enums.BlockMode)
DIGEST = 5, (TagType.ENUM_REP, enums.Digest)
PADDING = 6, (TagType.ENUM_REP, enums.Padding)
CALLER_NONCE = 7, TagType.BOOL, '''Allow caller to specify nonce or IV.'''
MIN_MAC_LENGTH = 8, TagType.UINT, '''Minimum length of MAC or AEAD authentication tag in bits.'''
KDF = 9, (TagType.ENUM_REP, enums.Kdf), '''(keymaster2)'''
EC_CURVE = 10, (TagType.ENUM, enums.EcCurve), '''(keymaster2)'''
## Algorithm-specific.
RSA_PUBLIC_EXPONENT = 200, TagType.ULONG
ECIES_SINGLE_HASH_MODE = 201, TagType.BOOL, '''Whether the ephemeral public key is fed into the KDF'''
INCLUDE_UNIQUE_ID = 202, TagType.BOOL, '''If true, attestation certificates for this key will contain an application-scoped and time-bounded device-unique ID. (keymaster2)'''
RSA_OAEP_MGF_DIGEST = 203, (TagType.ENUM_REP, enums.Digest)
# Other hardware-enforced.
BLOB_USAGE_REQUIREMENTS = 301, (TagType.ENUM, enums.KeyBlobUsageRequirements)
BOOTLOADER_ONLY = 302, TagType.BOOL, '''Usable only by bootloader'''
ROLLBACK_RESISTANCE = 303, TagType.BOOL, '''Hardware enforced deletion with deleteKey or deleteAllKeys is supported'''
EARLY_BOOT_ONLY = 305, TagType.BOOL, '''Key can only be used during early boot.'''
# Tags that should be semantically enforced by hardware if possible and will otherwise be
# enforced by software (keystore).
## Key validity period
ACTIVE_DATETIME = 400, TagType.DATE, '''Start of validity'''
ORIGINATION_EXPIRE_DATETIME = 401, TagType.DATE, '''Date when new "messages" should no longer be created.'''
USAGE_EXPIRE_DATETIME = 402, TagType.DATE, '''Date when existing "messages" should no longer be trusted.'''
MIN_SECONDS_BETWEEN_OPS = 403, TagType.UINT, '''Minimum elapsed time between cryptographic operations with the key.'''
MAX_USES_PER_BOOT = 404, TagType.UINT, '''Number of times the key can be used per boot.'''
USAGE_COUNT_LIMIT = 405, TagType.UINT, '''Number of cryptographic operations left with the key'''
## User authentication
ALL_USERS = 500, TagType.BOOL, '''Reserved for future use -- ignore'''
USER_ID = 501, TagType.UINT, '''Reserved for future use -- ignore'''
USER_SECURE_ID = 502, TagType.ULONG_REP, \
'''Secure ID of authorized user or authenticator(s).
Disallowed if ALL_USERS or NO_AUTH_REQUIRED is present.'''
NO_AUTH_REQUIRED = 503, TagType.BOOL, '''If key is usable without authentication.'''
USER_AUTH_TYPE = 504, (TagType.ENUM, enums.AuthenticatorType), \
'''Bitmask of authenticator types allowed when USER_SECURE_ID contains
a secure user ID, rather than a secure authenticator ID.'''
AUTH_TIMEOUT = 505, TagType.UINT, \
'''Required freshness of user authentication for private/secret key
operations, in seconds. Public key operations require no authentication.
If absent, authentication is required for every use.
Authentication state is lost when the device is powered off.'''
ALLOW_WHILE_ON_BODY = 506, TagType.BOOL, \
'''Allow key to be used after authentication timeout
if device is still on-body (requires secure on-body sensor).'''
TRUSTED_USER_PRESENCE_REQUIRED = 507, TagType.BOOL, '''Require test of user presence to use this key.'''
TRUSTED_CONFIRMATION_REQUIRED = 508, TagType.BOOL, '''Require user confirmation through a trusted UI to use this key.'''
UNLOCKED_DEVICE_REQUIRED = 509, TagType.BOOL, '''Require the device screen to be unlocked if the key is used.'''
## Application access control
ALL_APPLICATIONS = 600, TagType.BOOL, '''Specified to indicate key is usable by all applications.'''
APPLICATION_ID = 601, TagType.BYTES, '''Byte string identifying the authorized application.'''
EXPORTABLE = 602, TagType.BOOL, \
'''If true, private/secret key can be exported, but only
if all access control requirements for use are met. (keymaster2)'''
# Semantically unenforceable tags, either because they have no specific meaning or because
# they're informational only.
APPLICATION_DATA = 700, TagType.BYTES, '''Data provided by authorized application.'''
CREATION_DATETIME = 701, TagType.DATE, '''Key creation time'''
ORIGIN = 702, (TagType.ENUM, enums.KeyOrigin)
ROLLBACK_RESISTANT = 703, TagType.BOOL, '''Whether key is rollback-resistant.'''
ROOT_OF_TRUST = 704, TagType.BYTES, '''Root of trust ID.'''
OS_VERSION = 705, TagType.UINT, '''Version of system (keymaster2)'''
OS_PATCHLEVEL = 706, TagType.UINT, '''Patch level of system (keymaster2)'''
UNIQUE_ID = 707, TagType.BYTES, '''Used to provide unique ID in attestation'''
ATTESTATION_CHALLENGE = 708, TagType.BYTES, '''Used to provide challenge in attestation'''
ATTESTATION_APPLICATION_ID = 709, TagType.BYTES, \
'''Used to identify the set of possible applications of which
one has initiated a key attestation'''
ATTESTATION_ID_BRAND = 710, TagType.BYTES, '''Used to provide the device's brand name to be included in attestation'''
ATTESTATION_ID_DEVICE = 711, TagType.BYTES, '''Used to provide the device's device name to be included in attestation'''
ATTESTATION_ID_PRODUCT = 712, TagType.BYTES, '''Used to provide the device's product name to be included in attestation'''
ATTESTATION_ID_SERIAL = 713, TagType.BYTES, '''Used to provide the device's serial number to be included in attestation'''
ATTESTATION_ID_IMEI = 714, TagType.BYTES, '''Used to provide the device's IMEI to be included in attestation'''
ATTESTATION_ID_MEID = 715, TagType.BYTES, '''Used to provide the device's MEID to be included in attestation'''
ATTESTATION_ID_MANUFACTURER = 716, TagType.BYTES, '''Used to provide the device's manufacturer name to be included in attestation'''
ATTESTATION_ID_MODEL = 717, TagType.BYTES, '''Used to provide the device's model name to be included in attestation'''
VENDOR_PATCHLEVEL = 718, TagType.UINT, '''specifies the vendor image security patch level with which the key may be used'''
BOOT_PATCHLEVEL = 719, TagType.UINT, '''specifies the boot image (kernel) security patch level with which the key may be used'''
DEVICE_UNIQUE_ATTESTATION = 720, TagType.BOOL, '''Indicates StrongBox device-unique attestation is requested.'''
IDENTITY_CREDENTIAL_KEY = 721, TagType.BOOL, '''This is an identity credential key'''
STORAGE_KEY = 722, TagType.BOOL, '''storage encryption key'''
## Tags used only to provide data to or receive data from operations
ASSOCIATED_DATA = 1000, TagType.BYTES, '''Used to provide associated data for AEAD modes.'''
NONCE = 1001, TagType.BYTES, '''Nonce or Initialization Vector'''
AUTH_TOKEN = 1002, TagType.BYTES, \
'''Authentication token that proves secure user authentication has been performed.
Structure defined in hw_auth_token_t in hw_auth_token.h.'''
MAC_LENGTH = 1003, TagType.UINT, '''MAC or AEAD authentication tag length in bits.'''
RESET_SINCE_ID_ROTATION = 1004, TagType.BOOL, \
'''Whether the device has beeen factory reset since the last unique ID rotation.
Used for key attestation.'''
CONFIRMATION_TOKEN = 1005, TagType.BYTES, '''used to deliver a cryptographic token proving that the user confirmed a signing request.'''
CERTIFICATE_SERIAL = 1006, TagType.BIGNUM, '''The serial number that should be set in the attestation certificate to be generated.'''
CERTIFICATE_SUBJECT = 1007, TagType.BYTES, '''A DER-encoded X.500 subject that should be set in the attestation certificate to be generated.'''
CERTIFICATE_NOT_BEFORE = 1008, TagType.DATE, \
'''Epoch time in milliseconds of the start of the to be generated certificate's validity.
The value should interpreted as too's complement signed integer.
Negative values indicate dates before Jan 1970'''
CERTIFICATE_NOT_AFTER = 1009, TagType.DATE, \
'''Epoch time in milliseconds of the end of the to be generated certificate's validity.
The value should interpreted as too's complement signed integer.
Negative values indicate dates before Jan 1970'''
MAX_BOOT_LEVEL = 1010, TagType.UINT, '''Specifies a maximum boot level at which a key should function.'''
class Param(NamedTuple):
key: Tag
value: Any
def sort_key(self):
'''sort key, according to keymaster_param_compare'''
value = self.value
# for enums, compare their values
value = value.value if isinstance(value, Enum) else value
if isinstance(value, list):
value = [x.x if isinstance(x, Enum) else x for x in value]
# FIXME: handle boolean, buffer / bignum
# use the full tag number (including type)
return self.key.tag, value
def __ge__(self, other):
return self.sort_key() >= other.sort_key() if self.__class__ is other.__class__ else None
def __gt__(self, other):
return self.sort_key() > other.sort_key() if self.__class__ is other.__class__ else None
def __le__(self, other):
return self.sort_key() <= other.sort_key() if self.__class__ is other.__class__ else None
def __lt__(self, other):
return self.sort_key() < other.sort_key() if self.__class__ is other.__class__ else None
@staticmethod
def read_value(st: BinaryIO, indirect_st: BinaryIO, tinfo: TypeInfo) -> Any:
tag_type, enum = tinfo
tag_type = tag_type.repeating_base or tag_type
if tag_type == TagType.BOOL:
x = read(st, 1)[0]
if x > 1:
raise DecodeError(f'invalid boolean {x}')
return bool(x)
if tag_type == TagType.ENUM:
x = read_uint32(st)
if enum:
try:
x = enum(x)
except ValueError as exc:
raise DecodeError('error matching enum field value') from exc
return x
if tag_type == TagType.UINT:
return read_uint32(st)
if tag_type == TagType.ULONG:
return read_uint64(st)
if tag_type == TagType.DATE:
x = read_uint64(st) # FIXME: negative?
xp = x / 1000
#assert int(xp * 1000) == x FIXME
return datetime.fromtimestamp(xp, timezone.utc)
if tag_type == TagType.BYTES or tag_type == TagType.BIGNUM:
size = read_uint32(st)
offset = read_uint32(st)
if offset != indirect_st.tell():
raise DecodeError(f'tag points to ${offset} but we are at {indirect_st.tell()}')
x = read(indirect_st, size)
# FIXME: for BIGNUM, convert to int
return x
raise AssertionError('should not happen')
@staticmethod
def write_value(value: Any, st: BinaryIO, indirect_st: BinaryIO, tinfo: TypeInfo) -> Any:
tag_type, enum = tinfo
tag_type = tag_type.repeating_base or tag_type
if tag_type == TagType.BOOL:
assert isinstance(value, bool)
st.write(bytes([ int(value) ]))
return
if tag_type == TagType.ENUM:
assert isinstance(value, enum) if enum else isinstance(value, int)
write_uint32(value.value if enum else value, st)
return
if tag_type == TagType.UINT:
assert isinstance(value, int)
write_uint32(value, st)
return
if tag_type == TagType.ULONG:
assert isinstance(value, int)
write_uint64(value, st)
return
if tag_type == TagType.DATE:
assert isinstance(value, datetime) and datetime.tzinfo == timezone.utc
x = int(value.timestamp() * 1000)
#assert x / 1000 == value.timestamp() FIXME
write_uint64(x, st)
return
if tag_type == TagType.BYTES or tag_type == TagType.BIGNUM:
# FIXME: for BIGNUM, convert from int
assert isinstance(value, bytes)
write_uint32(len(value), st)
write_uint32(indirect_st.tell(), st)
return
raise AssertionError('should not happen')
@classmethod
def read(cls, st: BinaryIO, indirect_st: BinaryIO):
tag = Tag.read(st)
value = Param.read_value(st, indirect_st, tag.type)
return cls(tag, value)
def write(self, st: BinaryIO, indirect_st: BinaryIO):
self.tag.write(st)
Param.write_value(self.value, st, indirect_st, self.tag.type)
class AuthorizationSet(list[Param]):
@classmethod
def read(cls, st: BinaryIO):
indirect_data = read_buffer(st)
elements_count = read_uint32(st)
elements_data = read_buffer(st)
items = decode(indirect_data, lambda indirect_st:
decode(elements_data, lambda st:
[Param.read(st, indirect_st) for _ in range(elements_count)]))
return cls(items)
def write(self, st: BinaryIO):
indirect_data = BytesIO()
elements_data = BytesIO()
for item in self:
item.write(elements_data, indirect_data)
write_buffer(indirect_data.getvalue())
write_uint32(len(self))
write_buffer(elements_data.getvalue())
# KEYMASTER BLOBS
# ---------------
@dataclass
class AuthEncryptedBlob:
@unique
class Format(Enum):
AES_OCB = 0
AES_GCM_WITH_SW_ENFORCED = 1
AES_GCM_WITH_SECURE_DELETION = 2
@classmethod
def read(cls, st: BinaryIO):
try:
return cls(read(st, 1)[0])
except ValueError as exc:
raise DecodeError() from exc
def write(self, st: BinaryIO):
st.write(bytes([ self.value ]))
@dataclass
class EncryptedKey:
format: 'AuthEncryptedBlob.Format'
nonce: bytes
ciphertext: bytes
tag: bytes
def verify(self) -> bool:
return len(self.nonce) == 12 and len(self.tag) == 16
@classmethod
def read(cls, st: BinaryIO):
return cls(
format = AuthEncryptedBlob.Format.read(st),
nonce = read_buffer(st),
ciphertext = read_buffer(st),
tag = read_buffer(st),
)
def write(self, st: BinaryIO):
self.format.write(st)
write_buffer(self.nonce, st)
write_buffer(self.ciphertext, st)
write_buffer(self.tag, st)
# FIXME: decrypt_key(), encrypt_key()
key: EncryptedKey
hw_enforced: AuthorizationSet
sw_enforced: AuthorizationSet
# uint32, only present for AES_GCM_WITH_SECURE_DELETION
key_slot: Optional[int]
def verify(self) -> bool:
return self.key.verify() and \
(self.key_slot != None) == (self.key.format == AuthEncryptedBlob.EncryptedKey.Format.AES_GCM_WITH_SECURE_DELETION)
@classmethod
def read(cls, st: BinaryIO):
self = cls(
key = AuthEncryptedBlob.EncryptedKey.read(st),
hw_enforced = AuthorizationSet.read(st),
sw_enforced = AuthorizationSet.read(st),
)
if self.key.format == AuthEncryptedBlob.EncryptedKey.Format.AES_GCM_WITH_SECURE_DELETION:
self.key_slot = read_uint32(st)
return self
def write(self, st: BinaryIO):
self.key.write(st)
self.hw_enforced.write(st)
self.sw_enforced.write(st)
if self.key.format == AuthEncryptedBlob.EncryptedKey.Format.AES_GCM_WITH_SECURE_DELETION:
write_uint32(self.key_slot, st)
@dataclass
class IntegrityAssuredBlob:
@dataclass
class Inner:
BLOB_VERSION = 0
key: bytes
hw_enforced: AuthorizationSet
sw_enforced: AuthorizationSet
def verify(self) -> bool:
return True
@classmethod
def read(cls, st: BinaryIO):
version = read(st, 1)[0]
if version != IntegrityAssuredBlob.Inner.BLOB_VERSION:
raise DecodeError(f'invalid version {version}')
return cls(
key = read_buffer(st),
hw_enforced = AuthorizationSet.read(st),
sw_enforced = AuthorizationSet.read(st),
)
def write(self, st: BinaryIO):
st.write(bytes([ IntegrityAssuredBlob.Inner.BLOB_VERSION ]))
write_buffer(self.key, st)
self.hw_enforced.write(st)
self.sw_enforced.write(st)
HMAC_SIZE = 8
HMAC_KEY = b'IntegrityAssuredBlob0'
inner: Inner
hmac: Optional[bytes]
def verify(self) -> bool:
return self.hmac and len(self.hmac) == IntegrityAssuredBlob.HMAC_SIZE
def compute_hmac(self, hidden: AuthorizationSet) -> bytes:
h = hmac.HMAC(IntegrityAssuredBlob.HMAC_KEY, hashes.SHA256)
h.update(encode(self.inner.write))
h.update(encode(hidden.write))
return h.finalize()[:IntegrityAssuredBlob.HMAC_SIZE]
def verify_hmac(self, hidden: AuthorizationSet) -> bytes:
if self.hmac != (exp_hmac := self.compute_hmac(hidden)):
raise ValueError(f'invalid HMAC (expected {hexlify(exp_hmac)}, got {hexlify(self.hmac)})')
@classmethod
def read(cls, st: BinaryIO):
buf = st.read()
if (ndata := len(buf) - IntegrityAssuredBlob.HMAC_SIZE) < 0:
raise DecodeError('unexpected EOF')
return cls(
inner = decode(buf[:ndata], IntegrityAssuredBlob.Inner.read),
hmac = buf[ndata:],
)
def write(self, st: BinaryIO):
st.write(encode(self.inner.write))
st.write(self.hmac)
@dataclass
class OldSoftkeymasterBlob:
MAGIC = b'PK#8'
# uint32, ignored
type: int
# ignored
publicKey: bytes
# PKCS#8 private key
privateKey: bytes
def verify(self) -> bool:
return True
# careful, this uses big endian
@classmethod
def read(cls, st: BinaryIO):
if read(st, len(OldSoftkeymasterBlob.MAGIC)) != OldSoftkeymasterBlob.MAGIC:
raise DecodeError(f'magic not matching')
return cls(
type = unpack('>I', read(st, 4))[0],
publicKey = read(st, unpack('>I', read(st, 4))[0]),
privateKey = read(st, unpack('>I', read(st, 4))[0]),
)
def write(self, st: BinaryIO):
st.write(OldSoftkeymasterBlob.MAGIC)
st.write(pack('>I', self.type))
st.write(pack('>I', len(self.publicKey)))
st.write(self.publicKey)
st.write(pack('>I', len(self.privateKey)))
st.write(self.privateKey)
# CRYPTOGRAPHIC OPERATIONS
# ------------------------
@dataclass
class LoadedBlob:
'''A loaded and normalized blob, on which operations can be performed.'''
key: bytes
'''The material of the wrapped key'''
hw_enforced: AuthorizationSet
'''Hardware-enforced tags'''
sw_enforced: AuthorizationSet
'''Software-enforced tags'''
def get_merged_tags(self) -> Tuple[ dict[Tag, list[Any]], Callable[[Tag, Optional[bool]], Optional[Any]] ]:
tags = defaultdict(lambda: [])
for tag, value in self.hw_enforced + self.sw_enforced:
if value not in tags[tag]:
tags[tag].append(value)
def get_tag(tag: Tag, optional: Optional[bool]=False) -> Optional[Any]:
ts = tags[tag]
if len(ts) == 0 and (not optional):
raise Exception(f'Tag {tag.name} not present in blob')
if len(ts) > 1:
print(f'WARNING: Tag {tag.name} has multiple values: {", ".join(map(str, ts))}\n' +
' Picking first one.', file=sys.stderr)
return next(iter(ts)) if ts else None
return dict(tags), get_tag
def initial_checks(self, purpose: enums.Purpose):
'''Performs initial checks common to all operations.'''
tags, get_tag = self.get_merged_tags()
if tags[Tag.KEY_SIZE] and len(self.key) * 8 not in tags[Tag.KEY_SIZE]:
print('WARNING: Key size does not match tags', file=sys.stderr)
if purpose not in tags[Tag.PURPOSE]:
print(f'WARNING: Key is not for {purpose.name} use', file=sys.stderr)
def create_symmetric_cypher(self, iv: bytes) -> Tuple[ciphers.Cipher, Optional[padding.PKCS7]]:
tags, get_tag = self.get_merged_tags()
algs = cryptography.hazmat.primitives.ciphers.algorithms
alg = {
enums.Algorithm.AES: algs.AES,
enums.Algorithm.TRIPLE_DES: algs.TripleDES,
}[get_tag(Tag.ALGORITHM)](self.key)
modes = cryptography.hazmat.primitives.ciphers.modes
mode = {
enums.BlockMode.ECB: modes.ECB,
enums.BlockMode.CBC: modes.CBC,
enums.BlockMode.CTR: modes.CTR,
enums.BlockMode.GCM: modes.GCM,
}[get_tag(Tag.BLOCK_MODE)]
min_tag_length = get_tag(Tag.MIN_MAC_LENGTH)
mode = mode(iv,
**(dict(min_tag_length=min_tag_length//8) if min_tag_length != None else {}),
)
padder = {
enums.Padding.NONE: None,
enums.Padding.RSA_OAEP: False,
enums.Padding.RSA_PSS: False,
enums.Padding.RSA_PKCS1_1_5_ENCRYPT: False,
enums.Padding.RSA_PKCS1_1_5_SIGN: False,
enums.Padding.PKCS7: padding.PKCS7,
}[get_tag(Tag.PADDING)]
if padder is False:
raise Exception('padding is not valid')
padder = padder(alg.block_size) if padder else None
return ciphers.Cipher(alg, mode), padder
def decrypt(self, input: bytes, iv: bytes, tag: Optional[bytes] = None) -> bytes:
'''Decrypts a message with the key blob.
`tag` must be the authentication tag for AEAD ciphers or None otherwise'''
tags, get_tag = self.get_merged_tags()
self.initial_checks(enums.Purpose.DECRYPT)
alg = get_tag(Tag.ALGORITHM)
if alg in { enums.Algorithm.RSA, enums.Algorithm.EC }:
# asymmetric crypto
raise NotImplementedError('asymmetric crypto not implemented yet')
curve = get_tag(Tag.EC_CURVE)
if alg in { enums.Algorithm.AES, enums.Algorithm.TRIPLE_DES }:
# symmetric crypto
cipher, padder = self.create_symmetric_cypher(iv)
has_tag = isinstance(cipher.mode, cryptography.hazmat.primitives.ciphers.modes.ModeWithAuthenticationTag)
if has_tag != (tag != None):
raise Exception(f'the chosen mode {"uses" if has_tag else "does not use"} an authentication tag')
cipher = cipher.decryptor()
input = cipher.update(input) + \
(cipher.finalize_with_tag(tag) if has_tag else cipher.finalize())
if padder:
padder = padder.unpadder()
input = padder.update(input) + padder.finalize()
return input
if alg == enums.Algorithm.HMAC:
raise Exception('HMAC keys cannot be used for decryption')
raise AssertionError('unreachable')
def encrypt(self, input: bytes, iv: bytes) -> Tuple[bytes, Optional[bytes]]:
'''Encrypts a message with the key blob.
Returns (ciphertext, tag) where tag is the authentication tag for AEAD ciphers,
or None otherwise.'''
tags, get_tag = self.get_merged_tags()
self.initial_checks(enums.Purpose.DECRYPT)
alg = get_tag(Tag.ALGORITHM)
if alg in { enums.Algorithm.RSA, enums.Algorithm.EC }:
# asymmetric crypto
raise NotImplementedError('asymmetric crypto not implemented yet')
curve = get_tag(Tag.EC_CURVE)
if alg in { enums.Algorithm.AES, enums.Algorithm.TRIPLE_DES }:
# symmetric crypto
cipher, padder = self.create_symmetric_cypher(iv)
has_tag = isinstance(cipher.mode, cryptography.hazmat.primitives.ciphers.modes.ModeWithAuthenticationTag)
if padder:
padder = padder.padder()
input = padder.update(input) + padder.finalize()
cipher = cipher.encryptor()
input = cipher.update(input) + cipher.finalize(input)
return input, (cipher.tag if has_tag else None)
if alg == enums.Algorithm.HMAC:
raise Exception('HMAC keys cannot be used for encryption')
raise AssertionError('unreachable')
def sign(self, input: bytes) -> bytes:
'''Signs a message using the key and returns the signature.'''
tags, get_tag = self.get_merged_tags()
self.initial_checks(enums.Purpose.DECRYPT)
alg = get_tag(Tag.ALGORITHM)
if alg in { enums.Algorithm.RSA, enums.Algorithm.EC }:
raise NotImplementedError('asymmetric crypto not implemented yet')
if alg in { enums.Algorithm.AES, enums.Algorithm.TRIPLE_DES }:
raise Exception('symmetric crypto cannot be used for signing')
if alg == enums.Algorithm.HMAC:
digest = {
enums.Digest.NONE: None, # FIXME
enums.Digest.MD5: hashes.MD5,
enums.Digest.SHA1: hashes.SHA1,
enums.Digest.SHA_2_224: hashes.SHA224,
enums.Digest.SHA_2_256: hashes.SHA256,
enums.Digest.SHA_2_384: hashes.SHA384,
enums.Digest.SHA_2_512: hashes.SHA512,
}[get_tag(Tag.DIGEST)]()
h = hmac.HMAC(self.key, digest)
h.update(input)
return h.finalize()
raise AssertionError('unreachable')
# SOFTKEYMASTER LOGIC
# -------------------
# softkeymaster tries to decode blobs in this order:
SotfkeymasterBlob = Union[
IntegrityAssuredBlob,
AuthEncryptedBlob,
OldSoftkeymasterBlob,
]
def decode_softkeymaster_blob(x: bytes) -> SotfkeymasterBlob:
for handler in get_args(SotfkeymasterBlob):
try:
return decode(x, handler.read)
except DecodeError:
pass
raise DecodeError('invalid blob')
def load_software_blob(blob: SotfkeymasterBlob) -> LoadedBlob:
if isinstance(blob, IntegrityAssuredBlob):
return LoadedBlob(blob.inner.key, blob.inner.hw_enforced, blob.inner.sw_enforced)
if isinstance(blob, AuthEncryptedBlob):
raise NotImplementedError('obtaining key material from AuthEncryptedBlob not implemented yet')
return LoadedBlob(key, blob.hw_enforced, blob.sw_enforced)
if isinstance(blob, OldSoftkeymasterBlob):
raise NotImplementedError('deducing tags from OldSoftkeymasterBlob not implemented yet')
inferred_tags = AuthorizationSet()
return LoadedBlob(blob.privateKey, AuthorizationSet(), inferred_tags)
raise AssertionError('should not happen')
# I'm too lazy to look up where exactly this header comes from,
# but it is probably tucked by the keystore interface to signal
# that the blob is a keymaster blob. what was keystore again?
KEYSTORE_MAGIC = b'pKMblob\0'
def print_blob(blob: SotfkeymasterBlob):
print()
print('BLOB TYPE:', {
IntegrityAssuredBlob: '\x1b[1mIntegrityAssuredBlob\x1b[m (new keymaster1 software key blob, or new keymaster0-backed blob)',
AuthEncryptedBlob: '\x1b[1mAuthEncryptedBlob\x1b[m (old keymaster1 software key blobs)',
OldSoftkeymasterBlob: '\x1b[1mOldSoftkeymasterBlob\x1b[m (old keymaster0 software key blobs)',
}[type(blob)])
print()
def print_auth_set(x: AuthorizationSet):
fmt_os_version = lambda x: f'{x} (Android {x//10000}.{(x//10) % 10}{f".{x % 10}" if x % 10 else ""})'
fmt_value = lambda tag, value: \
value.isoformat(' ') if isinstance(value, datetime) else \
fmt_os_version(value) if tag == Tag.OS_VERSION else \
str(value)
print(' [Empty]' if not x else '\n'.join(
f' - \x1b[32m{key.name}\x1b[m ({key.value}, {key.type[0].name}) = \x1b[33m{fmt_value(key, value)}\x1b[m' for key, value in x))
if isinstance(blob, IntegrityAssuredBlob):
inner = blob.inner
print(f'Key ({len(inner.key)} bytes): \x1b[1m{hexlify(inner.key).decode()}\x1b[m')
print()
print('Hardware-enforced tags:')
print_auth_set(inner.hw_enforced)
print()
print('Software-enforced tags:')
print_auth_set(inner.sw_enforced)
print()
print('HMAC:', hexlify(blob.hmac).decode())
# FIXME: HMAC checking not fully implemented yet
if isinstance(blob, AuthEncryptedBlob):
key = blob.key
print('Encrypted key:')
print(f' Ciphertext: \x1b[1m{hexlify(key.ciphertext).decode()}\x1b[m')
print(f' Nonce / IV: \x1b[1m{hexlify(key.nonce).decode()}\x1b[m')
print(f' Tag: \x1b[1m{hexlify(key.tag).decode()}\x1b[m')
print('(FIXME: the key is encrypted with an all-zeros\n master key, but decryption is not implemented yet)')
print()
print('Hardware-enforced tags:')
print_auth_set(blob.hw_enforced)
print()
print('Software-enforced tags:')
print_auth_set(blob.sw_enforced)
if blob.key_slot != None:
print(f'\nKey slot: {blob.key_slot}')
if isinstance(blob, OldSoftkeymasterBlob):
print(f'Type: {blob.type}')
print()
print(f'Public key: {hexlify(blob.publicKey).decode()}')
print()
print(f'Key ({len(blob.privateKey)} bytes): \x1b[1m{hexlify(blob.privateKey).decode()}\x1b[m')
print()
if __name__ == '__main__':
import argparse
blob_types = {
'IntegrityAssured': IntegrityAssuredBlob,
'AuthEncrypted': AuthEncryptedBlob,
'OldSoftkeymaster': OldSoftkeymasterBlob,
}
parser = argparse.ArgumentParser(
usage='blobs.py [-h] [options] BLOB_FILE',
description='Parses key blobs from software KeyMaster implementations.')
parser.add_argument('blob_file', metavar='BLOB_FILE',
help='file containing the blob to parse')
parser.add_argument('--strict', metavar='INPUT_FILE', action=argparse.BooleanOptionalAction,
help="don't try to remove non-keymaster wrapping such as the keystore header")
parser.add_argument('-t', '--type', metavar='BLOB_TYPE', choices=blob_types,
help=f'force a specific blob type to be parsed ({", ".join(blob_types)})')
subparsers = parser.add_subparsers(dest='operation', help='instead of inspecting the blob, use it to perform an operation')
parser_sign = subparsers.add_parser('sign', help='sign INPUT_FILE and output the signature to stdout')
parser_sign.add_argument('input_file', metavar='INPUT_FILE')
parser_decrypt = subparsers.add_parser('decrypt', help='decrypt INPUT_FILE and output the plaintext to stdout')
parser_decrypt.add_argument('input_file', metavar='INPUT_FILE')
parser_decrypt.add_argument('iv', metavar='IV', help='decryption IV, in hex')
parser_decrypt.add_argument('auth_tag', metavar='AUTH_TAG', help='authentication tag, in hex', required=False)
args = parser.parse_args()
with open(args.blob_file, 'rb') as f:
raw_blob = f.read()
if (not args.strict) and raw_blob.startswith(KEYSTORE_MAGIC):
raw_blob = raw_blob[len(KEYSTORE_MAGIC):]
if args.type:
blob = decode(raw_blob, blob_types[args.type].read)
else:
blob = decode_softkeymaster_blob(raw_blob)
if args.operation:
loaded_blob = load_software_blob(blob)
with open(args.input_file, 'rb') as f:
input = f.read()
output = {
'sign': lambda: loaded_blob.sign(input),
'decrypt': lambda: loaded_blob.decrypt(input, unhexlify(args.iv),
unhexlify(args.auth_tag) if args.auth_tag != None else None),
# 'encrypt': lambda: loaded_blob.encrypt(input, unhexlify(args.iv)),
}[args.operation]()
sys.stdout.write(output)
else:
print_blob(blob)
'''
enums defined in keymaster interface.
'''
import enum
@enum.unique
class Algorithm(enum.Enum):
'''Algorithms that may be provided by keymaster implementations.
Those that must be provided by all implementations are tagged as "required".'''
## Asymmetric algorithms.
RSA = 1
# DSA = 2 # Removed, do not re-use value 2.
EC = 3
## Block ciphers algorithms
AES = 32
TRIPLE_DES = 33
## MAC algorithms
HMAC = 128
@enum.unique
class BlockMode(enum.Enum):
'''Symmetric block cipher modes provided by keymaster implementations. '''
## Unauthenticated modes, usable only for encryption/decryption and not generally recommended
## except for compatibility with existing other protocols.
ECB = 1
CBC = 2
CTR = 3
## Authenticated modes, usable for encryption/decryption and signing/verification. Recommended
## over unauthenticated modes for all purposes.
GCM = 32
@enum.unique
class Padding(enum.Enum):
'''
Padding modes that may be applied to plaintext for encryption operations.
This list includes padding modes for both symmetric and asymmetric algorithms.
Note that implementations should not provide all possible combinations of algorithm
and padding, only the cryptographically-appropriate pairs.
'''
NONE = 1 # deprecated
RSA_OAEP = 2
RSA_PSS = 3
RSA_PKCS1_1_5_ENCRYPT = 4
RSA_PKCS1_1_5_SIGN = 5
PKCS7 = 64
@enum.unique
class Digest(enum.Enum):
'''Digests provided by keymaster implementations.'''
NONE = 0
MD5 = 1 # Optional, may not be implemented in hardware, will be handled in software if needed.
SHA1 = 2
SHA_2_224 = 3
SHA_2_256 = 4
SHA_2_384 = 5
SHA_2_512 = 6
@enum.unique
class Kdf(enum.Enum):
'''Key derivation functions, mostly used in ECIES.'''
NONE = 0 # Do not apply a key derivation function; use the raw agreed key
RFC5869_SHA256 = 1 # HKDF defined in RFC 5869 with SHA256
ISO18033_2_KDF1_SHA1 = 2 # KDF1 defined in ISO 18033-2 with SHA1
ISO18033_2_KDF1_SHA256 = 3 # KDF1 defined in ISO 18033-2 with SHA256
ISO18033_2_KDF2_SHA1 = 4 # KDF2 defined in ISO 18033-2 with SHA1
ISO18033_2_KDF2_SHA256 = 5 # KDF2 defined in ISO 18033-2 with SHA256
@enum.unique
class EcCurve(enum.Enum):
'''Supported EC curves, used in ECDSA/ECIES.'''
P_224 = 0
P_256 = 1
P_384 = 2
P_521 = 3
@enum.unique
class KeyOrigin(enum.Enum):
'''
The origin of a key (or pair), i.e. where it was generated. Note that KM_TAG_ORIGIN can be found
in either the hardware-enforced or software-enforced list for a key, indicating whether the key
is hardware or software-based. Specifically, a key with KM_ORIGIN_GENERATED in the
hardware-enforced list is guaranteed never to have existed outide the secure hardware.
'''
GENERATED = 0 # Generated in keymaster. Should not exist outside the TEE.
DERIVED = 1 # Derived inside keymaster. Likely exists off-device.
IMPORTED = 2 # Imported into keymaster. Existed as cleartext in Android.
UNKNOWN = 3 # Keymaster did not record origin. This value can only be seen on
# keys in a keymaster0 implementation. The keymaster0 adapter uses
# this value to document the fact that it is unkown whether the key
# was generated inside or imported into keymaster.
@enum.unique
class KeyBlobUsageRequirements(enum.Enum):
'''
Usability requirements of key blobs. This defines what system functionality must be available
for the key to function. For example, key "blobs" which are actually handles referencing
encrypted key material stored in the file system cannot be used until the file system is
available, and should have BLOB_REQUIRES_FILE_SYSTEM. Other requirements entries will be added
as needed for implementations.
'''
STANDALONE = 0
REQUIRES_FILE_SYSTEM = 1
@enum.unique
class Purpose(enum.Enum):
'''Possible purposes of a key (or pair).'''
ENCRYPT = 0 # Usable with RSA, EC and AES keys.
DECRYPT = 1 # Usable with RSA, EC and AES keys.
SIGN = 2 # Usable with RSA, EC and HMAC keys.
VERIFY = 3 # Usable with RSA, EC and HMAC keys.
DERIVE_KEY = 4 # Usable with EC keys.
WRAP = 5 # Usable with wrapped keys.
AGREE_KEY = 6 # Usable with EC keys.
ATTEST_KEY = 7 # Usabe with RSA and EC keys
@enum.unique
class VerifiedBoot(enum.Enum):
VERIFIED = 0 # Full chain of trust extending from the bootloader to
# verified partitions, including the bootloader, boot
# partition, and all verified partitions
SELF_SIGNED = 1 # The boot partition has been verified using the embedded
# certificate, and the signature is valid. The bootloader
# displays a warning and the fingerprint of the public
# key before allowing the boot process to continue.
UNVERIFIED = 2 # The device may be freely modified. Device integrity is left
# to the user to verify out-of-band. The bootloader
# displays a warning to the user before allowing the boot
# process to continue
FAILED = 3 # The device failed verification. The bootloader displays a
# warning and stops the boot process, so no keymaster
# implementation should ever actually return this value,
# since it should not run. Included here only for
# completeness.
@enum.unique
class SecurityLevel(enum.Enum):
SOFTWARE = 0
TRUSTED_ENVIRONMENT = 1
STRONGBOX = 2
@enum.unique
class KeyFormat(enum.Enum):
'''Formats for key import and export.'''
X509 = 0 # for public key export
PKCS8 = 1 # for asymmetric key pair import
RAW = 3 # for symmetric key import and export
OperationHandle = int
'''
The keymaster operation API consists of begin, update, finish and abort. This is the type of the
handle used to tie the sequence of calls together. A 64-bit value is used because it's important
that handles not be predictable. Implementations must use strong random numbers for handle
values.
'''
@enum.unique
class Error(enum.Enum):
OK = 0
ROOT_OF_TRUST_ALREADY_SET = -1
UNSUPPORTED_PURPOSE = -2
INCOMPATIBLE_PURPOSE = -3
UNSUPPORTED_ALGORITHM = -4
INCOMPATIBLE_ALGORITHM = -5
UNSUPPORTED_KEY_SIZE = -6
UNSUPPORTED_BLOCK_MODE = -7
INCOMPATIBLE_BLOCK_MODE = -8
UNSUPPORTED_MAC_LENGTH = -9
UNSUPPORTED_PADDING_MODE = -10
INCOMPATIBLE_PADDING_MODE = -11
UNSUPPORTED_DIGEST = -12
INCOMPATIBLE_DIGEST = -13
INVALID_EXPIRATION_TIME = -14
INVALID_USER_ID = -15
INVALID_AUTHORIZATION_TIMEOUT = -16
UNSUPPORTED_KEY_FORMAT = -17
INCOMPATIBLE_KEY_FORMAT = -18
UNSUPPORTED_KEY_ENCRYPTION_ALGORITHM = -19 # For PKCS8 & PKCS12
UNSUPPORTED_KEY_VERIFICATION_ALGORITHM = -20 # For PKCS8 & PKCS12
INVALID_INPUT_LENGTH = -21
KEY_EXPORT_OPTIONS_INVALID = -22
DELEGATION_NOT_ALLOWED = -23
KEY_NOT_YET_VALID = -24
KEY_EXPIRED = -25
KEY_USER_NOT_AUTHENTICATED = -26
OUTPUT_PARAMETER_NULL = -27
INVALID_OPERATION_HANDLE = -28
INSUFFICIENT_BUFFER_SPACE = -29
VERIFICATION_FAILED = -30
TOO_MANY_OPERATIONS = -31
UNEXPECTED_NULL_POINTER = -32
INVALID_KEY_BLOB = -33
IMPORTED_KEY_NOT_ENCRYPTED = -34
IMPORTED_KEY_DECRYPTION_FAILED = -35
IMPORTED_KEY_NOT_SIGNED = -36
IMPORTED_KEY_VERIFICATION_FAILED = -37
INVALID_ARGUMENT = -38
UNSUPPORTED_TAG = -39
INVALID_TAG = -40
MEMORY_ALLOCATION_FAILED = -41
IMPORT_PARAMETER_MISMATCH = -44
SECURE_HW_ACCESS_DENIED = -45
OPERATION_CANCELLED = -46
CONCURRENT_ACCESS_CONFLICT = -47
SECURE_HW_BUSY = -48
SECURE_HW_COMMUNICATION_FAILED = -49
UNSUPPORTED_EC_FIELD = -50
MISSING_NONCE = -51
INVALID_NONCE = -52
MISSING_MAC_LENGTH = -53
KEY_RATE_LIMIT_EXCEEDED = -54
CALLER_NONCE_PROHIBITED = -55
KEY_MAX_OPS_EXCEEDED = -56
INVALID_MAC_LENGTH = -57
MISSING_MIN_MAC_LENGTH = -58
UNSUPPORTED_MIN_MAC_LENGTH = -59
UNSUPPORTED_KDF = -60
UNSUPPORTED_EC_CURVE = -61
KEY_REQUIRES_UPGRADE = -62
ATTESTATION_CHALLENGE_MISSING = -63
KEYMASTER_NOT_CONFIGURED = -64
ATTESTATION_APPLICATION_ID_MISSING = -65
CANNOT_ATTEST_IDS = -66
ROLLBACK_RESISTANCE_UNAVAILABLE = -67
NO_USER_CONFIRMATION = -71
DEVICE_LOCKED = -72
EARLY_BOOT_ENDED = -73
ATTESTATION_KEYS_NOT_PROVISIONED = -74
ATTESTATION_IDS_NOT_PROVISIONED = -75
INCOMPATIBLE_MGF_DIGEST = -78
UNSUPPORTED_MGF_DIGEST = -79
MISSING_NOT_BEFORE = -80
MISSING_NOT_AFTER = -81
MISSING_ISSUER_SUBJECT = -82
INVALID_ISSUER_SUBJECT = -83
BOOT_LEVEL_EXCEEDED = -84
UNIMPLEMENTED = -100
VERSION_MISMATCH = -101
UNKNOWN_ERROR = -1000
# Defined in hw_auth_token.h
@enum.unique
class AuthenticatorType(enum.Flag):
PASSWORD = 1 << 0
FINGERPRINT = 1 << 1
#!/usr/bin/env python3
'''
Vold-specific logic
Offers:
- CLI tool to decrypt a Vold key given its folder
Limitations:
- Right now this only supports Keymaster-encrypted keys
(keys encrypted without Keymaster are not supported).
- Right now this only supports the software Keymaster (softkeymaster).
(for forensics; not cryptographically secure)
based on:
- system/vold @ bba0592dcea
- KeyStorage.cpp
'''
import blobs
import os
import sys
from binascii import hexlify, unhexlify
# constants imported right from source code
AES_KEY_BYTES = 32
GCM_NONCE_BYTES = 12
GCM_MAC_BYTES = 16
SECDISCARDABLE_BYTES = 1 << 14
kCurrentVersion = "1"
kRmPath = "/system/bin/rm"
kSecdiscardPath = "/system/bin/secdiscard"
kStretch_none = "none"
kStretch_nopassword = "nopassword"
kHashPrefix_secdiscardable = "Android secdiscardable SHA512"
kHashPrefix_keygen = "Android key wrapping key generation SHA512"
kFn_encrypted_key = "encrypted_key"
kFn_keymaster_key_blob = "keymaster_key_blob"
kFn_keymaster_key_blob_upgraded = "keymaster_key_blob_upgraded"
kFn_secdiscardable = "secdiscardable"
kFn_stretching = "stretching"
kFn_version = "version"
# logic
expected_files = {
kFn_version,
kFn_stretching,
kFn_secdiscardable,
kFn_encrypted_key,
kFn_keymaster_key_blob,
kFn_keymaster_key_blob_upgraded,
}
def decrypt_key(key_dir: str, debug: bool=False):
# First open the folder
dir_fd = os.open(key_dir, os.O_RDONLY)
# Check it has the correct version
with open(os.open(kFn_version, os.O_RDONLY, dir_fd=dir_fd), 'r') as f:
version = f.read()
assert version == str(int(version))
version = int(version)
if version != 1:
raise NotImplementedError(f'key version {version} not implemented')
print(f'INFO: found Vold key version {version}', file=sys.stderr)
# Check for any extra files and warn
if extra_files := set(os.listdir(dir_fd)) - expected_files:
print('WARNING: extra files in key directory:', extra_files, file=sys.stderr)
# Check stretching
with open(os.open(kFn_stretching, os.O_RDONLY, dir_fd=dir_fd), 'r') as f:
uses_keymaster = { kStretch_none: False, kStretch_nopassword: True }[f.read()]
if not uses_keymaster:
raise NotImplementedError('non-KeyMaster keys are not implemented yet')
# Load KeyMaster blob
with open(os.open(kFn_keymaster_key_blob, os.O_RDONLY, dir_fd=dir_fd), 'rb') as f:
raw_blob = f.read()
assert raw_blob.startswith(blobs.KEYSTORE_MAGIC)
raw_blob = raw_blob[len(blobs.KEYSTORE_MAGIC):]
blob = blobs.decode_softkeymaster_blob(raw_blob)
if debug:
blobs.print_blob(blob)
# Load encrypted key
with open(os.open(kFn_encrypted_key, os.O_RDONLY, dir_fd=dir_fd), 'rb') as f:
payload = f.read()
assert len(payload) > GCM_NONCE_BYTES + GCM_MAC_BYTES
iv, enc_key, tag = payload[:GCM_NONCE_BYTES], payload[GCM_NONCE_BYTES:-GCM_MAC_BYTES], payload[-GCM_MAC_BYTES:]
# Decrypt key
loaded_blob = blobs.load_software_blob(blob)
return loaded_blob.decrypt(enc_key, iv, tag)
# CLI tool
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(
usage='vold.py [-h] [options] KEY_DIR',
description='Decrypts a Vold key given its folder.')
parser.add_argument('key_dir', metavar='KEY_DIR',
help='key directory')
parser.add_argument('-r', '--raw', action=argparse.BooleanOptionalAction,
help='skips inspecting the blob and only outputs the raw decrypted key (without hex)')
parser.add_argument('--hex', action=argparse.BooleanOptionalAction,
help='for --raw, writes key in hex')
args = parser.parse_args()
dec_key = decrypt_key(args.key_dir, debug=not args.raw)
if args.raw:
sys.stdout.buffer.write(hexlify(dec_key) if args.hex else dec_key)
else:
print(f'Decrypted Vold key: \x1b[1m{hexlify(dec_key).decode()}\x1b[m')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment