Created
February 25, 2018 02:40
-
-
Save celskeggs/3ff1a9eb1bba4042c706859d6ee0b2e2 to your computer and use it in GitHub Desktop.
Parser and chopper for krb5 ccaches
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def u16(data, index): | |
assert len(data) >= 2 | |
return int.from_bytes(data[index:index+2], "big") | |
def u32(data, index): | |
assert len(data) >= 4 | |
return int.from_bytes(data[index:index+4], "big") | |
def parse_header(header): | |
fields = [] | |
while header: | |
tag = u16(header, 0) | |
length = u16(header, 2) | |
field, header = header[4:4 + length], header[4 + length:] | |
fields.append((tag, field)) | |
return fields | |
def parse_lengthed(data): | |
length = u32(data, 0) | |
assert len(data) >= length + 4 | |
return data[4:4+length], data[4+length:] | |
def parse_princ(data): | |
name_type = u32(data, 0) | |
component_count = u32(data, 4) | |
realm, data = parse_lengthed(data[8:]) | |
components = [] | |
for i in range(component_count): | |
comp, data = parse_lengthed(data) | |
components.append(comp) | |
return (name_type, realm, components), data | |
def parse_cred(data): | |
client, data = parse_princ(data) | |
server, data = parse_princ(data) | |
keyblock_enctype = u16(data, 0) | |
keyblock_data, data = parse_lengthed(data[2:]) | |
keyblock = (keyblock_enctype, keyblock_data) | |
authtime = u32(data, 0) | |
starttime = u32(data, 4) | |
endtime = u32(data, 8) | |
renew_till = u32(data, 12) | |
assert data[16] in (0, 1) | |
is_skey = data[16] != 0 | |
ticket_flags = u32(data, 17) | |
address_count = u32(data, 21) | |
data = data[25:] | |
addresses = [] | |
for i in range(address_count): | |
addrtype = u16(data, 0) | |
addr, data = parse_lengthed(data[2:]) | |
addresses.append((addrtype, addr)) | |
authdata_count = u32(data, 0) | |
data = data[4:] | |
authdata = [] | |
for i in range(authdata_count): | |
adtype = u16(data, 0) | |
authdatum, data = parse_lengthed(data[2:]) | |
authdata.append((adtype, authdatum)) | |
ticket, data = parse_lengthed(data) | |
second_ticket, data = parse_lengthed(data) | |
return (client, server, keyblock, (authtime, starttime, endtime, renew_till), is_skey, ticket_flags, addresses, authdata, ticket, second_ticket), data | |
def load_ccache(data): | |
orig = data | |
assert type(data) == bytes | |
assert data[0] == 5, "invalid kerberos ccache" | |
assert data[1] == 4, "unexpected kerberos ccache version: %d" % data[1] | |
headerlen = u16(data, 2) | |
header = data[4:4 + headerlen] | |
data = data[4 + headerlen:] | |
header = parse_header(header) | |
default_principal, data = parse_princ(data) | |
credentials = [] | |
while data: | |
cred, data = parse_cred(data) | |
credentials.append(cred) | |
loaded = (header, default_principal, credentials) | |
assert save_ccache(*loaded) == orig | |
return loaded | |
def enc16(data): | |
return data.to_bytes(2, "big") | |
def enc32(data): | |
return data.to_bytes(4, "big") | |
def encode_header(fields): | |
out = b"" | |
for tag, field in fields: | |
out += enc16(tag) | |
out += enc16(len(field)) | |
out += field | |
return out | |
def encode_lengthed(data): | |
return enc32(len(data)) + data | |
def encode_princ(princ): | |
name_type, realm, components = princ | |
return enc32(name_type) + enc32(len(components)) + encode_lengthed(realm) + b"".join(encode_lengthed(comp) for comp in components) | |
def encode_cred(cred): | |
(client, server, keyblock, (authtime, starttime, endtime, renew_till), is_skey, ticket_flags, addresses, authdata, ticket, second_ticket) = cred | |
data = encode_princ(client) + encode_princ(server) | |
data += enc16(keyblock[0]) + encode_lengthed(keyblock[1]) | |
data += enc32(authtime) + enc32(starttime) + enc32(endtime) + enc32(renew_till) | |
data += b"\1" if is_skey else b"\0" | |
data += enc32(ticket_flags) | |
data += enc32(len(addresses)) | |
data += b"".join(enc16(addrtype) + encode_lengthed(addr) for addrtype, addr in addresses) | |
data += enc32(len(authdata)) | |
data += b"".join(enc16(adtype) + encode_lengthed(authdatum) for adtype, authdatum in authdata) | |
data += encode_lengthed(ticket) | |
data += encode_lengthed(second_ticket) | |
return data | |
def save_ccache(header, default_principal, credentials): | |
header = encode_header(header) | |
header = b"\5\4" + enc16(len(header)) + header | |
return header + encode_princ(default_principal) + b"".join(encode_cred(cred) for cred in credentials) | |
def stringify_princ(princ): | |
_, realm, components = princ | |
return (b"/".join(components) + b"@" + realm).decode() | |
def chop_ccache(ccache): | |
header, default_principal, credentials = load_ccache(ccache) | |
assert credentials, "no credentials?" | |
results = {} | |
for credential in credentials: | |
key = stringify_princ(credential[1]) | |
assert key not in results, "duplicate credentials?" | |
results[key] = save_ccache(header, default_principal, [credential]) | |
return results | |
if __name__ == "__main__": | |
with open("/tmp/krb5cc_1000", "rb") as f: | |
ccache = f.read() | |
print("chunking into current directory") | |
for name, derived_ccache in chop_ccache(ccache).items(): | |
if name.endswith("@X-CACHECONF:"): | |
continue | |
print("writing", name) | |
with open("chop-" + name.replace("/", "_") + ".ccache", "wb") as f: | |
f.write(derived_ccache) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment