Dump CBT Data in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import base64
import hashlib
import struct

import requests
import urllib3
import ntlm_auth.compute_hash as comphash
import ntlm_auth.compute_keys as compkeys
from ntlm_auth import ntlm
from ntlm_auth.compute_response import ComputeResponse
from ntlm_auth.compute_response import get_windows_timestamp
from ntlm_auth.constants import AvId, AvFlags, MessageTypes, NegotiateFlags, \
    NTLM_SIGNATURE
from ntlm_auth.gss_channel_bindings import GssChannelBindingsStruct
from ntlm_auth.messages import AuthenticateMessage, ChallengeMessage, \
    NegotiateMessage, get_version, get_random_export_session_key
from ntlm_auth.messages import TargetInfo
from ntlm_auth.rc4 import ARC4
from ntlm_auth.session_security import SessionSecurity
from requests_ntlm import HttpNtlmAuth
# The request at the bottom of this script is sent with verify=False, so
# silence the InsecureRequestWarning urllib3 would otherwise emit for it.
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)
class ComputeResponseHook(ComputeResponse):
    """ComputeResponse subclass that prints the channel binding values.

    Only get_nt_challenge_response is overridden. In the NTLMv2 branch it
    prints the raw channel binding (CBT) bytes and their MD5 digest before
    the digest is stored in the target info used for the NT response.
    """

    def get_nt_challenge_response(self, lm_challenge_response,
                                  server_certificate_hash=None, cbt_data=None):
        """
        [MS-NLMP] v28.0 2016-07-14

        3.3.1 - NTLM v1 Authentication
        3.3.2 - NTLM v2 Authentication

        Compute the NtChallengeResponse for the negotiated flags and the
        configured ntlm_compatibility level. The NTLMv1, NTLM2 (NTLMv1 with
        extended session security) and NTLMv2 variants are all handled here,
        each delegating to the matching _get_*_response helper.

        :param lm_challenge_response: The LmChallengeResponse calculated
            beforehand, used to get the key_exchange_key value
        :param server_certificate_hash: Deprecated, use cbt_data instead. A
            base16 encoded certificate hash, only consulted when cbt_data is
            None
        :param cbt_data: The GssChannelBindingsStruct to bind in the NTLM
            response
        :return response: (NtChallengeResponse) - The NT response to the
            server challenge. Computed by the client
        :return key_exchange_key: The key exchange key derived from the
            session base key of the chosen NTLM variant
        :return target_info: (AV_PAIR) - The AV_PAIR structure used in the
            nt_challenge calculations, None for the NTLMv1/NTLM2 variants
        """
        if self._negotiate_flags & \
                NegotiateFlags.NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY and \
                self._ntlm_compatibility < 3:
            # The compatibility level is less than 3 which means it doesn't
            # support NTLMv2 but we want extended security so use NTLM2 which
            # is different from NTLMv2
            # [MS-NLMP] - 3.3.1 NTLMv1 Authentication
            response, session_base_key = \
                self._get_NTLM2_response(self._password,
                                         self._server_challenge,
                                         self._client_challenge)

            lm_hash = comphash._lmowfv1(self._password)
            key_exchange_key = \
                compkeys._get_exchange_key_ntlm_v1(self._negotiate_flags,
                                                   session_base_key,
                                                   self._server_challenge,
                                                   lm_challenge_response,
                                                   lm_hash)
            target_info = None

        elif 0 <= self._ntlm_compatibility < 3:
            response, session_base_key = \
                self._get_NTLMv1_response(self._password,
                                          self._server_challenge)

            lm_hash = comphash._lmowfv1(self._password)
            key_exchange_key = \
                compkeys._get_exchange_key_ntlm_v1(self._negotiate_flags,
                                                   session_base_key,
                                                   self._server_challenge,
                                                   lm_challenge_response,
                                                   lm_hash)
            target_info = None

        else:
            if self._server_target_info is None:
                target_info = TargetInfo()
            else:
                target_info = self._server_target_info

            if target_info[AvId.MSV_AV_TIMESTAMP] is None:
                timestamp = get_windows_timestamp()
            else:
                timestamp = target_info[AvId.MSV_AV_TIMESTAMP]

                # [MS-NLMP] If the CHALLENGE_MESSAGE TargetInfo field has an
                # MsvAvTimestamp present, the client SHOULD provide a MIC
                target_info[AvId.MSV_AV_FLAGS] = \
                    struct.pack("<L", AvFlags.MIC_PROVIDED)

            if server_certificate_hash is not None and cbt_data is None:
                # Older method of creating CBT struct based on the cert hash.
                # This should be avoided in favour of an explicit
                # GssChannelBindingStruct being passed in.
                certificate_digest = base64.b16decode(server_certificate_hash)

                cbt_data = GssChannelBindingsStruct()
                cbt_data[cbt_data.APPLICATION_DATA] = \
                    b'tls-server-end-point:' + certificate_digest

            if cbt_data is not None:
                # Dump the serialized channel bindings and the MD5 digest
                # that is placed in the MsvAvChannelBindings AV pair.
                cbt_bytes = cbt_data.get_data()
                print (f"[*] CBT Data (b'\\x00' * 14 + b'tls-server-end-point:' + SHA256(server_cert)): {cbt_bytes.hex().upper()}")
                cbt_hash = hashlib.md5(cbt_bytes).digest()
                target_info[AvId.MSV_AV_CHANNEL_BINDINGS] = cbt_hash
                print (f'[*] CBT Hash (MD5(CBT_DATA)): {cbt_hash.hex().upper()}')

            response, session_base_key = \
                self._get_NTLMv2_response(self._user_name, self._password,
                                          self._domain_name,
                                          self._server_challenge,
                                          self._client_challenge,
                                          timestamp, target_info)

            key_exchange_key = \
                compkeys._get_exchange_key_ntlm_v2(session_base_key)

        return response, key_exchange_key, target_info
class AuthenticateMessageHook(AuthenticateMessage):
    """AuthenticateMessage whose responses come from ComputeResponseHook."""

    def __init__(self, user_name, password, domain_name, workstation,
                 challenge_message, ntlm_compatibility,
                 server_certificate_hash=None, cbt_data=None):
        """
        [MS-NLMP] v28.0 2016-07-14

        2.2.1.3 AUTHENTICATE_MESSAGE
        Build the NTLM authenticate message the client sends after it has
        processed the server's CHALLENGE_MESSAGE.

        :param user_name: The user name to authenticate with
        :param password: The password of that user
        :param domain_name: The domain name of the user account, None is
            stored as an empty name
        :param workstation: The workstation we authenticate from, None is
            stored as an empty name
        :param challenge_message: The ChallengeMessage object received from
            the server in reply to the negotiate message
        :param ntlm_compatibility: The Lan Manager Compatibility Level, used
            to determine what NTLM auth version to use
        :param server_certificate_hash: Deprecated, use cbt_data instead
        :param cbt_data: The GssChannelBindingsStruct that contains the CBT
            data to bind in the auth response

        Attributes set on the instance:
            signature: NTLM_SIGNATURE
            message_type: packed MessageTypes.NTLM_AUTHENTICATE
            negotiate_flags: the flags taken from the challenge message,
                stored packed as a little-endian unsigned 32-bit value
            version: result of get_version for the negotiated flags
            mic: None until a MIC is added to the message
            lm_challenge_response: the computed LM response
            nt_challenge_response: the computed NT response
            domain_name, user_name, workstation: the names encoded in the
                negotiated character set
            encrypted_random_session_key: the RC4 encrypted exported session
                key, or b'' when key exchange was not negotiated
            exported_session_key: a random session key when key exchange was
                negotiated, otherwise the key exchange key
            target_info: the AV_PAIR structure used in the NT response
                calculation
        """
        self.signature = NTLM_SIGNATURE
        self.message_type = struct.pack('<L', MessageTypes.NTLM_AUTHENTICATE)
        self.negotiate_flags = challenge_message.negotiate_flags
        self.version = get_version(self.negotiate_flags)
        self.mic = None

        # Pick the wire encoding; UNICODE wins over OEM when both are set.
        if self.negotiate_flags & NegotiateFlags.NTLMSSP_NEGOTIATE_UNICODE:
            self.negotiate_flags &= ~NegotiateFlags.NTLMSSP_NEGOTIATE_OEM
            charset = 'utf-16-le'
        else:
            charset = 'ascii'

        raw_domain = '' if domain_name is None else domain_name
        raw_workstation = '' if workstation is None else workstation
        self.domain_name = raw_domain.encode(charset)
        self.user_name = user_name.encode(charset)
        self.workstation = raw_workstation.encode(charset)

        # The response calculator receives the caller's original (unencoded)
        # values, including a possible None domain.
        responder = ComputeResponseHook(user_name, password, domain_name,
                                        challenge_message,
                                        ntlm_compatibility)
        self.lm_challenge_response = responder.get_lm_challenge_response()
        nt_result = responder.get_nt_challenge_response(
            self.lm_challenge_response, server_certificate_hash, cbt_data)
        self.nt_challenge_response, key_exchange_key, self.target_info = \
            nt_result

        if self.negotiate_flags & NegotiateFlags.NTLMSSP_NEGOTIATE_KEY_EXCH:
            # Key exchange: generate a fresh session key and send it RC4
            # encrypted under the key exchange key.
            self.exported_session_key = get_random_export_session_key()
            cipher = ARC4(key_exchange_key)
            self.encrypted_random_session_key = \
                cipher.update(self.exported_session_key)
        else:
            self.exported_session_key = key_exchange_key
            self.encrypted_random_session_key = b''

        self.negotiate_flags = struct.pack('<I', self.negotiate_flags)
class NtlmContextHook(ntlm.NtlmContext):
    """NtlmContext that builds its authenticate leg with AuthenticateMessageHook."""

    def __init__(self, username, password, domain=None, workstation=None,
                 cbt_data=None, ntlm_compatibility=3):
        r"""
        Set up the state for one NTLM authentication exchange.

        :param username: The username to authenticate with
        :param password: The password for the username
        :param domain: The domain part of the username (None if n/a)
        :param workstation: The local workstation (None if n/a)
        :param cbt_data: A GssChannelBindingsStruct or None to bind channel
            data with the auth process
        :param ntlm_compatibility: (Default 3)
            The Lan Manager Compatibility Level to use with the auth message
            This is set by an Administrator in the registry key
            'HKLM\SYSTEM\CurrentControlSet\Control\Lsa\LmCompatibilityLevel'
            The values correspond to the following;
                0 : LM and NTLMv1
                1 : LM, NTLMv1 and NTLMv1 with Extended Session Security
                2 : NTLMv1 and NTLMv1 with Extended Session Security
                3-5 : NTLMv2 Only
            Note: Values 3 to 5 are no different from a client perspective
        """
        self.username = username
        self.password = password
        self.domain = domain
        self.workstation = workstation
        self.cbt_data = cbt_data
        self._server_certificate_hash = None  # deprecated for backwards compat
        self.ntlm_compatibility = ntlm_compatibility
        self.complete = False

        # Request the target info block from the server (if supported) along
        # with the signing, sealing and key exchange capabilities.
        self.negotiate_flags = (
            NegotiateFlags.NTLMSSP_NEGOTIATE_TARGET_INFO
            | NegotiateFlags.NTLMSSP_NEGOTIATE_128
            | NegotiateFlags.NTLMSSP_NEGOTIATE_56
            | NegotiateFlags.NTLMSSP_NEGOTIATE_UNICODE
            | NegotiateFlags.NTLMSSP_NEGOTIATE_VERSION
            | NegotiateFlags.NTLMSSP_NEGOTIATE_KEY_EXCH
            | NegotiateFlags.NTLMSSP_NEGOTIATE_ALWAYS_SIGN
            | NegotiateFlags.NTLMSSP_NEGOTIATE_SIGN
            | NegotiateFlags.NTLMSSP_NEGOTIATE_SEAL
        )

        # Adjust the flags for the requested compatibility level
        self._set_ntlm_compatibility_flags(self.ntlm_compatibility)

        self._negotiate_message = None
        self._challenge_message = None
        self._authenticate_message = None
        self._session_security = None

    def step(self, input_token=None):
        """
        Produce the next outgoing NTLM token.

        The first call creates and returns the negotiate message. Any later
        call treats input_token as the server's challenge and returns the
        authenticate message built from it.

        :param input_token: The challenge token from the server, unused on
            the first call
        :return: The bytes of the message to send to the server
        """
        if self._negotiate_message is None:
            self._negotiate_message = NegotiateMessage(self.negotiate_flags,
                                                       self.domain,
                                                       self.workstation)
            return self._negotiate_message.get_data()

        self._challenge_message = ChallengeMessage(input_token)
        self._authenticate_message = AuthenticateMessageHook(
            self.username, self.password, self.domain, self.workstation,
            self._challenge_message, self.ntlm_compatibility,
            server_certificate_hash=self._server_certificate_hash,
            cbt_data=self.cbt_data
        )
        self._authenticate_message.add_mic(self._negotiate_message,
                                           self._challenge_message)

        # The authenticate message stores its flags packed; unpack them to
        # decide whether signing/sealing state is required.
        packed_flags = self._authenticate_message.negotiate_flags
        final_flags = struct.unpack("<I", packed_flags)[0]
        sign_or_seal = NegotiateFlags.NTLMSSP_NEGOTIATE_SEAL | \
            NegotiateFlags.NTLMSSP_NEGOTIATE_SIGN
        if final_flags & sign_or_seal:
            self._session_security = SessionSecurity(
                final_flags, self.session_key
            )

        self.complete = True
        return self._authenticate_message.get_data()
class NtlmHook(ntlm.Ntlm):
    """ntlm.Ntlm front end backed by NtlmContextHook; prints the negotiate token."""

    def __init__(self, ntlm_compatibility=3):
        """
        :param ntlm_compatibility: The Lan Manager Compatibility Level passed
            on to the underlying NtlmContextHook (default 3)
        """
        # Credentials are not known yet; they are filled in when the
        # authenticate message is created.
        self._context = NtlmContextHook(
            None, None, ntlm_compatibility=ntlm_compatibility)
        self._challenge_token = None

    def create_negotiate_message(self, domain_name=None, workstation=None):
        """
        Create the negotiate message and print it.

        :param domain_name: The domain name to place in the message
        :param workstation: The workstation name to place in the message
        :return: The base64 encoded negotiate message (bytes)
        """
        ctx = self._context
        ctx.domain = domain_name
        ctx.workstation = workstation

        encoded = base64.b64encode(ctx.step())
        print (f'[*] Negotiate: {encoded.decode()}')
        return encoded

    def create_authenticate_message(self, user_name, password,
                                    domain_name=None, workstation=None,
                                    server_certificate_hash=None):
        """
        Create the authenticate message from the stored challenge token.

        :param user_name: The user name to authenticate with
        :param password: The password of that user
        :param domain_name: The domain name of the user account
        :param workstation: The workstation we authenticate from
        :param server_certificate_hash: The server certificate hash handed to
            the context for channel binding
        :return: The base64 encoded authenticate message (bytes)
        """
        ctx = self._context
        ctx.username = user_name
        ctx.password = password
        ctx.domain = domain_name
        ctx.workstation = workstation
        ctx._server_certificate_hash = server_certificate_hash

        token = ctx.step(self._challenge_token)
        return base64.b64encode(token)
class HttpNtlmAuthHook(HttpNtlmAuth):
    """HttpNtlmAuth that runs the handshake through NtlmHook and prints it.

    The server certificate hash and the authenticate message are printed
    here; the negotiate message and the CBT values are printed by the hooked
    ntlm-auth classes this handler uses.
    """

    def retry_using_http_NTLM_auth(self, auth_header_field, auth_header,
                                   response, auth_type, args):
        """Attempt to authenticate using HTTP NTLM challenge/response.

        :param auth_header_field: Name of the response header that carries
            the server's challenge
        :param auth_header: Name of the request header the NTLM tokens are
            sent in
        :param response: The response that triggered authentication
        :param auth_type: The authentication scheme prefix placed before each
            token
        :param args: Keyword arguments forwarded to connection.send
        :return: The original response if the request already carried
            auth_header, otherwise the response to the authenticate request
        """
        # Get the certificate of the server if using HTTPS for CBT
        server_certificate_hash = self._get_server_cert(response)

        if auth_header in response.request.headers:
            return response

        # Rewind a file-like request body so it can be sent again.
        content_length = int(
            response.request.headers.get('Content-Length', '0'), base=10)
        if hasattr(response.request.body, 'seek'):
            if content_length > 0:
                response.request.body.seek(-content_length, 1)
            else:
                response.request.body.seek(0, 0)

        # Consume content and release the original connection
        # to allow our new request to reuse the same one.
        response.content
        response.raw.release_conn()
        request = response.request.copy()

        # ntlm returns the headers as a base64 encoded bytestring. Convert to
        # a string.
        context = NtlmHook()
        negotiate_message = \
            context.create_negotiate_message(self.domain).decode('ascii')
        auth = u'%s %s' % (auth_type, negotiate_message)
        request.headers[auth_header] = auth

        # A streaming response breaks authentication.
        # This can be fixed by not streaming this request, which is safe
        # because the returned response3 will still have stream=True set if
        # specified in args. In addition, we expect this request to give us a
        # challenge and not the real content, so the content will be short
        # anyway.
        args_nostream = dict(args, stream=False)
        response2 = response.connection.send(request, **args_nostream)

        # needed to make NTLM auth compatible with requests-2.3.0

        # Consume content and release the original connection
        # to allow our new request to reuse the same one.
        response2.content
        response2.raw.release_conn()
        request = response2.request.copy()

        # this is important for some web applications that store
        # authentication-related info in cookies (it took a long time to
        # figure out)
        if response2.headers.get('set-cookie'):
            request.headers['Cookie'] = response2.headers.get('set-cookie')

        # get the challenge
        auth_header_value = response2.headers[auth_header_field]

        auth_strip = auth_type + ' '

        ntlm_header_value = next(
            s for s in (val.lstrip() for val in auth_header_value.split(','))
            if s.startswith(auth_strip)
        ).strip()

        # Parse the challenge in the ntlm context
        context.parse_challenge_message(ntlm_header_value[len(auth_strip):])

        # build response
        # Get the response based on the challenge message
        authenticate_message = context.create_authenticate_message(
            self.username,
            self.password,
            self.domain,
            server_certificate_hash=server_certificate_hash
        )
        authenticate_message = authenticate_message.decode('ascii')
        print (f'[*] Authenticate message: {authenticate_message}')
        auth = u'%s %s' % (auth_type, authenticate_message)
        request.headers[auth_header] = auth

        response3 = response2.connection.send(request, **args)

        # Update the history.
        response3.history.append(response)
        response3.history.append(response2)

        # Get the session_security object created by ntlm-auth for signing
        # and sealing of messages
        self.session_security = context.session_security

        return response3

    def _get_server_cert(self, response):
        """Return the parent's server certificate hash after printing it.

        :param response: The response passed through to the parent
            implementation
        :return: Whatever HttpNtlmAuth._get_server_cert returned
        """
        server_cert = super()._get_server_cert(response)
        print (f'[*] Server cert (SHA256(server_cert)): {server_cert}')
        return server_cert
def do_request(send_cbt,
               url='https://exchange1.corp.local/EWS/Exchange.asmx',
               username='DOMAIN\\EXCHANGE2$',
               password='aad3b435b51404eeaad3b435b51404ee:199daa51d64425a7f35281d8db945ae8'):
    """Send one NTLM-authenticated GET request through HttpNtlmAuthHook.

    The target and credentials default to the values this script was written
    with, so do_request(send_cbt) behaves as before; pass the keyword
    arguments to aim the same check at another endpoint or account.

    :param send_cbt: Forwarded to HttpNtlmAuthHook as send_cbt; selects
        whether channel binding data is sent with the authentication
    :param url: The URL to request
    :param username: The account name handed to HttpNtlmAuthHook
    :param password: The secret handed to HttpNtlmAuthHook. NOTE(review): the
        default looks like an LM:NT hash pair rather than a clear-text
        password - confirm before replacing it
    :return: The requests response of the GET call
    """
    # Certificate verification is deliberately off, as in the original
    # request.
    return requests.get(
        url,
        auth=HttpNtlmAuthHook(username, password, send_cbt=send_cbt),
        verify=False,
    )
# Issue the same request twice, first with and then without channel binding
# data, and report the HTTP status code of each attempt.
status_with_cbt = do_request(True).status_code
print (f'[*] Status with CBT: {status_with_cbt}\n')

status_without_cbt = do_request(False).status_code
print (f'[*] Status without CBT: {status_without_cbt}\n')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment