Skip to content

Instantly share code, notes, and snippets.

@rxwx

rxwx/epa-test.py Secret

Created November 9, 2022 18:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rxwx/8476acf1f2ca8727c8d356ea26291354 to your computer and use it in GitHub Desktop.
Save rxwx/8476acf1f2ca8727c8d356ea26291354 to your computer and use it in GitHub Desktop.
Dump CBT Data in Python
#!/usr/bin/env python3
from requests_ntlm import HttpNtlmAuth
from ntlm_auth import ntlm
from ntlm_auth.compute_response import ComputeResponse
from ntlm_auth.constants import AvId, AvFlags, MessageTypes, NegotiateFlags, \
NTLM_SIGNATURE
from ntlm_auth.messages import AuthenticateMessage, ChallengeMessage, \
NegotiateMessage, get_version, get_random_export_session_key
from ntlm_auth.session_security import SessionSecurity
from ntlm_auth.rc4 import ARC4
from ntlm_auth.gss_channel_bindings import GssChannelBindingsStruct
import ntlm_auth.compute_keys as compkeys
import requests
import base64
import urllib3
import struct
import hashlib
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class ComputeResponseHook(ComputeResponse):
def get_nt_challenge_response(self, lm_challenge_response,
server_certificate_hash=None, cbt_data=None):
"""
[MS-NLMP] v28.0 2016-07-14
3.3.1 - NTLM v1 Authentication
3.3.2 - NTLM v2 Authentication
This method returns the NtChallengeResponse key based on the
ntlm_compatibility chosen and the target_info supplied by the
CHALLENGE_MESSAGE. It is quite different from what is set in the
document as it combines the NTLMv1, NTLM2 and NTLMv2 methods into one
and calls separate methods based on the ntlm_compatibility value
chosen.
:param lm_challenge_response: The LmChallengeResponse calculated
beforehand, used to get the key_exchange_key value
:param server_certificate_hash: This is deprecated and will be removed
in a future version, use cbt_data instead
:param cbt_data: The GssChannelBindingsStruct to bind in the NTLM
response
:return response: (NtChallengeResponse) - The NT response to the server
challenge. Computed by the client
:return session_base_key: (SessionBaseKey) - A session key calculated
from the user password challenge
:return target_info: (AV_PAIR) - The AV_PAIR structure used in the
nt_challenge calculations
"""
if self._negotiate_flags & \
NegotiateFlags.NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY and \
self._ntlm_compatibility < 3:
# The compatibility level is less than 3 which means it doesn't
# support NTLMv2 but we want extended security so use NTLM2 which
# is different from NTLMv2
# [MS-NLMP] - 3.3.1 NTLMv1 Authentication
response, session_base_key = \
self._get_NTLM2_response(self._password,
self._server_challenge,
self._client_challenge)
lm_hash = comphash._lmowfv1(self._password)
key_exchange_key = \
compkeys._get_exchange_key_ntlm_v1(self._negotiate_flags,
session_base_key,
self._server_challenge,
lm_challenge_response,
lm_hash)
target_info = None
elif 0 <= self._ntlm_compatibility < 3:
response, session_base_key = \
self._get_NTLMv1_response(self._password,
self._server_challenge)
lm_hash = comphash._lmowfv1(self._password)
key_exchange_key = \
compkeys._get_exchange_key_ntlm_v1(self._negotiate_flags,
session_base_key,
self._server_challenge,
lm_challenge_response,
lm_hash)
target_info = None
else:
if self._server_target_info is None:
target_info = ntlm_auth.messages.TargetInfo()
else:
target_info = self._server_target_info
if target_info[AvId.MSV_AV_TIMESTAMP] is None:
timestamp = get_windows_timestamp()
else:
timestamp = target_info[AvId.MSV_AV_TIMESTAMP]
# [MS-NLMP] If the CHALLENGE_MESSAGE TargetInfo field has an
# MsvAvTimestamp present, the client SHOULD provide a MIC
target_info[AvId.MSV_AV_FLAGS] = \
struct.pack("<L", AvFlags.MIC_PROVIDED)
if server_certificate_hash is not None and cbt_data is None:
# Older method of creating CBT struct based on the cert hash.
# This should be avoided in favour of an explicit
# GssChannelBindingStruct being passed in.
certificate_digest = base64.b16decode(server_certificate_hash)
cbt_data = GssChannelBindingsStruct()
cbt_data[cbt_data.APPLICATION_DATA] = \
b'tls-server-end-point:' + certificate_digest
if cbt_data is not None:
cbt_bytes = cbt_data.get_data()
print (f"[*] CBT Data (b'\\x00' * 14 + b'tls-server-end-point:' + SHA256(server_cert)): {cbt_bytes.hex().upper()}")
cbt_hash = hashlib.md5(cbt_bytes).digest()
target_info[AvId.MSV_AV_CHANNEL_BINDINGS] = cbt_hash
print (f'[*] CBT Hash (MD5(CBT_DATA)): {cbt_hash.hex().upper()}')
response, session_base_key = \
self._get_NTLMv2_response(self._user_name, self._password,
self._domain_name,
self._server_challenge,
self._client_challenge,
timestamp, target_info)
key_exchange_key = \
compkeys._get_exchange_key_ntlm_v2(session_base_key)
return response, key_exchange_key, target_info
class AuthenticateMessageHook(AuthenticateMessage):
def __init__(self, user_name, password, domain_name, workstation,
challenge_message, ntlm_compatibility,
server_certificate_hash=None, cbt_data=None):
"""
[MS-NLMP] v28.0 2016-07-14
2.2.1.3 AUTHENTICATE_MESSAGE
The AUTHENTICATE_MESSAGE defines an NTLM authenticate message that is
sent from the client to the server after the CHALLENGE_MESSAGE is
processed by the client.
:param user_name: The user name of the user we are trying to
authenticate with
:param password: The password of the user we are trying to authenticate
with
:param domain_name: The domain name of the user account we are
authenticated with, default is None
:param workstation: The workstation we are using to authenticate with,
default is None
:param challenge_message: A ChallengeMessage object that was received
from the server after the negotiate_message
:param ntlm_compatibility: The Lan Manager Compatibility Level, used to
determine what NTLM auth version to use, see Ntlm in ntlm.py for
more details
:param server_certificate_hash: Deprecated, used cbt_data instead
:param cbt_data: The GssChannelBindingsStruct that contains the CBT
data to bind in the auth response
Message Attributes (Attributes used to compute the message structure):
signature: An 8-byte character array that MUST contain the ASCII
string 'NTLMSSP\0'
message_type: A 32-bit unsigned integer that indicates the message
type. This field must be set to 0x00000003
negotiate_flags: A NEGOTIATE strucutre that contains a set of bit
flags. These flags are the choices the client has made from the
CHALLENGE_MESSAGE options
version: Contains the windows version info of the client. It is
used only debugging purposes and are only set when
NTLMSSP_NEGOTIATE_VERSION flag is set
mic: The message integrity for the NEGOTIATE_MESSAGE,
CHALLENGE_MESSAGE and AUTHENTICATE_MESSAGE
lm_challenge_response: An LM_RESPONSE of LMv2_RESPONSE structure
that contains the computed LM response to the challenge
nt_challenge_response: An NTLM_RESPONSE or NTLMv2_RESPONSE
structure that contains the computed NT response to the
challenge
domain_name: The domain or computer name hosting the user account,
MUST be encoded in the negotiated character set
user_name: The name of the user to be authenticated, MUST be
encoded in the negotiated character set
workstation: The name of the computer to which the user is logged
on, MUST Be encoded in the negotiated character set
encrypted_random_session_key: The client's encrypted random session
key
Non-Message Attributes (Attributes not used in auth message):
exported_session_key: A randomly generated session key based on
other keys, used to derive the SIGNKEY and SEALKEY
target_info: The AV_PAIR structure used in the nt response
calculation
"""
self.signature = NTLM_SIGNATURE
self.message_type = struct.pack('<L', MessageTypes.NTLM_AUTHENTICATE)
self.negotiate_flags = challenge_message.negotiate_flags
self.version = get_version(self.negotiate_flags)
self.mic = None
if domain_name is None:
self.domain_name = ''
else:
self.domain_name = domain_name
if workstation is None:
self.workstation = ''
else:
self.workstation = workstation
if self.negotiate_flags & NegotiateFlags.NTLMSSP_NEGOTIATE_UNICODE:
self.negotiate_flags &= ~NegotiateFlags.NTLMSSP_NEGOTIATE_OEM
encoding_value = 'utf-16-le'
else:
encoding_value = 'ascii'
self.domain_name = self.domain_name.encode(encoding_value)
self.user_name = user_name.encode(encoding_value)
self.workstation = self.workstation.encode(encoding_value)
compute_response = ComputeResponseHook(user_name, password, domain_name,
challenge_message,
ntlm_compatibility)
self.lm_challenge_response = \
compute_response.get_lm_challenge_response()
self.nt_challenge_response, key_exchange_key, target_info = \
compute_response.get_nt_challenge_response(
self.lm_challenge_response, server_certificate_hash, cbt_data)
self.target_info = target_info
if self.negotiate_flags & NegotiateFlags.NTLMSSP_NEGOTIATE_KEY_EXCH:
self.exported_session_key = get_random_export_session_key()
rc4_handle = ARC4(key_exchange_key)
self.encrypted_random_session_key = \
rc4_handle.update(self.exported_session_key)
else:
self.exported_session_key = key_exchange_key
self.encrypted_random_session_key = b''
self.negotiate_flags = struct.pack('<I', self.negotiate_flags)
class NtlmContextHook(ntlm.NtlmContext):
def __init__(self, username, password, domain=None, workstation=None,
cbt_data=None, ntlm_compatibility=3):
r"""
Initialises a NTLM context to use when authenticating using the NTLM
protocol.
Initialises the NTLM context to use when sending and receiving messages
to and from the server. You should be using this object as it supports
NTLMv2 authenticate and it easier to use than before. It also brings in
the ability to use signing and sealing with session_security and
generate a MIC structure.
:param username: The username to authenticate with
:param password: The password for the username
:param domain: The domain part of the username (None if n/a)
:param workstation: The localworkstation (None if n/a)
:param cbt_data: A GssChannelBindingsStruct or None to bind channel
data with the auth process
:param ntlm_compatibility: (Default 3)
The Lan Manager Compatibility Level to use with the auth message
This is set by an Administrator in the registry key
'HKLM\SYSTEM\CurrentControlSet\Control\Lsa\LmCompatibilityLevel'
The values correspond to the following;
0 : LM and NTLMv1
1 : LM, NTLMv1 and NTLMv1 with Extended Session Security
2 : NTLMv1 and NTLMv1 with Extended Session Security
3-5 : NTLMv2 Only
Note: Values 3 to 5 are no different from a client perspective
"""
self.username = username
self.password = password
self.domain = domain
self.workstation = workstation
self.cbt_data = cbt_data
self._server_certificate_hash = None # deprecated for backwards compat
self.ntlm_compatibility = ntlm_compatibility
self.complete = False
# Setting up our flags so the challenge message returns the target info
# block if supported
self.negotiate_flags = NegotiateFlags.NTLMSSP_NEGOTIATE_TARGET_INFO | \
NegotiateFlags.NTLMSSP_NEGOTIATE_128 | \
NegotiateFlags.NTLMSSP_NEGOTIATE_56 | \
NegotiateFlags.NTLMSSP_NEGOTIATE_UNICODE | \
NegotiateFlags.NTLMSSP_NEGOTIATE_VERSION | \
NegotiateFlags.NTLMSSP_NEGOTIATE_KEY_EXCH | \
NegotiateFlags.NTLMSSP_NEGOTIATE_ALWAYS_SIGN | \
NegotiateFlags.NTLMSSP_NEGOTIATE_SIGN | \
NegotiateFlags.NTLMSSP_NEGOTIATE_SEAL
# Setting the message types based on the ntlm_compatibility level
self._set_ntlm_compatibility_flags(self.ntlm_compatibility)
self._negotiate_message = None
self._challenge_message = None
self._authenticate_message = None
self._session_security = None
def step(self, input_token=None):
if self._negotiate_message is None:
self._negotiate_message = NegotiateMessage(self.negotiate_flags,
self.domain,
self.workstation)
return self._negotiate_message.get_data()
else:
self._challenge_message = ChallengeMessage(input_token)
self._authenticate_message = AuthenticateMessageHook(
self.username, self.password, self.domain, self.workstation,
self._challenge_message, self.ntlm_compatibility,
server_certificate_hash=self._server_certificate_hash,
cbt_data=self.cbt_data
)
self._authenticate_message.add_mic(self._negotiate_message,
self._challenge_message)
flag_bytes = self._authenticate_message.negotiate_flags
flags = struct.unpack("<I", flag_bytes)[0]
if flags & NegotiateFlags.NTLMSSP_NEGOTIATE_SEAL or \
flags & NegotiateFlags.NTLMSSP_NEGOTIATE_SIGN:
self._session_security = SessionSecurity(
flags, self.session_key
)
self.complete = True
return self._authenticate_message.get_data()
class NtlmHook(ntlm.Ntlm):
def __init__(self, ntlm_compatibility=3):
self._context = NtlmContextHook(None, None,
ntlm_compatibility=ntlm_compatibility)
self._challenge_token = None
def create_negotiate_message(self, domain_name=None, workstation=None):
self._context.domain = domain_name
self._context.workstation = workstation
msg = self._context.step()
msg = base64.b64encode(msg)
print (f'[*] Negotiate: {msg.decode()}')
return msg
def create_authenticate_message(self, user_name, password,
domain_name=None, workstation=None,
server_certificate_hash=None):
self._context.username = user_name
self._context.password = password
self._context.domain = domain_name
self._context.workstation = workstation
self._context._server_certificate_hash = server_certificate_hash
msg = self._context.step(self._challenge_token)
return base64.b64encode(msg)
class HttpNtlmAuthHook(HttpNtlmAuth):
def retry_using_http_NTLM_auth(self, auth_header_field, auth_header,
response, auth_type, args):
# Get the certificate of the server if using HTTPS for CBT
server_certificate_hash = self._get_server_cert(response)
"""Attempt to authenticate using HTTP NTLM challenge/response."""
if auth_header in response.request.headers:
return response
content_length = int(
response.request.headers.get('Content-Length', '0'), base=10)
if hasattr(response.request.body, 'seek'):
if content_length > 0:
response.request.body.seek(-content_length, 1)
else:
response.request.body.seek(0, 0)
# Consume content and release the original connection
# to allow our new request to reuse the same one.
response.content
response.raw.release_conn()
request = response.request.copy()
# ntlm returns the headers as a base64 encoded bytestring. Convert to
# a string.
context = NtlmHook()
negotiate_message = context.create_negotiate_message(self.domain).decode('ascii')
auth = u'%s %s' % (auth_type, negotiate_message)
request.headers[auth_header] = auth
# A streaming response breaks authentication.
# This can be fixed by not streaming this request, which is safe
# because the returned response3 will still have stream=True set if
# specified in args. In addition, we expect this request to give us a
# challenge and not the real content, so the content will be short
# anyway.
args_nostream = dict(args, stream=False)
response2 = response.connection.send(request, **args_nostream)
# needed to make NTLM auth compatible with requests-2.3.0
# Consume content and release the original connection
# to allow our new request to reuse the same one.
response2.content
response2.raw.release_conn()
request = response2.request.copy()
# this is important for some web applications that store
# authentication-related info in cookies (it took a long time to
# figure out)
if response2.headers.get('set-cookie'):
request.headers['Cookie'] = response2.headers.get('set-cookie')
# get the challenge
auth_header_value = response2.headers[auth_header_field]
auth_strip = auth_type + ' '
ntlm_header_value = next(
s for s in (val.lstrip() for val in auth_header_value.split(','))
if s.startswith(auth_strip)
).strip()
# Parse the challenge in the ntlm context
context.parse_challenge_message(ntlm_header_value[len(auth_strip):])
# build response
# Get the response based on the challenge message
authenticate_message = context.create_authenticate_message(
self.username,
self.password,
self.domain,
server_certificate_hash=server_certificate_hash
)
authenticate_message = authenticate_message.decode('ascii')
print (f'[*] Authenticate message: {authenticate_message}')
auth = u'%s %s' % (auth_type, authenticate_message)
request.headers[auth_header] = auth
response3 = response2.connection.send(request, **args)
# Update the history.
response3.history.append(response)
response3.history.append(response2)
# Get the session_security object created by ntlm-auth for signing and sealing of messages
self.session_security = context.session_security
return response3
def _get_server_cert(self, response):
server_cert = super(HttpNtlmAuthHook, self)._get_server_cert(response)
print (f'[*] Server cert (SHA256(server_cert)): {server_cert}')
return server_cert
def do_request(send_cbt):
return requests.get('https://exchange1.corp.local/EWS/Exchange.asmx',
auth=HttpNtlmAuthHook('DOMAIN\\EXCHANGE2$','aad3b435b51404eeaad3b435b51404ee:199daa51d64425a7f35281d8db945ae8', send_cbt=send_cbt),
verify=False,
)
print (f'[*] Status with CBT: {do_request(True).status_code}\n')
print (f'[*] Status without CBT: {do_request(False).status_code}\n')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment