Skip to content

Instantly share code, notes, and snippets.

@celskeggs
Created February 25, 2018 02:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save celskeggs/3ff1a9eb1bba4042c706859d6ee0b2e2 to your computer and use it in GitHub Desktop.
Save celskeggs/3ff1a9eb1bba4042c706859d6ee0b2e2 to your computer and use it in GitHub Desktop.
Parser and chopper for krb5 ccaches
def u16(data, index):
    """Read a big-endian unsigned 16-bit integer from ``data`` at ``index``.

    Raises AssertionError when fewer than two bytes are available starting
    at ``index``.
    """
    # Validate against the read offset, not just the total length: slicing
    # past the end would otherwise silently produce a short (wrong) value.
    assert 0 <= index and index + 2 <= len(data), \
        "truncated data: need 2 bytes at offset %d" % index
    return int.from_bytes(data[index:index + 2], "big")
def u32(data, index):
    """Read a big-endian unsigned 32-bit integer from ``data`` at ``index``.

    Raises AssertionError when fewer than four bytes are available starting
    at ``index``.
    """
    # Validate against the read offset, not just the total length: slicing
    # past the end would otherwise silently produce a short (wrong) value.
    assert 0 <= index and index + 4 <= len(data), \
        "truncated data: need 4 bytes at offset %d" % index
    return int.from_bytes(data[index:index + 4], "big")
def parse_header(header):
    """Split a header blob into a list of ``(tag, field)`` pairs.

    Each entry is read as a 16-bit tag, a 16-bit length, and then that many
    bytes of field data; entries are consumed until the blob is exhausted.

    Raises AssertionError if an entry declares more field bytes than remain.
    """
    fields = []
    while header:
        tag = u16(header, 0)
        length = u16(header, 2)
        end = 4 + length
        # A slice past the end would quietly return a short field, so reject
        # truncated entries explicitly (mirrors the check in parse_lengthed).
        assert len(header) >= end, "truncated header field"
        fields.append((tag, header[4:end]))
        header = header[end:]
    return fields
def parse_lengthed(data):
    """Split one length-prefixed blob off the front of ``data``.

    The first four bytes give the payload size (as read by ``u32``).
    Returns ``(payload, remainder)``; asserts that the whole payload is
    present.
    """
    size = u32(data, 0)
    end = size + 4
    assert len(data) >= end
    payload = data[4:end]
    remainder = data[end:]
    return payload, remainder
def parse_princ(data):
    """Parse one principal from the front of ``data``.

    Layout as read here: 32-bit name type, 32-bit component count, a
    length-prefixed realm, then that many length-prefixed components.

    Returns ``((name_type, realm, components), remainder)``.
    """
    name_type = u32(data, 0)
    component_count = u32(data, 4)
    realm, rest = parse_lengthed(data[8:])
    components = []
    for _ in range(component_count):
        component, rest = parse_lengthed(rest)
        components.append(component)
    principal = (name_type, realm, components)
    return principal, rest
def parse_cred(data):
    """Parse one credential record from the front of ``data``.

    Returns ``(cred, remainder)`` where ``cred`` is the tuple
    ``(client, server, keyblock, times, is_skey, ticket_flags, addresses,
    authdata, ticket, second_ticket)``; ``keyblock`` is
    ``(enctype, key_bytes)`` and ``times`` is
    ``(authtime, starttime, endtime, renew_till)``.
    """

    def take_typed_entries(buf, count):
        # Read ``count`` entries, each a 16-bit type followed by a
        # length-prefixed payload; used for both addresses and authdata.
        entries = []
        for _ in range(count):
            kind = u16(buf, 0)
            payload, buf = parse_lengthed(buf[2:])
            entries.append((kind, payload))
        return entries, buf

    client, data = parse_princ(data)
    server, data = parse_princ(data)

    enctype = u16(data, 0)
    key_bytes, data = parse_lengthed(data[2:])
    keyblock = (enctype, key_bytes)

    # Fixed-size section: four 32-bit times, a one-byte flag, then two
    # 32-bit values (ticket flags and the address count).
    times = (u32(data, 0), u32(data, 4), u32(data, 8), u32(data, 12))
    skey_byte = data[16]
    assert skey_byte in (0, 1)
    is_skey = skey_byte != 0
    ticket_flags = u32(data, 17)
    address_count = u32(data, 21)

    addresses, data = take_typed_entries(data[25:], address_count)

    authdata_count = u32(data, 0)
    authdata, data = take_typed_entries(data[4:], authdata_count)

    ticket, data = parse_lengthed(data)
    second_ticket, data = parse_lengthed(data)

    cred = (client, server, keyblock, times, is_skey, ticket_flags,
            addresses, authdata, ticket, second_ticket)
    return cred, data
def load_ccache(data):
    """Parse a whole ccache file image into Python structures.

    ``data`` must be a ``bytes`` object whose first two bytes are 5 and 4.
    Returns ``(header_fields, default_principal, credentials)``.

    As a self-check, the parsed result is re-encoded with ``save_ccache``
    and asserted to be identical to the input.
    """
    assert type(data) == bytes
    assert data[0] == 5, "invalid kerberos ccache"
    assert data[1] == 4, "unexpected kerberos ccache version: %d" % data[1]

    # Bytes 2-3 hold the header length; the header itself starts at offset 4.
    body_start = 4 + u16(data, 2)
    header = parse_header(data[4:body_start])

    default_principal, rest = parse_princ(data[body_start:])

    # Everything after the default principal is a run of credentials.
    credentials = []
    while rest:
        cred, rest = parse_cred(rest)
        credentials.append(cred)

    assert save_ccache(header, default_principal, credentials) == data
    return header, default_principal, credentials
def enc16(data):
    """Encode the integer ``data`` as two big-endian bytes."""
    return data.to_bytes(length=2, byteorder="big")
def enc32(data):
    """Encode the integer ``data`` as four big-endian bytes."""
    return data.to_bytes(length=4, byteorder="big")
def encode_header(fields):
    """Serialise ``(tag, field)`` pairs into a header blob.

    Each pair becomes a 16-bit tag, a 16-bit field length, and the field
    bytes, concatenated in order. This is the inverse of ``parse_header``.
    """
    return b"".join(
        enc16(tag) + enc16(len(field)) + field
        for tag, field in fields
    )
def encode_lengthed(data):
    """Return ``data`` prefixed with its length as a 32-bit value."""
    prefix = enc32(len(data))
    return prefix + data
def encode_princ(princ):
    """Serialise a ``(name_type, realm, components)`` principal tuple.

    Emits the name type and component count as 32-bit values, then the
    length-prefixed realm followed by each length-prefixed component.
    """
    name_type, realm, components = princ
    pieces = [
        enc32(name_type),
        enc32(len(components)),
        encode_lengthed(realm),
    ]
    pieces.extend(encode_lengthed(component) for component in components)
    return b"".join(pieces)
def encode_cred(cred):
    """Serialise one credential tuple (as produced by ``parse_cred``).

    Fields are written in the same order ``parse_cred`` reads them:
    client, server, keyblock, the four times, the is_skey byte, ticket
    flags, addresses, authdata, ticket and second ticket.
    """
    (client, server, keyblock, times, is_skey, ticket_flags,
     addresses, authdata, ticket, second_ticket) = cred
    authtime, starttime, endtime, renew_till = times

    parts = [
        encode_princ(client),
        encode_princ(server),
        enc16(keyblock[0]),
        encode_lengthed(keyblock[1]),
        enc32(authtime),
        enc32(starttime),
        enc32(endtime),
        enc32(renew_till),
        b"\1" if is_skey else b"\0",
        enc32(ticket_flags),
        enc32(len(addresses)),
    ]
    # Addresses and authdata share a shape: a count, then entries made of a
    # 16-bit type and a length-prefixed payload.
    parts.extend(
        enc16(addrtype) + encode_lengthed(addr)
        for addrtype, addr in addresses
    )
    parts.append(enc32(len(authdata)))
    parts.extend(
        enc16(adtype) + encode_lengthed(authdatum)
        for adtype, authdatum in authdata
    )
    parts.append(encode_lengthed(ticket))
    parts.append(encode_lengthed(second_ticket))
    return b"".join(parts)
def save_ccache(header, default_principal, credentials):
    """Serialise header fields, default principal and credentials to bytes.

    The output starts with the two bytes 5 and 4, followed by the 16-bit
    length of the encoded header, the header itself, the default principal
    and then every credential in order.
    """
    header_blob = encode_header(header)
    pieces = [
        b"\5\4",
        enc16(len(header_blob)),
        header_blob,
        encode_princ(default_principal),
    ]
    pieces.extend(encode_cred(cred) for cred in credentials)
    return b"".join(pieces)
def stringify_princ(princ):
    """Render a principal tuple as a ``comp/comp@realm`` string.

    The name type is ignored; components are joined with ``/`` and the
    result is decoded with the default codec.
    """
    _name_type, realm, components = princ
    joined = b"/".join(components)
    raw = joined + b"@" + realm
    return raw.decode()
def chop_ccache(ccache):
    """Split a ccache image into one single-credential ccache per server.

    Returns a dict mapping the stringified server principal of each
    credential to a ccache (bytes) that keeps the original header and
    default principal but contains only that credential.

    Asserts that there is at least one credential and that no two
    credentials share the same server principal string.
    """
    header, default_principal, credentials = load_ccache(ccache)
    assert credentials, "no credentials?"

    by_server = {}
    for cred in credentials:
        # Index 1 of a credential tuple is the server principal.
        server_name = stringify_princ(cred[1])
        assert server_name not in by_server, "duplicate credentials?"
        by_server[server_name] = save_ccache(header, default_principal, [cred])
    return by_server
if __name__ == "__main__":
    import sys

    # The input ccache may be named as the first command-line argument; the
    # previously hard-coded location remains the default.
    ccache_path = sys.argv[1] if len(sys.argv) > 1 else "/tmp/krb5cc_1000"
    with open(ccache_path, "rb") as f:
        ccache = f.read()
    print("chunking into current directory")
    for name, derived_ccache in chop_ccache(ccache).items():
        # Entries in the "X-CACHECONF:" realm are skipped (presumably cache
        # configuration records rather than real tickets).
        if name.endswith("@X-CACHECONF:"):
            continue
        print("writing", name)
        # "/" separates principal components and cannot appear in a file name.
        with open("chop-" + name.replace("/", "_") + ".ccache", "wb") as f:
            f.write(derived_ccache)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment