-
-
Save lnattrass/a4a91dbf439fc1719d69f7865c1b1791 to your computer and use it in GitHub Desktop.
import pprint
import socket
import ssl
import struct
import sys
from time import sleep
# Standard "HELLO" (PRELOGIN) message for TDS, sent verbatim as the first
# packet on the connection. Field names follow the MS-TDS PRELOGIN layout;
# option offsets are relative to the start of the payload (byte 8).
prelogin_msg = bytearray([
    # Packet header: type 0x12, status 0x01, length 0x002f (47), channel 0,
    # packet id 1, window 0.
    0x12, 0x01, 0x00, 0x2f, 0x00, 0x00, 0x01, 0x00,
    # Option table: (token, offset hi, offset lo, length hi, length lo).
    0x00, 0x00, 0x1a, 0x00, 0x06,   # VERSION    -> 6 bytes at offset 26
    0x01, 0x00, 0x20, 0x00, 0x01,   # ENCRYPTION -> 1 byte  at offset 32
    0x02, 0x00, 0x21, 0x00, 0x01,   # INSTOPT    -> 1 byte  at offset 33
    0x03, 0x00, 0x22, 0x00, 0x04,   # THREADID   -> 4 bytes at offset 34
    0x04, 0x00, 0x26, 0x00, 0x01,   # MARS       -> 1 byte  at offset 38
    0xff,                           # end of option table
    # Option data, in the order listed above.
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00,   # VERSION
    0x01,                                 # ENCRYPTION (0x01 requests encryption)
    0x00,                                 # INSTOPT
    0x00, 0x00, 0x00, 0x00,               # THREADID
    0x00,                                 # MARS
])
# Prep Header function
def prep_header(data, packet_type=0x12, status=0x01):
    """Return *data* prefixed with an 8-byte TDS packet header.

    The header is: packet type, status, big-endian 16-bit total length
    (header + payload), then the fixed bytes 0x00 0x00 0x01 0x00
    (channel 0, packet id 1, window 0).

    Args:
        data: Payload as ``bytes`` or ``bytearray``.
        packet_type: First header byte. Defaults to 0x12, the value this
            script uses for both the PRELOGIN message and the TLS handshake
            records it wraps.
        status: Second header byte. Defaults to 0x01.

    Returns:
        A new ``bytearray`` holding the header followed by the payload.

    Raises:
        OverflowError: If header + payload does not fit in 16 bits.
    """
    header_len = 8
    total_len = header_len + len(data)
    packet = bytearray([packet_type, status])
    packet += total_len.to_bytes(2, 'big')
    packet += bytearray([0x00, 0x00, 0x01, 0x00])
    return packet + data
def read_header(data):
    """Decode an 8-byte TDS packet header into a dict.

    All fields are decoded as unsigned big-endian integers, so a length of
    0x8000 or more, or a type/status byte of 0x80 or more, is reported as
    the positive value that was sent on the wire.

    Args:
        data: Exactly 8 bytes (``bytes`` or ``bytearray``).

    Returns:
        A dict with the keys ``type``, ``status``, ``length``, ``channel``,
        ``packet`` and ``window``. ``length`` is the total packet length
        including the 8 header bytes.

    Raises:
        ValueError: If *data* is not exactly 8 bytes long.
    """
    if len(data) != 8:
        raise ValueError("TDS packet header must be exactly 8 bytes", data)
    # B = unsigned byte, H = unsigned 16-bit, '>' = big-endian (network order).
    pkt_type, status, length, channel, packet, window = struct.unpack(">BBHHBB", data)
    return {
        "type": pkt_type,
        "status": status,
        "length": length,
        "channel": channel,
        "packet": packet,
        "window": window,
    }
# Bytes already read from the socket that belong to the next TDS packet(s).
# Carried over between recv_tdspacket() calls, because one recv() may return
# more than one packet.
tdspbuf = bytearray()


def recv_tdspacket(sock):
    """Return the next complete TDS packet as ``(header, payload)``.

    Bytes left over from a previous call are examined first, so a packet that
    is already fully buffered is returned without touching the socket. More
    data is read from *sock* only while the buffered bytes do not yet form a
    whole packet.

    Args:
        sock: Connected socket-like object providing ``recv(bufsize)``.

    Returns:
        A tuple ``(header, payload)``: ``header`` is the dict produced by
        ``read_header`` and ``payload`` is a ``bytearray`` with the packet
        body (the 8 header bytes removed).

    Raises:
        ConnectionError: If the peer closes the connection before a complete
            packet has arrived.
        ValueError: If the header announces a length smaller than the header
            itself.
        OSError: Whatever ``sock.recv`` raises (for example a timeout).
    """
    global tdspbuf
    attempt = 0
    while True:
        if len(tdspbuf) >= 8:
            header = read_header(tdspbuf[:8])
            print("# Header: ", header)
            length = header['length']
            if length < 8:
                # Such a packet would never be consumed from the buffer and
                # the caller would loop on it forever.
                raise ValueError("invalid TDS packet length", header)
            if len(tdspbuf) >= length:
                payload = tdspbuf[8:length]
                tdspbuf = tdspbuf[length:]
                print("# Remaining tdspbuf length: {}\n".format(len(tdspbuf)))
                return header, payload

        chunk = sock.recv(4096)
        if not chunk:
            # recv() returning no bytes means the peer closed the connection.
            raise ConnectionError(
                "connection closed before a complete TDS packet was received")
        tdspbuf += chunk
        print("\n# get_tdspacket: {}, tdspacket len: {} ".format(attempt, len(tdspbuf)))
        attempt += 1
# Ensure we have a commandline
if len(sys.argv) != 3:
    print("Usage: {} <hostname> <port>".format(sys.argv[0]))
    sys.exit(1)

hostname = sys.argv[1]
port = int(sys.argv[2])

# Setup SSL
if hasattr(ssl, 'PROTOCOL_TLS'):
    sslProto = ssl.PROTOCOL_TLS
else:
    sslProto = ssl.PROTOCOL_SSLv23
sslctx = ssl.SSLContext(sslProto)
# The goal is only to dump whatever certificate the server presents, so no
# validation is requested.
sslctx.check_hostname = False
sslctx.verify_mode = ssl.CERT_NONE

tls_in_buf = ssl.MemoryBIO()
tls_out_buf = ssl.MemoryBIO()

# Create the SSLObj connected to the tls_in_buf and tls_out_buf.
# The TLS records never touch the socket directly: they are wrapped in / unwrapped
# from TDS packets by the code below.
tlssock = sslctx.wrap_bio(tls_in_buf, tls_out_buf)


def _recv_tds_message(sock):
    """Read TDS packets until one has a non-zero status; return the joined payloads.

    TDS messages can be split over several packets, each with its own header.
    The payloads have to be concatenated before they are handed to TLS.
    """
    header, payload = recv_tdspacket(sock)
    while header['status'] == 0:
        header, ext_data = recv_tdspacket(sock)
        payload += ext_data
    return payload


# create an INET, STREAMing socket; the with-block closes it on every exit path
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    # A 1 second timeout proved too short for some servers and caused
    # intermittent "timed out" failures, so allow 5 seconds per operation.
    s.settimeout(5)

    # Connect to the SQL Server
    s.connect((hostname, port))

    # Send the first TDS PRELOGIN message (sendall: send() may write only part of it)
    s.sendall(prelogin_msg)

    # Get the response and ignore. We will try to negotiate encryption anyway.
    data = _recv_tds_message(s)

    print("# Starting TLS handshake loop..")
    for i in range(0, 5):
        try:
            tlssock.do_handshake()
        except ssl.SSLWantReadError:
            # TLS wants to keep shaking hands, but because we're controlling
            # the R/W buffers it throws an exception
            print("# Shaking ({}/5)".format(i))
            # Craft the packet: wrap the pending TLS records in a TDS header
            tls_data = tls_out_buf.read()
            s.sendall(prep_header(tls_data))
            # Feed the server's reply back into the TLS engine
            data = _recv_tds_message(s)
            tls_in_buf.write(data)
        else:
            print("# Handshake completed, dumping certificates")
            peercert = ssl.DER_cert_to_PEM_cert(tlssock.getpeercert(True))
            print(peercert)
            sys.exit(0)

print("# Handshake did not complete / exiting")
# Non-zero exit status so callers can tell failure from success.
sys.exit(1)
brandonros
commented
May 26, 2022
•
@brandonros: Sorry for the late reply, but try using `python3`.
Thank you ! In my company, we use your script to monitor the health of our MSSQL instances.
This is great!
It would be fairly simple to make `openssl s_client` handle this case (e.g. `openssl s_client -starttls mssql` or `openssl s_client -starttls tds`, depending on how you think the TLS handshaking protocol should be named 😄).
See openssl/openssl@a2d9cfb as a past case where the MySQL handshake was added (`openssl s_client -starttls mysql`).
Thank you! This worked great for me. I can confirm it works with SQL Server 2019.
Thank you so much! This was just what I needed to troubleshoot some SSL issues with an Azure SQL DB. I did need to bump up the socket timeout as I was getting some intermittent issues like this:
Traceback (most recent call last):
File "/home/sanelson/src/sql_ssl_test/get_tds_cert.py", line 117, in <module>
header, ext_data = recv_tdspacket(s)
File "/home/sanelson/src/sql_ssl_test/get_tds_cert.py", line 45, in recv_tdspacket
tdspacket += sock.recv(4096)
TimeoutError: timed out
I changed `s.settimeout(1)` to `s.settimeout(5)` and then it worked perfectly.