Skip to content

Instantly share code, notes, and snippets.

@apprenticenaomi
Created July 13, 2017 07:24
Show Gist options
  • Save apprenticenaomi/b0103db2ac134cb51a3ead92f1d99f3a to your computer and use it in GitHub Desktop.
Save apprenticenaomi/b0103db2ac134cb51a3ead92f1d99f3a to your computer and use it in GitHub Desktop.
Python DeDRM KFX v0.1
# BinaryIon.pas + DrmIon.pas + IonSymbols.pas
import collections
import enum
import hashlib
import hmac
import os
import os.path
import struct
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from Crypto.Cipher import AES
from Crypto.Util.py3compat import bchr, bord
try:
import lzma
except ImportError:
# Need pip backports.lzma on Python <3.3
from backports import lzma
import kinf
# Ion type identifiers: the value BinaryIonParser.readtypeid() extracts from the
# high nibble of a type descriptor byte.
TID_NULL = 0
TID_BOOLEAN = 1
TID_POSINT = 2
TID_NEGINT = 3
TID_FLOAT = 4
TID_DECIMAL = 5
TID_TIMESTAMP = 6
TID_SYMBOL = 7
TID_STRING = 8
TID_CLOB = 9
TID_BLOB = 0xA
TID_LIST = 0xB
TID_SEXP = 0xC
TID_STRUCT = 0xD
TID_TYPEDECL = 0xE
TID_UNUSED = 0xF
# Symbol ids. SID_UNKNOWN marks "no field id"; the others are the slots that
# SymbolTable.__init__ fills with the system symbol texts.
SID_UNKNOWN = -1
SID_ION = 1
SID_ION_1_0 = 2
SID_ION_SYMBOL_TABLE = 3
SID_NAME = 4
SID_VERSION = 5
SID_IMPORTS = 6
SID_SYMBOLS = 7
SID_MAX_ID = 8
SID_ION_SHARED_SYMBOL_TABLE = 9
# Initial size of a SymbolTable (one past the last system symbol id).
SID_ION_1_0_MAX = 10
# Special values of the low nibble of a type descriptor byte.
LEN_IS_VAR_LEN = 0xE
LEN_IS_NULL = 0xF
# Bytes expected right after a zero-length TID_TYPEDECL descriptor byte.
VERSION_MARKER = b"\x01\x00\xEA"
class SystemSymbols(object):
    """Texts of the Ion system symbols, exposed as class-level constants."""

    # Version / table identifiers.
    ION = '$ion'
    ION_1_0 = '$ion_1_0'
    ION_SYMBOL_TABLE = '$ion_symbol_table'
    ION_SHARED_SYMBOL_TABLE = '$ion_shared_symbol_table'

    # Field names used inside symbol-table structs.
    NAME = 'name'
    VERSION = 'version'
    IMPORTS = 'imports'
    SYMBOLS = 'symbols'
    MAX_ID = 'max_id'
class IonCatalogItem(object):
    """One catalog entry: a symbol table name, its version and its symbol texts."""

    # Class-level defaults; every instance replaces them in __init__.
    name = ""
    version = 0
    symnames = []

    def __init__(self, name, version, symnames):
        """Store the given name, version and list of symbol names as-is."""
        self.name, self.version, self.symnames = name, version, symnames
class SymbolToken(object):
    """A symbol identified by its text, its symbol id, or both."""

    text = ""
    sid = 0

    def __init__(self, text, sid):
        """Build a token; raises ValueError when text is "" and sid is 0."""
        has_text = text != ""
        has_sid = sid != 0
        if not (has_text or has_sid):
            raise ValueError("Symbol token must have Text or SID")
        self.sid = sid
        self.text = text
class SymbolTable(object):
    """Maps symbol ids to symbol texts; starts out holding the system symbols."""

    table = None

    def __init__(self):
        """Create a table of SID_ION_1_0_MAX slots with the system symbols set."""
        system_symbols = (
            (SID_ION, SystemSymbols.ION),
            (SID_ION_1_0, SystemSymbols.ION_1_0),
            (SID_ION_SYMBOL_TABLE, SystemSymbols.ION_SYMBOL_TABLE),
            (SID_NAME, SystemSymbols.NAME),
            (SID_VERSION, SystemSymbols.VERSION),
            (SID_IMPORTS, SystemSymbols.IMPORTS),
            (SID_SYMBOLS, SystemSymbols.SYMBOLS),
            (SID_MAX_ID, SystemSymbols.MAX_ID),
            (SID_ION_SHARED_SYMBOL_TABLE, SystemSymbols.ION_SHARED_SYMBOL_TABLE),
        )
        self.table = [None] * SID_ION_1_0_MAX
        for slot, text in system_symbols:
            self.table[slot] = text

    def findbyid(self, sid):
        """Return the text stored for *sid*, or "" when sid is past the table.

        Raises ValueError for ids below 1.
        """
        if sid < 1:
            raise ValueError("Invalid symbol id")
        if sid >= len(self.table):
            return ""
        return self.table[sid]

    def import_(self, table, maxid):
        """Append the first *maxid* names of *table*.symnames to this table."""
        for index in range(maxid):
            self.table.append(table.symnames[index])

    def importunknown(self, name, maxid):
        """Append *maxid* placeholder names of the form "<name>#<1-based index>"."""
        for number in range(1, maxid + 1):
            self.table.append("%s#%d" % (name, number))
# States of the BinaryIonParser state machine (see BinaryIonParser.hasnextraw).
ParserState = enum.Enum("ParserState", "Invalid BeforeField BeforeTID BeforeValue AfterValue EOF")
# Parent context saved by BinaryIonParser.stepin() and restored by stepout():
# stream position after the container, parent type id, and the parent's
# remaining byte budget.
ContainerRec = collections.namedtuple("ContainerRec", "nextpos, tid, remaining")
class BinaryIonParser(object):
    """Pull-style reader for a binary Ion stream.

    Call hasnext()/next() to advance to a value, read it with one of the
    *value() accessors, and use stepin()/stepout() to walk into structs,
    lists and s-expressions.  Version markers and symbol-table structs met
    at top level are consumed internally by hasnext().

    The stream must support read(), seek() and tell().
    """

    # Class-level defaults; instances overwrite them while parsing.
    eof = False
    state = None
    localremaining = 0  # bytes left in the current container; -1 = unbounded
    needhasnext = False
    isinstruct = False
    valuetid = 0
    valuefieldid = 0
    parenttid = 0
    valuelen = 0
    valueisnull = False
    valueistrue = False
    value = None
    didimports = False

    def __init__(self, stream):
        """Wrap *stream*; its current position becomes the rewind point for reset()."""
        self.annotations = []
        self.catalog = []
        self.stream = stream
        self.initpos = stream.tell()
        self.reset()
        self.symbols = SymbolTable()

    def reset(self):
        """Seek back to the initial position and restart parsing at top level.

        The symbol table and the didimports flag are left as they are.
        """
        self.state = ParserState.BeforeTID
        self.needhasnext = True
        self.localremaining = -1
        self.eof = False
        self.isinstruct = False
        self.containerstack = []
        self.stream.seek(self.initpos)

    def addtocatalog(self, name, version, symbols):
        """Register a shared symbol table that imports may refer to by name."""
        self.catalog.append(IonCatalogItem(name, version, symbols))

    def hasnext(self):
        """Position on the next value if needed; return False once eof is reached.

        At top level, the version-marker symbol and structs annotated with
        the symbol-table id are processed here and skipped.
        """
        while self.needhasnext and not self.eof:
            self.hasnextraw()
            if len(self.containerstack) == 0 and not self.valueisnull:
                if self.valuetid == TID_SYMBOL:
                    if self.value == SID_ION_1_0:
                        self.needhasnext = True
                elif self.valuetid == TID_STRUCT:
                    if SID_ION_SYMBOL_TABLE in self.annotations:
                        self.parsesymboltable()
                        self.needhasnext = True
        return not self.eof

    def hasnextraw(self):
        """Run the state machine until a type id has been read or eof is hit."""
        self.clearvalue()
        while self.valuetid == -1 and not self.eof:
            self.needhasnext = False
            if self.state == ParserState.BeforeField:
                assert self.valuefieldid == SID_UNKNOWN
                self.valuefieldid = self.readfieldid()
                if self.valuefieldid != SID_UNKNOWN:
                    self.state = ParserState.BeforeTID
                else:
                    self.eof = True
            elif self.state == ParserState.BeforeTID:
                self.state = ParserState.BeforeValue
                self.valuetid = self.readtypeid()
                if self.valuetid == -1:
                    self.state = ParserState.EOF
                    self.eof = True
                    break
                if self.valuetid == TID_TYPEDECL:
                    if self.valuelen == 0:
                        self.checkversionmarker()
                    else:
                        self.loadannotations()
            elif self.state == ParserState.BeforeValue:
                # The previous value was never read by the caller: skip it.
                self.skip(self.valuelen)
                self.state = ParserState.AfterValue
            elif self.state == ParserState.AfterValue:
                if self.isinstruct:
                    self.state = ParserState.BeforeField
                else:
                    self.state = ParserState.BeforeTID
            else:
                assert self.state == ParserState.EOF

    def next(self):
        """Advance to the next value and return its type id, or -1 at eof."""
        if self.hasnext():
            self.needhasnext = True
            return self.valuetid
        else:
            return -1

    def push(self, typeid, nextposition, nextremaining):
        """Save the parent container context on the container stack."""
        self.containerstack.append(
            ContainerRec(nextpos=nextposition, tid=typeid, remaining=nextremaining))

    def stepin(self):
        """Enter the struct, list or s-expression the parser is positioned on."""
        assert self.valuetid in [TID_STRUCT, TID_LIST, TID_SEXP] and not self.eof, \
            "valuetid=%s eof=%s" % (self.valuetid, self.eof)
        assert (not self.valueisnull or self.state == ParserState.AfterValue) and \
            (self.valueisnull or self.state == ParserState.BeforeValue)
        # Byte budget the parent will have left once this container is done.
        nextrem = self.localremaining
        if nextrem != -1:
            nextrem -= self.valuelen
            if nextrem < 0:
                nextrem = 0
        self.push(self.parenttid, self.stream.tell() + self.valuelen, nextrem)
        self.isinstruct = (self.valuetid == TID_STRUCT)
        if self.isinstruct:
            self.state = ParserState.BeforeField
        else:
            self.state = ParserState.BeforeTID
        self.localremaining = self.valuelen
        self.parenttid = self.valuetid
        self.clearvalue()
        self.needhasnext = True

    def stepout(self):
        """Leave the current container, skipping whatever was left unread in it."""
        rec = self.containerstack.pop()
        self.eof = False
        self.parenttid = rec.tid
        if self.parenttid == TID_STRUCT:
            self.isinstruct = True
            self.state = ParserState.BeforeField
        else:
            self.isinstruct = False
            self.state = ParserState.BeforeTID
        self.needhasnext = True
        self.clearvalue()
        curpos = self.stream.tell()
        if rec.nextpos > curpos:
            self.skip(rec.nextpos - curpos)
        else:
            assert rec.nextpos == curpos
        self.localremaining = rec.remaining

    def read(self, count=1):
        """Read *count* bytes, charging them against the container budget.

        Raises EOFError when bytes were requested but the stream returned
        none.  A zero-length read (empty string or LOB) returns an empty
        result instead of being reported as end of file.
        """
        if self.localremaining != -1:
            self.localremaining -= count
            assert self.localremaining >= 0
        result = self.stream.read(count)
        if count > 0 and len(result) == 0:
            raise EOFError()
        return result

    def readfieldid(self):
        """Read a struct field id; return -1 when the container or stream ends."""
        if self.localremaining != -1 and self.localremaining < 1:
            return -1
        try:
            return self.readvaruint()
        except EOFError:
            return -1

    def readtypeid(self):
        """Read a type descriptor byte.

        Stores the value length in self.valuelen and returns the type id
        (high nibble), or -1 when no byte is available.
        """
        if self.localremaining != -1:
            if self.localremaining < 1:
                return -1
            self.localremaining -= 1
        b = self.stream.read(1)
        if len(b) < 1:
            return -1
        b = bord(b)
        result = b >> 4
        ln = b & 0xF
        if ln == LEN_IS_VAR_LEN:
            ln = self.readvaruint()
        elif ln == LEN_IS_NULL:
            ln = 0
            self.state = ParserState.AfterValue
        elif result == TID_NULL:
            # Must have LEN_IS_NULL
            assert False
        elif result == TID_BOOLEAN:
            # Booleans carry their value in the length nibble.
            assert ln <= 1
            self.valueistrue = (ln == 1)
            ln = 0
            self.state = ParserState.AfterValue
        elif result == TID_STRUCT:
            if ln == 1:
                ln = self.readvaruint()
        self.valuelen = ln
        return result

    def readvarint(self):
        """Read a variable-length signed int (bit 6 of byte 0 is the sign).

        A byte with bit 7 set ends the number; at most five bytes are read.
        """
        b = bord(self.read())
        negative = ((b & 0x40) != 0)
        result = (b & 0x3F)
        i = 0
        while (b & 0x80) == 0 and i < 4:
            b = bord(self.read())
            result = (result << 7) | (b & 0x7F)
            i += 1
        assert i < 4 or (b & 0x80) != 0, "int overflow"
        if negative:
            return -result
        return result

    def readvaruint(self):
        """Read a variable-length unsigned int; bit 7 set marks the last byte."""
        b = bord(self.read())
        result = (b & 0x7F)
        i = 0
        while (b & 0x80) == 0 and i < 4:
            b = bord(self.read())
            result = (result << 7) | (b & 0x7F)
            i += 1
        assert i < 4 or (b & 0x80) != 0, "int overflow"
        return result

    def readdecimal(self):
        """Read a decimal value (varint exponent + sign-magnitude coefficient).

        Returns coefficient * 10 ** exponent, or 0. for a zero-length value.
        """
        if self.valuelen == 0:
            return 0.
        outer = self.localremaining
        # -1 means "unbounded" and must stay -1; otherwise pre-compute what
        # the enclosing container has left after this value.
        rem = outer - self.valuelen if outer != -1 else -1
        self.localremaining = self.valuelen
        exponent = self.readvarint()
        assert self.localremaining > 0, "Only exponent in ReadDecimal"
        assert self.localremaining <= 8, "Decimal overflow"
        signed = False
        b = [bord(x) for x in self.read(self.localremaining)]
        if (b[0] & 0x80) != 0:
            b[0] = b[0] & 0x7F
            signed = True
        # Fold the big-endian (network order) magnitude bytes into an int.
        v = 0
        for byte in b:
            v = (v << 8) | byte
        result = v * (10 ** exponent)
        if signed:
            result = -result
        self.localremaining = rem
        return result

    def skip(self, count):
        """Seek *count* bytes forward; EOFError if that overruns the container."""
        if self.localremaining != -1:
            self.localremaining -= count
            if self.localremaining < 0:
                raise EOFError()
        self.stream.seek(count, os.SEEK_CUR)

    def parsesymboltable(self):
        """Process a local symbol-table struct; imports are applied only once."""
        self.next()  # shouldn't do anything?
        assert self.valuetid == TID_STRUCT
        if self.didimports:
            return
        self.stepin()
        fieldtype = self.next()
        while fieldtype != -1:
            if not self.valueisnull:
                assert self.valuefieldid == SID_IMPORTS, "Unsupported symbol table field id"
                if fieldtype == TID_LIST:
                    self.gatherimports()
            fieldtype = self.next()
        self.stepout()
        self.didimports = True

    def gatherimports(self):
        """Walk the imports list and apply every struct entry via readimport()."""
        self.stepin()
        t = self.next()
        while t != -1:
            if not self.valueisnull and t == TID_STRUCT:
                self.readimport()
            t = self.next()
        self.stepout()

    def readimport(self):
        """Read one import struct (name, version, max_id) and extend self.symbols.

        Names found in the catalog contribute their real symbol texts;
        unknown tables contribute "<name>#<n>" placeholders.
        """
        version = -1
        maxid = -1
        name = ""
        self.stepin()
        t = self.next()
        while t != -1:
            if not self.valueisnull and self.valuefieldid != SID_UNKNOWN:
                if self.valuefieldid == SID_NAME:
                    name = self.stringvalue()
                elif self.valuefieldid == SID_VERSION:
                    version = self.intvalue()
                elif self.valuefieldid == SID_MAX_ID:
                    maxid = self.intvalue()
            t = self.next()
        self.stepout()
        if name == "" or name == SystemSymbols.ION:
            return
        if version < 1:
            version = 1
        table = self.findcatalogitem(name)
        if maxid < 0:
            assert table is not None and version == table.version, "Import %s lacks maxid" % name
            maxid = len(table.symnames)
        if table is not None:
            self.symbols.import_(table, min(maxid, len(table.symnames)))
        else:
            self.symbols.importunknown(name, maxid)

    def intvalue(self):
        """Return the current positive/negative int value."""
        assert self.valuetid in [TID_POSINT, TID_NEGINT], "Not an int"
        self.preparevalue()
        return self.value

    def stringvalue(self):
        """Return the current string value ("" when the value is flagged null)."""
        assert self.valuetid == TID_STRING, "Not a string"
        if self.valueisnull:
            return ""
        self.preparevalue()
        return self.value

    def symbolvalue(self):
        """Return the current symbol's text, or "SYMBOL#<id>" if it has none."""
        assert self.valuetid == TID_SYMBOL, "Not a symbol"
        self.preparevalue()
        result = self.symbols.findbyid(self.value)
        if result == "":
            result = "SYMBOL#%d" % self.value
        return result

    def lobvalue(self):
        """Return the raw bytes of the current CLOB/BLOB (None if flagged null)."""
        assert self.valuetid in [TID_CLOB, TID_BLOB], "Not a LOB type: %s" % self.getfieldname()
        if self.valueisnull:
            return None
        result = self.read(self.valuelen)
        self.state = ParserState.AfterValue
        return result

    def decimalvalue(self):
        """Return the current decimal value."""
        assert self.valuetid == TID_DECIMAL, "Not a decimal"
        self.preparevalue()
        return self.value

    def preparevalue(self):
        """Load the scalar value from the stream unless it is already cached."""
        if self.value is None:
            self.loadscalarvalue()

    def loadscalarvalue(self):
        """Decode the current scalar into self.value and mark it consumed.

        Only strings, ints, symbols and decimals are decoded; the other
        scalar types leave self.value as None.
        """
        if self.valuetid not in [TID_NULL, TID_BOOLEAN, TID_POSINT, TID_NEGINT,
                                 TID_FLOAT, TID_DECIMAL, TID_TIMESTAMP,
                                 TID_SYMBOL, TID_STRING]:
            return
        if self.valueisnull:
            self.value = None
            return
        if self.valuetid == TID_STRING:
            self.value = self.read(self.valuelen).decode("UTF-8")
        elif self.valuetid in (TID_POSINT, TID_NEGINT, TID_SYMBOL):
            if self.valuelen == 0:
                self.value = 0
            else:
                assert self.valuelen <= 4, "int too long: %d" % self.valuelen
                # Big-endian magnitude, most significant byte first.
                v = 0
                for i in range(self.valuelen - 1, -1, -1):
                    v = (v | (bord(self.read()) << (i * 8)))
                if self.valuetid == TID_NEGINT:
                    self.value = -v
                else:
                    self.value = v
        elif self.valuetid == TID_DECIMAL:
            self.value = self.readdecimal()
        self.state = ParserState.AfterValue

    def clearvalue(self):
        """Forget the current value, its field id and its annotations."""
        self.valuetid = -1
        self.value = None
        self.valueisnull = False
        self.valuefieldid = SID_UNKNOWN
        self.annotations = []

    def loadannotations(self):
        """Read the annotation symbol ids, then the type id of the wrapped value."""
        ln = self.readvaruint()
        maxpos = self.stream.tell() + ln
        while self.stream.tell() < maxpos:
            self.annotations.append(self.readvaruint())
        self.valuetid = self.readtypeid()

    def checkversionmarker(self):
        """Consume the version marker and expose it as the SID_ION_1_0 symbol."""
        # Read outside the assert so the bytes are consumed even under -O,
        # and compare as one bytes object (iterating bytes yields ints on
        # Python 3, which never equal a one-byte read).
        marker = self.read(len(VERSION_MARKER))
        assert marker == VERSION_MARKER, "Unknown version marker"
        self.valuelen = 0
        self.valuetid = TID_SYMBOL
        self.value = SID_ION_1_0
        self.valueisnull = False
        self.valuefieldid = SID_UNKNOWN
        self.state = ParserState.AfterValue

    def findcatalogitem(self, name):
        """Return the first catalog item called *name*, or None."""
        for result in self.catalog:
            if result.name == name:
                return result
        return None

    def forceimport(self, symbols):
        """Append *symbols* to the symbol table without any import struct."""
        item = IonCatalogItem("Forced", 1, symbols)
        self.symbols.import_(item, len(symbols))

    def getfieldname(self):
        """Return the text of the current field id ("" when there is none)."""
        if self.valuefieldid == SID_UNKNOWN:
            return ""
        return self.symbols.findbyid(self.valuefieldid)

    def getfieldnamesymbol(self):
        """Return the current field as a SymbolToken (text + id)."""
        return SymbolToken(self.getfieldname(), self.valuefieldid)

    def gettypename(self):
        """Return the text of the first annotation, or "" without annotations."""
        if len(self.annotations) == 0:
            return ""
        return self.symbols.findbyid(self.annotations[0])

    @staticmethod
    def printlob(b):
        """Format bytes as space-separated lowercase hex; None becomes "null"."""
        if b is None:
            return "null"
        return " ".join("%02x" % bord(i) for i in b)

    def ionwalk(self, supert, indent, lst):
        """Append a text rendering of the remaining values to *lst*, recursively.

        *supert* is the type id of the enclosing container (-1 at top level)
        and *indent* the prefix put in front of every emitted line.
        """
        while self.hasnext():
            if supert == TID_STRUCT:
                L = self.getfieldname() + ":"
            else:
                L = ""
            t = self.next()
            if t in [TID_STRUCT, TID_LIST]:
                if L != "":
                    lst.append(indent + L)
                L = self.gettypename()
                if L != "":
                    lst.append(indent + L + "::")
                if t == TID_STRUCT:
                    lst.append(indent + "{")
                else:
                    lst.append(indent + "[")
                self.stepin()
                self.ionwalk(t, indent + " ", lst)
                self.stepout()
                if t == TID_STRUCT:
                    lst.append(indent + "}")
                else:
                    lst.append(indent + "]")
            else:
                if t == TID_STRING:
                    L += ('"%s"' % self.stringvalue())
                elif t in [TID_CLOB, TID_BLOB]:
                    L += ("{%s}" % self.printlob(self.lobvalue()))
                elif t == TID_POSINT:
                    L += str(self.intvalue())
                elif t == TID_SYMBOL:
                    tn = self.gettypename()
                    if tn != "":
                        tn += "::"
                    L += tn + self.symbolvalue()
                elif t == TID_DECIMAL:
                    L += str(self.decimalvalue())
                else:
                    L += ("TID %d" % t)
                lst.append(indent + L)

    def print_(self, lst):
        """Rewind and append a text dump of the whole stream to *lst*."""
        self.reset()
        self.ionwalk(-1, "", lst)
# Symbol names of the shared table that addprottable() registers under the
# name "ProtectedData"; list order determines the imported symbol ids.
SYM_NAMES = [ 'com.amazon.drm.Envelope@1.0',
'com.amazon.drm.EnvelopeMetadata@1.0', 'size', 'page_size',
'encryption_key', 'encryption_transformation',
'encryption_voucher', 'signing_key', 'signing_algorithm',
'signing_voucher', 'com.amazon.drm.EncryptedPage@1.0',
'cipher_text', 'cipher_iv', 'com.amazon.drm.Signature@1.0',
'data', 'com.amazon.drm.EnvelopeIndexTable@1.0', 'length',
'offset', 'algorithm', 'encoded', 'encryption_algorithm',
'hashing_algorithm', 'expires', 'format', 'id',
'lock_parameters', 'strategy', 'com.amazon.drm.Key@1.0',
'com.amazon.drm.KeySet@1.0', 'com.amazon.drm.PIDv3@1.0',
'com.amazon.drm.PlainTextPage@1.0',
'com.amazon.drm.PlainText@1.0', 'com.amazon.drm.PrivateKey@1.0',
'com.amazon.drm.PublicKey@1.0', 'com.amazon.drm.SecretKey@1.0',
'com.amazon.drm.Voucher@1.0', 'public_key', 'private_key',
'com.amazon.drm.KeyPair@1.0', 'com.amazon.drm.ProtectedData@1.0',
'doctype', 'com.amazon.drm.EnvelopeIndexTableOffset@1.0',
'enddoc', 'license_type', 'license', 'watermark', 'key', 'value',
'com.amazon.drm.License@1.0', 'category', 'metadata',
'categorized_metadata', 'com.amazon.drm.CategorizedMetadata@1.0',
'com.amazon.drm.VoucherEnvelope@1.0', 'mac', 'voucher',
'com.amazon.drm.ProtectedData@2.0',
'com.amazon.drm.Envelope@2.0',
'com.amazon.drm.EnvelopeMetadata@2.0',
'com.amazon.drm.EncryptedPage@2.0',
'com.amazon.drm.PlainText@2.0', 'compression_algorithm',
'com.amazon.drm.Compressed@1.0', 'priority', 'refines']
def addprottable(ion):
    """Register SYM_NAMES in the parser's catalog as "ProtectedData" version 1."""
    table_name = "ProtectedData"
    table_version = 1
    ion.addtocatalog(table_name, table_version, SYM_NAMES)
def pkcs7pad(msg, blocklen):
    """Return *msg* with PKCS#7 padding added up to a multiple of *blocklen*.

    A message that is already block-aligned gets one full block of padding.
    """
    fill = blocklen - (len(msg) % blocklen)
    return msg + bchr(fill) * fill
def pkcs7unpad(msg, blocklen):
    """Strip and validate PKCS#7 padding from *msg*.

    Raises AssertionError when the message is empty, is not a multiple of
    *blocklen*, or does not end in well-formed padding.
    """
    # An empty message is block-aligned but has no padding byte to inspect;
    # reject it here instead of failing with IndexError on msg[-1].
    assert len(msg) > 0 and len(msg) % blocklen == 0
    paddinglen = bord(msg[-1])
    assert paddinglen > 0 and paddinglen <= blocklen
    assert msg[-paddinglen:] == bchr(paddinglen) * paddinglen
    return msg[:-paddinglen]
class DrmIonVoucher(object):
    """Parses a voucher envelope and decrypts the secret key it protects.

    Call parse() first to collect the encryption parameters, then
    decryptvoucher() to populate self.secretkey.
    """

    envelope = None
    voucher = None
    drmkey = None
    encalgorithm = ""
    enctransformation = ""
    hashalgorithm = ""
    lockparams = None
    ciphertext = b""
    cipheriv = b""
    secretkey = b""

    def __init__(self, voucherenv):
        """Wrap the voucher envelope stream *voucherenv* in an Ion parser."""
        self.lockparams = []
        self.envelope = BinaryIonParser(voucherenv)
        addprottable(self.envelope)

    def decryptvoucher(self):
        """Derive the voucher key from the kinf storage and decrypt the key set.

        The shared secret is built from the strategy fields gathered by
        parse() plus the storage values named by the lock parameters.  On
        success self.secretkey holds the "encoded" blob of the first
        SecretKey entry.
        """
        shared = "PIDv3" + self.encalgorithm + self.enctransformation + self.hashalgorithm
        path = kinf.getkinfpath()
        assert path != "" and os.path.isfile(path), "Unable to locate secure storage (tried looking at %s)" % path
        kinf_ = kinf.readkinf(path)
        self.lockparams.sort()
        for param in self.lockparams:
            if param == "ACCOUNT_SECRET":
                shared += param + kinf_["kindle.account.tokens"]
            elif param == "CLIENT_ID":
                shared += param + kinf_["DSN"]
            else:
                assert False, "Unknown lock parameter: %s" % param
        sharedsecret = shared.encode("UTF-8")
        key = hmac.new(sharedsecret, sharedsecret[:5], digestmod=hashlib.sha256).digest()
        aes = AES.new(key[:32], AES.MODE_CBC, self.cipheriv[:16])
        b = aes.decrypt(self.ciphertext)
        b = pkcs7unpad(b, 16)
        self.drmkey = BinaryIonParser(StringIO(b))
        addprottable(self.drmkey)
        # Advance the parser outside the assert so it still happens under -O.
        haskeyset = self.drmkey.hasnext() and self.drmkey.next() == TID_LIST
        assert haskeyset and self.drmkey.gettypename() == "com.amazon.drm.KeySet@1.0", \
            "Expected KeySet, got %s" % self.drmkey.gettypename()
        self.drmkey.stepin()
        while self.drmkey.hasnext():
            self.drmkey.next()
            if self.drmkey.gettypename() != "com.amazon.drm.SecretKey@1.0":
                continue
            self.drmkey.stepin()
            while self.drmkey.hasnext():
                self.drmkey.next()
                fieldname = self.drmkey.getfieldname()
                if fieldname == "algorithm":
                    algorithm = self.drmkey.stringvalue()
                    assert algorithm == "AES", "Unknown cipher algorithm: %s" % algorithm
                elif fieldname == "format":
                    keyformat = self.drmkey.stringvalue()
                    assert keyformat == "RAW", "Unknown key format: %s" % keyformat
                elif fieldname == "encoded":
                    self.secretkey = self.drmkey.lobvalue()
            self.drmkey.stepout()
            # Only the first SecretKey entry is used.
            break
        self.drmkey.stepout()

    def parse(self):
        """Read the envelope: the embedded voucher blob and the PIDv3 strategy."""
        self.envelope.reset()
        # Start from a clean list so parsing twice does not duplicate the
        # lock parameters (they feed the key derivation).
        self.lockparams = []
        nonempty = self.envelope.hasnext()
        assert nonempty, "Envelope is empty"
        toptype = self.envelope.next()
        assert toptype == TID_STRUCT and self.envelope.gettypename() == "com.amazon.drm.VoucherEnvelope@1.0", \
            "Unknown type encountered in envelope, expected VoucherEnvelope"
        self.envelope.stepin()
        while self.envelope.hasnext():
            self.envelope.next()
            field = self.envelope.getfieldname()
            if field == "voucher":
                self.voucher = BinaryIonParser(StringIO(self.envelope.lobvalue()))
                addprottable(self.voucher)
                continue
            elif field != "strategy":
                continue
            assert self.envelope.gettypename() == "com.amazon.drm.PIDv3@1.0", "Unknown strategy: %s" % self.envelope.gettypename()
            self.envelope.stepin()
            while self.envelope.hasnext():
                self.envelope.next()
                field = self.envelope.getfieldname()
                if field == "encryption_algorithm":
                    self.encalgorithm = self.envelope.stringvalue()
                elif field == "encryption_transformation":
                    self.enctransformation = self.envelope.stringvalue()
                elif field == "hashing_algorithm":
                    self.hashalgorithm = self.envelope.stringvalue()
                elif field == "lock_parameters":
                    self.envelope.stepin()
                    while self.envelope.hasnext():
                        itemtype = self.envelope.next()
                        assert itemtype == TID_STRING, "Expected string list for lock_parameters"
                        self.lockparams.append(self.envelope.stringvalue())
                    self.envelope.stepout()
            self.envelope.stepout()
        self.parsevoucher()

    def parsevoucher(self):
        """Extract cipher_iv and cipher_text from the embedded voucher."""
        assert self.voucher is not None, "Envelope has no voucher"
        nonempty = self.voucher.hasnext()
        assert nonempty, "Voucher is empty"
        toptype = self.voucher.next()
        assert toptype == TID_STRUCT and self.voucher.gettypename() == "com.amazon.drm.Voucher@1.0", \
            "Unknown type, expected Voucher"
        self.voucher.stepin()
        while self.voucher.hasnext():
            self.voucher.next()
            fieldname = self.voucher.getfieldname()
            if fieldname == "cipher_iv":
                self.cipheriv = self.voucher.lobvalue()
            elif fieldname == "cipher_text":
                self.ciphertext = self.voucher.lobvalue()

    def printenvelope(self, lst):
        """Append a text dump of the envelope to *lst*."""
        self.envelope.print_(lst)

    def printkey(self, lst):
        """Append a text dump of the decrypted key set, decrypting on demand."""
        if self.voucher is None:
            self.parse()
        if self.drmkey is None:
            self.decryptvoucher()
        self.drmkey.print_(lst)

    def printvoucher(self, lst):
        """Append a text dump of the embedded voucher, parsing on demand."""
        if self.voucher is None:
            self.parse()
        self.voucher.print_(lst)
class DrmIon(object):
    """Decrypts the pages of a DRMION stream into a plain output file.

    *onvoucherrequired* is a callback taking a voucher name and returning an
    object with parse(), decryptvoucher() and a secretkey attribute.
    """

    ion = None
    voucher = None
    vouchername = ""
    key = b""
    onvoucherrequired = None

    def __init__(self, ionstream, onvoucherrequired):
        """Wrap *ionstream* in an Ion parser and remember the voucher callback."""
        self.ion = BinaryIonParser(ionstream)
        addprottable(self.ion)
        self.onvoucherrequired = onvoucherrequired

    def parse(self, outfn):
        """Walk every envelope and write the decrypted pages to file *outfn*."""
        self.ion.reset()
        # Parser-advancing calls are kept out of the asserts so that the
        # stream position is the same with and without -O.
        nonempty = self.ion.hasnext()
        assert nonempty, "DRMION envelope is empty"
        tid = self.ion.next()
        assert tid == TID_SYMBOL and self.ion.gettypename() == "doctype", "Expected doctype symbol"
        tid = self.ion.next()
        assert tid == TID_LIST and self.ion.gettypename() == "com.amazon.drm.Envelope@2.0", \
            "Unknown type encountered in DRMION envelope, expected Envelope, got %s" % self.ion.gettypename()
        with open(outfn, "wb") as outpages:
            while True:
                if self.ion.gettypename() == "enddoc":
                    break
                self.ion.stepin()
                while self.ion.hasnext():
                    self.ion.next()
                    if self.ion.gettypename() == "com.amazon.drm.EnvelopeMetadata@2.0":
                        self.ion.stepin()
                        while self.ion.hasnext():
                            self.ion.next()
                            if self.ion.getfieldname() != "encryption_voucher":
                                continue
                            if self.vouchername == "":
                                self.vouchername = self.ion.stringvalue()
                                self.voucher = self.onvoucherrequired(self.vouchername)
                                self.voucher.parse()
                                self.voucher.decryptvoucher()
                                self.key = self.voucher.secretkey
                                # The voucher's default key is b"", so test
                                # for emptiness rather than for None.
                                assert self.key, "Unable to obtain secret key from voucher"
                            else:
                                required = self.ion.stringvalue()
                                assert self.vouchername == required, \
                                    "Unexpected: Different vouchers required for same file?"
                        self.ion.stepout()
                    elif self.ion.gettypename() == "com.amazon.drm.EncryptedPage@2.0":
                        ct = None
                        civ = None
                        self.ion.stepin()
                        while self.ion.hasnext():
                            self.ion.next()
                            if self.ion.getfieldname() == "cipher_text":
                                ct = self.ion.lobvalue()
                            elif self.ion.getfieldname() == "cipher_iv":
                                civ = self.ion.lobvalue()
                        if ct is not None and civ is not None:
                            self.processpage(ct, civ, outpages)
                        self.ion.stepout()
                self.ion.stepout()
                if not self.ion.hasnext():
                    break
                self.ion.next()

    def print_(self, lst):
        """Append a text dump of the (still encrypted) Ion stream to *lst*."""
        self.ion.print_(lst)

    def processpage(self, ct, civ, outpages):
        """Decrypt one page, LZMA-decompress it and write it to *outpages*.

        Raises EOFError when the compressed data ends before the LZMA
        end-of-stream is reached.
        """
        aes = AES.new(self.key[:16], AES.MODE_CBC, civ[:16])
        msg = pkcs7unpad(aes.decrypt(ct), 16)
        # Slice instead of index: indexing bytes yields an int on Python 3.
        assert msg[0:1] == b"\x00", "LZMA UseFilter not supported"
        decomp = lzma.LZMADecompressor(format=lzma.FORMAT_ALONE)
        pending = msg[1:]
        while not decomp.eof:
            segment = decomp.decompress(pending)
            pending = b""  # Contents were internally buffered after the first call
            outpages.write(segment)
            if not segment and not decomp.eof:
                # All input has been fed and nothing more comes out: the
                # stream is truncated.  Stop instead of spinning forever.
                raise EOFError("Truncated LZMA stream in encrypted page")
# Kinf.pas + Tools.pas + WinCrypt.pas
import hashlib
import itertools
import os
from Crypto.Cipher import AES
from Crypto.Util.py3compat import bchr, bord
# pywin32
import win32com
import win32crypt
# Alphabets passed as the `key` argument of strdecode()/strencode() for the
# three parts of a .kinf2011 file: header, record-count field, store items.
KINF_HEADER_KEY_STR = 'n5Pr6St7Uv8Wx9YzAb0Cd1Ef2Gh3Jk4M'
KINF_RECCOUNT_KEY_STR = 'AzB0bYyCeVvaZ3FfUuG4g-TtHh5SsIiR6rJjQq7KkPpL8lOoMm9Nn_c1XxDdW2wE'
KINF_STOREITEM_KEY_STR = 'YvaZ3FfUm9Nn_c1XuG4yCAzB0beVg-TtHh5SsIiR6rJjQdW2wEq7KkPpL8lOoMxD'
# Known store item names; readkinf() matches their encoded MD5 against each
# record's key hash to label the decrypted value.
KINF_STORENAMES = ['kindle.account.tokens', 'kindle.cookie.item', 'eulaVersionAccepted',
'login_date', 'kindle.token.item', 'login', 'kindle.key.item', 'kindle.name.info',
'kindle.device.info', 'MazamaRandomNumber', 'max_date', 'SIGVERIF', 'build_version',
'SerialNumber', 'UsernameHash', 'kindle.directedid.info', 'DSN']
def getkinfpath():
    """Return the path of the .kinf2011 storage file under %LOCALAPPDATA%.

    Returns "" when LOCALAPPDATA is not set, so callers can report a missing
    storage file instead of hitting a TypeError on None + str.
    """
    appdata = os.getenv("LOCALAPPDATA")
    if not appdata:
        return ""
    return appdata + "\\Amazon\\Kindle\\storage\\.kinf2011"
def strdecode(data, key):
    """Decode *data*, two characters per output byte, using alphabet *key*.

    Each pair is looked up in *key*; decoding stops at the first character
    that is not part of the alphabet and the bytes decoded so far are
    returned.
    """
    base = len(key)
    decoded = []
    for pos in range(0, (len(data) // 2) * 2, 2):
        high = key.find(data[pos])
        low = key.find(data[pos + 1])
        if high == -1 or low == -1:
            break
        decoded.append(bchr(((high * base) ^ 0x80) + low))
    return b"".join(decoded)
def strencode(data, key):
    """Encode every byte of *data* as two characters taken from alphabet *key*."""
    base = len(key)
    pieces = []
    for ch in data:
        code = bord(ch)
        pieces.append(key[(code ^ 0x80) // base])
        pieces.append(key[code % base])
    return b"".join(pieces)
def md5(msg):
    """Return the raw (binary) MD5 digest of *msg*."""
    return hashlib.md5(msg).digest()
def sha1(msg):
    """Return the raw (binary) SHA-1 digest of *msg*."""
    return hashlib.sha1(msg).digest()
def encodehash(data, key):
    """Return the MD5 digest of *data*, encoded with strencode() and *key*."""
    digest = md5(data)
    return strencode(digest, key)
def unprotectheaderdata(enc):
    """AES-CBC decrypt the kinf header with a key/IV derived from fixed strings.

    PBKDF2-HMAC-SHA1 (0x80 iterations) yields 48 bytes: the first 32 are the
    AES key, the next 16 the IV.
    """
    derived = hashlib.pbkdf2_hmac("sha1", "header_key_data", "HEADER.2011", 0x80, dklen=48)
    aeskey, iv = derived[:32], derived[32:48]
    return AES.new(aeskey, AES.MODE_CBC, iv).decrypt(enc)
def find(text, searchfrom, searchto):
    """Return the part of *text* between *searchfrom* and the next *searchto*.

    Returns None when *searchfrom* is absent; when *searchto* is absent the
    rest of the text after *searchfrom* is returned.
    """
    start = text.find(searchfrom)
    if start < 0:
        return None
    start += len(searchfrom)
    end = text.find(searchto, start)
    return text[start:] if end < 0 else text[start:end]
def eratosthenes():
    """Yield the prime numbers in increasing order, without an upper bound.

    Incremental sieve: `composites` maps an upcoming odd composite to the
    prime factor that will strike it out.
    """
    composites = {}
    yield 2
    candidate = 3
    while True:
        factor = composites.pop(candidate, None)
        if factor is None:
            # candidate is prime; its square is the first composite it owns.
            composites[candidate * candidate] = candidate
            yield candidate
        else:
            # Move the marker to the next odd multiple not already claimed.
            upcoming = factor + candidate
            while upcoming in composites or upcoming % 2 == 0:
                upcoming += factor
            composites[upcoming] = factor
        candidate += 2
def maximal_prime(limit):
    """Return the largest prime <= *limit*, or -1 when there is none."""
    best = -1
    for prime in eratosthenes():
        if prime > limit:
            break
        best = prime
    return best
def readkinf(filename):
    """Read a .kinf2011 file and return a dict of its decrypted store items.

    The file is split on "/".  The first part is the encrypted header, from
    which the [Build:...] and [Guid:...] values are taken as extra entropy.
    Each following record starts with a part holding the key hash and the
    number of data parts that belong to it.  Keys of the result are names
    from KINF_STORENAMES, or "unknown<index>" for unrecognised hashes.
    """
    result = {}
    # Close the file deterministically instead of leaking the handle.
    with open(filename, "rb") as f:
        sl = f.read().split(b"/")
    header = unprotectheaderdata(strdecode(sl[0], KINF_HEADER_KEY_STR))
    build = find(header, b"[Build:", b"]")
    guid = find(header, b"[Guid:", b"]")
    assert build is not None and guid is not None, b"Header data not found\n%s" % header
    addedentropy = build + guid
    # Hash the known names once rather than once per record; setdefault
    # keeps the first name should two names ever encode to the same hash.
    knownhashes = {}
    for name in KINF_STORENAMES:
        knownhashes.setdefault(encodehash(name, KINF_STOREITEM_KEY_STR), name)
    i = 1
    while i < len(sl) - 1:
        keyhash = sl[i][:32]
        reccount = int(strdecode(sl[i][34:34+16], KINF_RECCOUNT_KEY_STR))
        keyname = knownhashes.get(keyhash, "unknown%d" % i)
        chunks = []
        for _ in range(reccount):
            i += 1
            chunks.append(sl[i])
        encdata = b"".join(chunks)
        # Cut the deck
        noff = len(encdata) - maximal_prime(len(encdata) // 3)
        encdata = encdata[noff:] + encdata[:noff]
        enc = strdecode(encdata, KINF_STOREITEM_KEY_STR)
        _, data = win32crypt.CryptUnprotectData(enc, sha1(keyhash) + addedentropy, None, None, 1)
        result[keyname] = data
        i += 1
    return result
# KRFTable.pas
KRF_TABLE = ['language', 'font_family', 'font_style', 'font_weight', 'font_variant', 'font_stretch',
'font_size', 'font_scale', 'ot_features', 'text_color', 'text_opacity',
'text_background_color', 'text_background_opacity', 'underline', 'underline_color',
'underline_opacity', 'underline_weight', 'strikethrough', 'strikethrough_color',
'strikethrough_opacity', 'strikethrough_weight', 'baseline_shift', 'letterspacing',
'wordspacing', 'text_alignment', 'text_alignment_last', 'text_indent', 'left_indent',
'right_indent', 'space_before', 'space_after', 'text_transform', 'line_height',
'line_height_fit', 'baseline_style', 'nobreak', 'margin', 'margin_top', 'margin_left',
'margin_bottom', 'margin_right', 'padding', 'padding_top', 'padding_left', 'padding_bottom',
'padding_right', 'width', 'height', 'top', 'left', 'bottom', 'right', 'min_height',
'min_width', 'max_height', 'max_width', 'fixed_width', 'fixed_height', 'visibility',
'ignore', 'fill_color', 'fill_gradient', 'fill_opacity', 'fill_bounds', 'fill_rule',
'stroke_color', 'stroke_width', 'stroke_linecap', 'border_opacity', 'border_opacity_top',
'border_opacity_left', 'border_opacity_bottom', 'border_opacity_right', 'border_color',
'border_color_top', 'border_color_left', 'border_color_bottom', 'border_color_right',
'border_style', 'border_style_top', 'border_style_left', 'border_style_bottom',
'border_style_right', 'border_weight', 'border_weight_top', 'border_weight_left',
'border_weight_bottom', 'border_weight_right', 'transform', 'draw_spanning_borders',
'list_style', 'list_indent_style', 'list_indent', 'list_replacer', 'list_start_offset',
'outline_color', 'outline_offset', 'outline_style', 'outline_weight', 'gradient_type',
'gradient_stops', 'gradient_stop', 'column_count', 'column_gap', 'column_min_width',
'column_rule_style', 'column_rule_color', 'column_rule_weight', 'column_span',
'column_balance', 'footnote_line_style', 'footnote_line_color', 'footnote_line_weight',
'footnote_line_length', 'footnote_spacing', 'dropcap_lines', 'dropcap_chars', 'hyphens',
'min_hyphen_word_length', 'min_chars_per_line', 'keep_together', 'first', 'last',
'break_after', 'break_before', 'break_inside', 'max_auto_grow', 'min_auto_shrink',
'scale_with_image', 'wrap_rule', 'float', 'page_templates', 'style_events', 'offset',
'length', 'content', 'content_list', 'knockout_region', 'table_column_span',
'table_row_span', 'table_border_collapse', 'header', 'column_format', 'title',
'description', 'id', 'layout', 'style', 'parent_style', 'type', 'embed', 'format', 'mime',
'target', 'external_resource', 'location', 'search_path', 'referred_resources', 'manifest',
'reading_orders', 'sections', 'condition', 'conditional_styling', 'style_name',
'section_name', 'resource_name', 'story_name', 'gradient_name', 'reading_order_name',
'link_to', 'anchor_name', 'contains', 'locations', 'position', 'pid', 'eid', 'uri',
'link_confirm', 'link_use_external_app', 'up_image', 'down_image', 'paragraph_mark',
'direction', 'PRIVATE_parent_image_scale', 'PRIVATE_view_width', 'PRIVATE_view_height',
'PRIVATE_is_storyline_content', 'PRIVATE_paper_color', 'PRIVATE_ink_color', 'section_title',
'section_kicker', 'section_description', 'section_author', 'section_tags',
'section_date_created', 'is_advertisement', 'smooth_scrolling', 'hide_from_toc',
'section_layout', 'has_audio', 'has_video', 'has_slideshow', 'toc', 'scrubbers',
'thumbnails', 'orientation', 'binding_direction', 'support_portrait', 'support_landscape',
'issue_date', 'binding_direction_left', 'binding_direction_right', 'author', 'ISBN', 'ASIN',
'is_TTS_enabled', 'date_created', 'ISBN-10', 'ISBN-13', 'MHID', 'target_WideDimension',
'target_NarrowDimension', 'publisher', 'cover_page', 'illustrator', 'nav_type', 'landmarks',
'page_list', 'landmark_type', 'nav_container_name', 'nav_unit_name', 'representation',
'designation', 'enumeration', 'label', 'icon', 'target_position', 'entries', 'entry_set',
'path', 'shape_list', 'cde_content_type', 'container_list', 'entity_dependencies',
'mandatory_dependencies', 'optional_dependencies', 'AmazonDigitalBook', 'inherit',
'metadata', 'storyline', 'section', 'style_group', 'font', 'gradient', 'position_map',
'position_id_map', 'anchor', 'section_metadata', 'hyphen_dictionary', 'text', 'container',
'image', 'kvg', 'shape', 'plugin', 'knockout', 'list', 'listitem', 'table', 'table_row',
'sidebar', 'footnote', 'figure', 'inline', 'png', 'jpg', 'gif', 'pobject', 'localPage',
'hasContent', 'paragraphMark', 'or', 'and', 'not', '==', '!=', '>', '>=', '<', '<=',
'hasColor', 'hasVideo', 'screenPixelWidth', 'screenPixelHeight', 'screenActualWidth',
'screenActualHeight', 'unit', 'value', 'em', 'ex', 'lh', 'vw', 'vh', 'vmin', 'percent',
'cm', 'mm', 'in', 'pt', 'px', 'center', 'justify', 'horizontal', 'vertical', 'fixed',
'overflow', 'scale_fit', 'radial', 'solid', 'double', 'dashed', 'dotted', 'thick_thin',
'thin_thick', 'groove', 'ridge', 'inset', 'outset', 'non_zero', 'even_odd', 'disc',
'square', 'circle', 'numeric', 'roman_lower', 'roman_upper', 'alpha_lower', 'alpha_upper',
'null', 'none', 'normal', 'default', 'always', 'avoid', 'column', 'thin', 'ultra_light',
'light', 'book', 'medium', 'semi_bold', 'bold', 'ultra_bold', 'heavy', 'ultra_heavy',
'condensed', 'semi_condensed', 'semi_expanded', 'expanded', 'small_caps', 'superscript',
'subscript', 'uppercase', 'lowercase', 'titlecase', 'rtl', 'ltr', 'content_bounds',
'border_bounds', 'padding_bounds', 'margin_bounds', 'oblique', 'italic', 'auto', 'manual',
'portrait', 'landscape', 'preview_images', 'overlay_resource', 'book_navigation',
'section_navigation', 'nav_container', 'nav_containers', 'nav_unit',
'conditional_nav_group_unit', 'resource_path', 'srl', 'titlepage', 'acknowledgements',
'preface', 'loi', 'lot', 'bibliography', 'index', 'glossary', 'frontmatter', 'bodymatter',
'backmatter', 'erl', 'bcContId', 'bcComprType', 'bcDRMScheme', 'bcChunkSize',
'bcIndexTabOffset', 'bcIndexTabLength', 'bcDocSymbolOffset', 'bcDocSymbolLength',
'bcRawMedia', 'bcRawFont', 'container_entity_map', 'pbm', 'both', 'resource_width',
'resource_height', 'cover_image', 'page_progression_direction', 'activate', 'ordinal',
'action', 'backdrop_style', 'hide', 'show', 'blank', 'orientation_lock', 'virtual_panel',
'auto_crop', 'selection', 'page_spread', 'facing_page', 'zoom_target', 'popup', 'enabled',
'disabled', 'zoom_panel', 'popup_text', 'text_vert_anchor', 'text_hori_anchor', 'text_top',
'text_baseline', 'text_bottom', 'text_start', 'text_middle', 'text_end', 'caption', 'body',
'footer', 'border_spacing_vertical', 'border_spacing_horizontal', 'hide_empty_cells',
'border_radius_top_left', 'border_radius_top_right', 'border_radius_bottom_left',
'border_radius_bottom_right', 'PRIVATE_doc_fonts', 'volume_label', 'parent_asin',
'asset_id', 'revision_id', 'zoom_in', 'zoom_out', 'btt', 'ttb', 'force', 'scale', 'source',
'fit_text', 'clip', 'spacing_percent_base', 'fit_width', 'background_image',
'background_positionx', 'background_positiony', 'background_sizex', 'background_sizey',
'background_repeat', 'repeat_x', 'repeat_y', 'no_repeat', 'relative', 'viewport',
'book_metadata', 'categorised_metadata', 'key', 'priority', 'refines', 'category',
'shadows', 'text_shadows', 'color', 'horizontal_offset', 'vertical_offset', 'blur',
'spread', 'list_style_image', 'custom_viewer', 'rem', 'ch', 'vmax', 'gridlines',
'parameter_list', 'set_parameters', 'hang_punctuation', 'layouts', 'layout_name',
'grid_system', 'component_layout', '+', '-', '*', '/', 'asSymbol', 'asString', 'asNumber',
'asList', 'asStructure', 'isLandscape', 'isPortrait', 'isFirstPage',
'text_background_image', 'stroke_linejoin', 'stroke_miterlimit', 'stroke_dasharray',
'stroke_dashoffset', 'round', 'butt', 'miter', 'bevel', 'component', 'document_data',
'component_name', 'salience', 'border_radius', 'clip_path_list', 'clip_path', 'clip_rule',
'clip_path_index', 'sizing_bounds', 'background_origin', 'jxr', 'transform_origin',
'location_map', 'list_style_position', 'inside', 'outside', 'overline', 'overline_color',
'overline_weight', 'horizontal_tb', 'vertical_lr', 'vertical_rl', 'writing_mode',
'all_small_caps', 'ligatures', 'kerning', 'page_index', 'pdf', 'text_overflow', 'ellipsis',
'text_clip', 'word_break', 'break_all', 'kicker', 'article_id', 'all', 'browse',
'nav_visibility', 'link_visited_style', 'link_unvisited_style', 'nbsp_mode', 'space',
'box_align', 'pan_zoom', 'letterspacing_left', 'glyph_transform', 'alt_text',
'content_features', 'namespace', 'major_version', 'minor_version', 'version_info',
'features', 'exclude', 'include', 'format_capabilities', 'bcFCapabilitiesOffset',
'bcFCapabilitiesLength', 'horizontal_rule', 'auxiliary_data', 'kfx_id', 'bmp', 'tiff',
'render', 'block', 'layout_type', 'model', 'word_iteration_type', 'word', 'icu',
'structure', 'section_position_id_map', 'yj.eidhash_eid_section_map',
'yj.section_pid_count_map', 'yj.bpg', 'yj.authoring', 'yj.conversion', 'yj.classification',
'yj.display', 'yj.note', 'yj.chapternote', 'yj.endnote', 'yj.sidenote',
'yj.location_pid_map', 'yj.first_line_style', 'yj.number_of_lines', 'yj.percentage',
'yj.first_line_style_type', 'yj.kfxid_eid_map', 'yj.interactive_element_list',
'yj.float_clear', 'yj.table_features', 'yj.table_selection_mode', 'yj.rowwise',
'yj.regional', 'yj.vertical_align', 'yj.sorting', 'yj.variants', 'yj.tiles',
'yj.tile_width', 'yj.tile_height', 'yj.user_margin_top_percentage',
'yj.user_margin_bottom_percentage', 'yj.user_margin_left_percentage',
'yj.user_margin_right_percentage', 'yj.header_overlay', 'yj.footer_overlay', 'yj.max_crop',
'yj.collision', 'yj.min_aspect_ratio', 'yj.max_aspect_ratio', 'yj.viewer', 'yj.border_path',
'yj.majority', 'yj.queue', 'yj.connected_page_spread', 'yj.connected_panels',
'yj.connected_pagination', 'yj.enable_connected_dps', 'yj.disable_stacking',
'yj.float_align', 'yj.supports', 'yj.illustrated_layout', 'yj.disable_adaptive_layout',
'yj.disable_repeated_headers', 'yj.conditional_properties', 'yj.sdl_version',
'yj.comic_panel_view_mode', 'yj.guided_view', 'yj.content_defined', 'yj.auto_contrast',
'yj.before', 'yj.after', 'yj.at', 'yj.float_bias', 'yj.float_to_block', 'bidi_unicode',
'bidi_embed', 'isolate', 'override', 'isolate_override', 'plaintext', 'start', 'end',
'bidi_direction', 'annotation', 'pan_zoom_viewer', 'select_as_group', 'kvg_content_type',
'annotation_type', 'math', 'mathsegment', 'mathml', 'nontext', 'path_bundle', 'path_list',
'arabic_indic', 'persian']
def getkrftable():
    """Return a new list holding the entries of the module-level KRF_TABLE.

    A copy is returned, so callers can modify the result without touching
    the shared table.
    """
    return list(KRF_TABLE)
# Main.pas
from __future__ import print_function
import collections
import os.path
import struct
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
import ion
import kinf
import krf
# One record of a CONT file's entity index, as unpacked by loadcont():
# fields are (id, typeindex, offset, length).
Entity = collections.namedtuple("Entity", "id, typeindex, offset, length")
# Byte signature that drmionprepare() requires in the first 8 bytes of a file.
MAGIC = b"\xEADRMION\xEE"
# Directory of the book currently being processed. Set by decrypt() and
# printdrmion(); read by onvoucherrequired() to locate .voucher files.
bookpath = None
# Maps an entity id to an in-memory stream of that entity's bytes.
# Filled by loadcont(), read by selectentry().
lookupid = {}
def onvoucherrequired(name):
    """Build a DrmIonVoucher for the voucher called *name*.

    The voucher is read from ``<bookpath>/<name>.voucher``, where ``bookpath``
    is the module-level directory set by decrypt()/printdrmion().

    Args:
        name: voucher name without the ``.voucher`` extension.

    Returns:
        An ``ion.DrmIonVoucher`` constructed over an in-memory copy of the
        voucher file.

    Raises:
        IOError/OSError: if the voucher file cannot be opened or read.
    """
    voucherpath = os.path.join(bookpath, name + ".voucher")
    # Read the whole file up front so the handle is closed before returning.
    # The previous version returned a voucher wrapping an open file object
    # that nothing ever closed. drmionprepare() feeds the parser an in-memory
    # stream in the same way.
    with open(voucherpath, "rb") as fs:
        data = fs.read()
    return ion.DrmIonVoucher(StringIO(data))
def drmionprepare(filename):
    """Load a DRMION file into memory and validate its header.

    The last 8 bytes of the file are discarded and the first 8 bytes must
    equal ``MAGIC``.

    Args:
        filename: path of the file to read.

    Returns:
        A StringIO over the file contents minus the 8-byte footer, with its
        read position just past the 8-byte signature.

    Raises:
        AssertionError: if the file does not begin with ``MAGIC``.
    """
    with open(filename, "rb") as f:
        contents = f.read()

    # Strip 8 byte footer
    result = StringIO(contents[:-8])
    if result.read(8) != MAGIC:
        # Raised explicitly instead of via ``assert`` so the check still runs
        # under ``python -O``; the exception type and message are unchanged.
        raise AssertionError("DRMION header not found in file")
    return result
def decrypt(filename, savefilename=None):
    """Open the DRMION file *filename* and optionally parse it to *savefilename*.

    Records the file's directory in the module-level ``bookpath`` so that
    onvoucherrequired() can find voucher files next to the book. The DrmIon
    object is always constructed; its ``parse`` method is only invoked when
    *savefilename* is not None.
    NOTE(review): presumably ``parse`` writes the decrypted output to
    *savefilename* - confirm against ion.DrmIon.
    """
    global bookpath
    bookpath = os.path.dirname(filename)

    drmion = ion.DrmIon(drmionprepare(filename), onvoucherrequired)
    if savefilename is None:
        return
    drmion.parse(savefilename)
def printdrmion(filename):
    """Print the textual dump of the DRMION file *filename*.

    Records the file's directory in the module-level ``bookpath`` (used by
    onvoucherrequired()), lets ``DrmIon.print_`` fill a list of lines and
    prints each of them.
    """
    global bookpath
    bookpath = os.path.dirname(filename)

    stream = drmionprepare(filename)
    drmion = ion.DrmIon(stream, onvoucherrequired)

    output = []
    drmion.print_(output)
    for text in output:
        print(text)
def printvoucher(filename):
    """Print the envelope, voucher and key sections of a voucher file.

    The file is parsed with ``ion.DrmIonVoucher``; its ``printenvelope``,
    ``printvoucher`` and ``printkey`` methods append to one shared list, in
    that order, and the collected lines are then printed.
    """
    output = []
    with open(filename, "rb") as fs:
        voucher = ion.DrmIonVoucher(fs)
        voucher.parse()
        # Keep the dump calls inside the ``with`` so the file is still open
        # in case they read from it.
        for dump in (voucher.printenvelope, voucher.printvoucher, voucher.printkey):
            dump(output)

    for text in output:
        print(text)
def printkinf():
    """Print every entry returned by ``kinf.readkinf`` as ``key:value``.

    The file that is read is the one reported by ``kinf.getkinfpath()``.
    """
    entries = kinf.readkinf(kinf.getkinfpath())
    for key, value in entries.items():
        text = "%s:%s" % (key, value)
        print(text)
def loadcont(fn):
    """Dump a CONT container file and cache its entities in ``lookupid``.

    File layout as read here:
      * 4-byte signature ``CONT``;
      * 14 bytes unpacked little-endian as (version: uint16,
        headerlen: uint32, infooffset: uint32, infolen: uint32);
      * an Ion "info" document of ``infolen`` bytes at ``infooffset``, whose
        ``bcIndexTabOffset`` / ``bcIndexTabLength`` fields locate the index;
      * an index of 24-byte records unpacked little-endian as
        (id: int32, typeindex: int32, offset: int64, length: int64).

    The header summary, the info document and one line per index record are
    printed. Each entity's bytes are stored in the module-level ``lookupid``
    under its id, for use by selectentry().

    Args:
        fn: path of the CONT file.

    Raises:
        AssertionError: if the file does not start with ``CONT`` or the info
            document yields no positive index offset and length.
        struct.error: if the header or an index record is truncated.
    """
    with open(fn, "rb") as fs:
        magic = fs.read(4)
        # Explicit raises (not ``assert``) so the validation still runs under
        # ``python -O``; exception type and messages are unchanged.
        if magic != b"CONT":
            raise AssertionError("File does not start with a CONT header")

        (version, headerlen, infooffset, infolen) = struct.unpack("<HIII", fs.read(14))
        print("CONT v%d. InfoOffset: %d, InfoLen: %d." % (version, infooffset, infolen))

        indexoffset = 0
        indexlen = 0

        # Parse the info document from an in-memory copy of its bytes.
        fs.seek(infooffset)
        ms = StringIO(fs.read(infolen))
        ion_ = ion.BinaryIonParser(ms)
        ion_.forceimport(krf.getkrftable())
        ion_.next()
        ion_.stepin()
        while ion_.hasnext():
            ion_.next()
            fieldname = ion_.getfieldname()
            if fieldname == "bcIndexTabOffset":
                indexoffset = ion_.intvalue()
            elif fieldname == "bcIndexTabLength":
                indexlen = ion_.intvalue()
        ion_.stepout()

        # Echo whatever the parser's print_ produces for the info document.
        lines = []
        ion_.print_(lines)
        for line in lines:
            print(line)

        if indexoffset <= 0 or indexlen <= 0:
            raise AssertionError("Didn't find index offset or length")

        fs.seek(indexoffset)
        indexend = indexoffset + indexlen
        while fs.tell() < indexend:
            ent = Entity(*struct.unpack("<iiqq", fs.read(24)))
            # typeindex is shifted by ion.SID_ION_1_0_MAX before indexing
            # KRF_TABLE; presumably it is a symbol id that counts the Ion
            # system symbols first - confirm against the ion module.
            print("Entity {id=%d, type=%s, offset=%d, length=%d}" % (ent.id,
                krf.KRF_TABLE[ent.typeindex - ion.SID_ION_1_0_MAX], ent.offset, ent.length))

            # Fetch the entity body, then return to the index position.
            oldpos = fs.tell()
            fs.seek(ent.offset + headerlen)
            ms = StringIO(fs.read(ent.length))
            # Skip the first 10 bytes of the entity (presumably its own
            # header - TODO confirm) before handing it to the parser later.
            ms.seek(10)
            lookupid[ent.id] = ms
            fs.seek(oldpos)
def selectentry(eid):
    """Print the entity stored in ``lookupid`` under *eid*.

    The cached stream is handed directly to a new ``ion.BinaryIonParser``
    (it is not copied), the KRF symbol table is imported, and the lines
    produced by the parser's ``print_`` are printed.

    Raises:
        KeyError: if *eid* has no entry in ``lookupid``.
    """
    reader = ion.BinaryIonParser(lookupid[eid])
    reader.forceimport(krf.getkrftable())

    output = []
    reader.print_(output)
    for text in output:
        print(text)
# Books are stored in Documents\My Kindle Content
# .res and .md files seem to be CONT files
# .azw files are DRMION files
# .voucher files are vouchers
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment