Skip to content

Instantly share code, notes, and snippets.

@jborean93
Created November 28, 2023 00:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jborean93/4bc4f20bff0ec6a2496eb511d055a8fa to your computer and use it in GitHub Desktop.
Save jborean93/4bc4f20bff0ec6a2496eb511d055a8fa to your computer and use it in GitHub Desktop.
Test WinRM with GSSAPI authentication in Python
from __future__ import annotations
import base64
import gssapi
import io
import re
import requests
import struct
import sys
import typing as t
import uuid
import warnings
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from xml.etree import ElementTree as ET
# Prefix -> namespace URI table. The URIs are formatted into "{uri}Tag"
# element names when requests are built, and the whole mapping is handed to
# ElementTree find()/findall() to resolve prefixed paths such as "s:Body".
WSMAN_NS = {
    "s": "http://www.w3.org/2003/05/soap-envelope",
    "xs": "http://www.w3.org/2001/XMLSchema",
    "xsi": "http://www.w3.org/2001/XMLSchema-instance",
    "wsa": "http://schemas.xmlsoap.org/ws/2004/08/addressing",
    "wsman": "http://schemas.dmtf.org/wbem/wsman/1/wsman.xsd",
    "wsmid": "http://schemas.dmtf.org/wbem/wsman/identify/1/wsmanidentity.xsd",
    "wsmanfault": "http://schemas.microsoft.com/wbem/wsman/1/wsmanfault",
    "cim": "http://schemas.dmtf.org/wbem/wscim/1/common",
    "wsmv": "http://schemas.microsoft.com/wbem/wsman/1/wsman.xsd",
    "cfg": "http://schemas.microsoft.com/wbem/wsman/1/config",
    "sub": "http://schemas.microsoft.com/wbem/wsman/1/subscription",
    "rsp": "http://schemas.microsoft.com/wbem/wsman/1/windows/shell",
    "m": "http://schemas.microsoft.com/wbem/wsman/1/machineid",
    "cert": "http://schemas.microsoft.com/wbem/wsman/1/config/service/certmapping",
    "plugin": "http://schemas.microsoft.com/wbem/wsman/1/config/PluginConfiguration",
    "wsen": "http://schemas.xmlsoap.org/ws/2004/09/enumeration",
    "wsdl": "http://schemas.xmlsoap.org/wsdl",
    "wst": "http://schemas.xmlsoap.org/ws/2004/09/transfer",
    "wsp": "http://schemas.xmlsoap.org/ws/2004/09/policy",
    "wse": "http://schemas.xmlsoap.org/ws/2004/08/eventing",
    "i": "http://schemas.microsoft.com/wbem/wsman/1/cim/interactive.xsd",
    "xml": "http://www.w3.org/XML/1998/namespace",
    "pwsh": "http://schemas.microsoft.com/powershell",
}
def wrap_winrm(
    context: gssapi.raw.SecurityContext,
    data: bytes,
) -> tuple[bytes, bytes, int]:
    """Encrypt *data* with the GSSAPI IOV wrap call.

    The IOV is built as three buffers (header, data, padding) with
    ``std_layout=False`` and wrapped with confidentiality requested.

    Args:
        context: Security context passed to ``gssapi.raw.wrap_iov``.
        data: Plaintext bytes to encrypt.

    Returns:
        ``(header, encrypted_data, padding_length)``. A buffer whose value is
        unset after wrapping is reported as ``b""`` (or length ``0``).
    """
    buffer_type = gssapi.raw.IOVBufferType
    buffers = gssapi.raw.IOV(
        buffer_type.header,
        data,
        buffer_type.padding,
        std_layout=False,
    )
    # The results are read back from the same IOV object after the call.
    gssapi.raw.wrap_iov(context, buffers, confidential=True, qop=None)

    header = buffers[0].value or b""
    encrypted = buffers[1].value or b""
    padding = buffers[2].value or b""
    return header, encrypted, len(padding)
def unwrap_winrm(
    context: gssapi.raw.SecurityContext,
    header: bytes,
    data: bytes,
) -> bytes:
    """Decrypt *data* with the GSSAPI IOV unwrap call.

    Args:
        context: Security context passed to ``gssapi.raw.unwrap_iov``.
        header: The GSS header bytes that accompanied the encrypted payload.
        data: The encrypted payload bytes.

    Returns:
        The value of the IOV's data buffer after unwrapping, or ``b""`` when
        that value is unset.
    """
    buffer_type = gssapi.raw.IOVBufferType
    buffers = gssapi.raw.IOV(
        (buffer_type.header, False, header),
        data,
        (buffer_type.data, True, None),
        std_layout=False,
    )
    # The result is read back from the same IOV object after the call.
    gssapi.raw.unwrap_iov(context, buffers)

    decrypted = buffers[1].value
    return decrypted or b""
class HTTPWinRMAuth(requests.auth.AuthBase):
    """requests auth handler that answers HTTP 401 challenges with GSSAPI tokens.

    The handler registers a response hook on every request. When a response
    comes back as 401 it steps the security context and resends the request
    with an ``Authorization`` header until the token exchange finishes.
    """

    def __init__(
        self,
        context: gssapi.raw.SecurityContext,
    ) -> None:
        """Store the security context and prepare the challenge-parsing regex."""
        self.context = context
        # Name of the auth scheme picked from the server's challenge; set in
        # handle_401().
        self.header = None
        self.valid_protocols = ["Negotiate", "Kerberos", "NTLM"]
        protocol_alternatives = "|".join(self.valid_protocols)
        # Group 1 is the scheme name, group 2 the token text that follows it.
        self._regex = re.compile(
            r"(%s)\s*([^,]*),?" % protocol_alternatives, re.I
        )

    def __call__(
        self,
        request: requests.PreparedRequest,
    ) -> requests.PreparedRequest:
        """Set ``Connection: Keep-Alive`` and attach the response hook."""
        request.headers["Connection"] = "Keep-Alive"
        request.register_hook("response", self.response_hook)
        return request

    def response_hook(
        self,
        response: requests.Response,
        **kwargs: t.Any,
    ) -> requests.Response:
        """Pass 401 responses to handle_401(); return anything else untouched."""
        if response.status_code != 401:
            return response
        return self.handle_401(response, **kwargs)

    def handle_401(
        self,
        response: requests.Response,
        **kwargs: t.Any,
    ) -> requests.Response:
        """Run the token exchange against the server that sent *response*.

        Returns the original response when the ``WWW-Authenticate`` header
        names none of ``valid_protocols``; otherwise returns the last response
        received during the exchange.
        """
        advertised = response.headers.get("www-authenticate", "")
        chosen = next(
            (p for p in self.valid_protocols if p.upper() in advertised.upper()),
            None,
        )
        if chosen is None:
            return response
        self.header = chosen

        out_token = self.context.step()
        while not self.context.complete or out_token is not None:
            # Read the body and hand the connection back before resending.
            response.content
            response.raw.release_conn()

            retry = response.request.copy()
            retry.headers["Authorization"] = b"%s %s" % (
                self.header.encode(),
                base64.b64encode(out_token),
            )
            response = response.connection.send(retry, **kwargs)

            challenge = response.headers.get("www-authenticate", "")
            token_match = self._regex.search(challenge)
            in_token = token_match.group(2) if token_match else None
            if not in_token:
                # No token came back, so there is nothing left to step with.
                break

            out_token = self.context.step(base64.b64decode(in_token))

        return response
def winrm_run(
    context: gssapi.raw.SecurityContext,
    server: str,
    command: str,
    arguments: t.Optional[t.List[str]] = None,
) -> tuple[int, str, str]:
    """Run *command* on *server* through a WinRM shell and collect its output.

    An initial empty POST is sent so the auth handler completes the security
    context before any encrypted envelope is built. A shell is then created,
    the command is run, its output is received, a Terminate signal is sent
    and the shell is deleted.

    Args:
        context: GSSAPI security context used for authentication and for
            encrypting the WSMan payloads.
        server: Host name used to build ``http://<server>:5985/wsman``.
        command: Executable / command line to run.
        arguments: Optional list of arguments passed to the command.

    Returns:
        ``(return_code, stdout, stderr)``. When stderr starts with
        ``#< CLIXML`` the ``S`` elements marked ``S='Error'`` are extracted
        and joined, with ``_x000D_`` / ``_x000A_`` turned back into CR / LF.

    Raises:
        requests.HTTPError: If the initial POST returns an error status.
    """
    # Using the session as a context manager closes it (and its pooled
    # connections) on every exit path, including errors.
    with requests.Session() as http:
        http.auth = HTTPWinRMAuth(context)
        warnings.simplefilter("ignore", category=InsecureRequestWarning)
        http.headers = {
            "User-Agent": "pyspnego_client",
        }
        endpoint = "http://%s:5985/wsman" % server

        # We need to ensure we have set up the context already so we can
        # start encrypting the data.
        request = requests.Request("POST", endpoint, data=None)
        prep_request = http.prepare_request(request)
        response = http.send(prep_request)
        response.raise_for_status()

        # The wsman_* helpers read these two attributes off the session.
        setattr(http, "endpoint", endpoint)
        setattr(http, "session_id", str(uuid.uuid4()).upper())

        shell_id = wsman_create(http)
        try:
            cmd_id = wsman_command(http, shell_id, command, arguments)
            rc, stdout, stderr = wsman_receive(http, shell_id, cmd_id)
            wsman_signal(
                http,
                shell_id,
                cmd_id,
                "http://schemas.microsoft.com/wbem/wsman/1/windows/shell/signal/Terminate",
            )

            if stderr.startswith("#< CLIXML"):
                # Strip off the '#< CLIXML\r\n' by finding the 2nd index of '<'
                output = stderr[stderr.index("<", 2) :]
                element = ET.fromstring(output)
                # The root tag looks like "{namespace}Objs"; keep the namespace.
                namespace = element.tag.replace("Objs", "")[1:-1]
                errors = [
                    error.text or ""
                    for error in element.findall("{%s}S[@S='Error']" % namespace)
                ]
                stderr = (
                    "".join(errors).replace("_x000D_", "\r").replace("_x000A_", "\n")
                )

            return rc, stdout, stderr
        finally:
            # Always remove the remote shell, even if the command failed.
            wsman_delete(http, shell_id)
def wsman_command(
    http: requests.Session,
    shell_id: str,
    command: str,
    arguments: t.Optional[t.List[str]] = None,
) -> str:
    """Send the shell Command action and return the resulting command id.

    Args:
        http: Session carrying the ``endpoint`` / ``session_id`` attributes
            that ``wsman_envelope`` reads.
        shell_id: Id of the shell to run the command in.
        command: Text of the ``Command`` element.
        arguments: Optional values, each emitted as its own ``Arguments``
            element.

    Returns:
        The text of ``CommandResponse/CommandId`` in the reply.
    """
    rsp_ns = WSMAN_NS["rsp"]

    command_line = ET.Element("{%s}CommandLine" % rsp_ns)
    command_node = ET.SubElement(command_line, "{%s}Command" % rsp_ns)
    command_node.text = command
    for argument in arguments or []:
        argument_node = ET.SubElement(command_line, "{%s}Arguments" % rsp_ns)
        argument_node.text = argument

    reply = wsman_envelope(
        "http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Command",
        http,
        body=command_line,
        selector_set={"ShellId": shell_id},
        option_set={"WINRS_SKIP_CMD_SHELL": False},
    )
    command_id = reply.find("s:Body/rsp:CommandResponse/rsp:CommandId", WSMAN_NS)
    return command_id.text
def wsman_create(
    http: requests.Session,
) -> str:
    """Send the WS-Transfer Create action for a new shell and return its id.

    The shell is requested with ``stdin`` as input stream, ``stdout stderr``
    as output streams, and the ``WINRS_CODEPAGE`` option set to 65001.

    Returns:
        The text of ``Shell/ShellId`` in the reply.
    """
    rsp_ns = WSMAN_NS["rsp"]

    shell = ET.Element("{%s}Shell" % rsp_ns)
    input_streams = ET.SubElement(shell, "{%s}InputStreams" % rsp_ns)
    input_streams.text = "stdin"
    output_streams = ET.SubElement(shell, "{%s}OutputStreams" % rsp_ns)
    output_streams.text = "stdout stderr"

    reply = wsman_envelope(
        "http://schemas.xmlsoap.org/ws/2004/09/transfer/Create",
        http,
        body=shell,
        option_set={"WINRS_CODEPAGE": 65001},
    )
    shell_id = reply.find("s:Body/rsp:Shell/rsp:ShellId", WSMAN_NS)
    return shell_id.text
def wsman_delete(
    http: requests.Session,
    shell_id: str,
) -> None:
    """Send the WS-Transfer Delete action for the shell *shell_id*.

    The reply envelope is discarded.
    """
    delete_action = "http://schemas.xmlsoap.org/ws/2004/09/transfer/Delete"
    shell_selector = {"ShellId": shell_id}
    wsman_envelope(delete_action, http, selector_set=shell_selector)
def wsman_receive(
    http: requests.Session,
    shell_id: str,
    command_id: str,
) -> tuple[int, str, str]:
    """Poll the Receive action until the command reports a ``Done`` state.

    Each reply's ``Stream`` elements are base64-decoded and appended to the
    buffer named by their ``Name`` attribute. Decoding to text happens once,
    after the final reply.

    Args:
        http: Session carrying the attributes ``wsman_envelope`` reads.
        shell_id: Id of the shell the command runs in.
        command_id: Id of the command whose output is requested.

    Returns:
        ``(exit_code, stdout, stderr)`` with both streams decoded as UTF-8.
    """
    rsp_ns = WSMAN_NS["rsp"]
    buffers = {
        "stdout": io.BytesIO(),
        "stderr": io.BytesIO(),
    }

    while True:
        receive = ET.Element("{%s}Receive" % rsp_ns)
        desired = ET.SubElement(
            receive, "{%s}DesiredStream" % rsp_ns, attrib={"CommandId": command_id}
        )
        desired.text = "stdout stderr"

        reply = wsman_envelope(
            "http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Receive",
            http,
            body=receive,
            selector_set={"ShellId": shell_id},
            option_set={"WSMAN_CMDSHELL_OPTION_KEEPALIVE": True},
        )

        for stream in reply.findall(
            "s:Body/rsp:ReceiveResponse/rsp:Stream", WSMAN_NS
        ):
            if not stream.text:
                continue
            chunk = base64.b64decode(stream.text)
            buffers[stream.attrib["Name"]].write(chunk)

        state = reply.find("s:Body/rsp:ReceiveResponse/rsp:CommandState", WSMAN_NS)
        if not state.attrib["State"].endswith("Done"):
            continue

        exit_code = int(state.find("rsp:ExitCode", WSMAN_NS).text)
        stdout = buffers["stdout"].getvalue().decode("utf-8")
        stderr = buffers["stderr"].getvalue().decode("utf-8")
        return exit_code, stdout, stderr
def wsman_signal(
    http: requests.Session,
    shell_id: str,
    command_id: str,
    code: str,
) -> None:
    """Send the shell Signal action carrying *code* for a command.

    Args:
        http: Session carrying the attributes ``wsman_envelope`` reads.
        shell_id: Id of the shell the command belongs to.
        command_id: Id placed in the ``CommandId`` attribute of ``Signal``.
        code: Text of the ``Code`` element (a signal URI).
    """
    rsp_ns = WSMAN_NS["rsp"]

    signal = ET.Element("{%s}Signal" % rsp_ns, attrib={"CommandId": command_id})
    code_node = ET.SubElement(signal, "{%s}Code" % rsp_ns)
    code_node.text = code

    wsman_envelope(
        "http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Signal",
        http,
        body=signal,
        selector_set={"ShellId": shell_id},
    )
def _wsman_build_xml(
    action: str,
    session_id: str,
    endpoint: str,
    selector_set: t.Optional[t.Dict[str, t.Any]],
    option_set: t.Optional[t.Dict[str, t.Any]],
    body: t.Optional[ET.Element],
) -> bytes:
    """Build the SOAP envelope for *action* and serialize it to UTF-8 bytes."""
    s = WSMAN_NS["s"]
    wsa = WSMAN_NS["wsa"]
    wsman = WSMAN_NS["wsman"]
    wsmv = WSMAN_NS["wsmv"]
    xml = WSMAN_NS["xml"]
    understand = "{%s}mustUnderstand" % s

    envelope = ET.Element("{%s}Envelope" % s)
    soap_header = ET.SubElement(envelope, "{%s}Header" % s)

    ET.SubElement(
        soap_header, "{%s}Action" % wsa, attrib={understand: "true"}
    ).text = action
    ET.SubElement(
        soap_header, "{%s}SessionId" % wsmv, attrib={understand: "false"}
    ).text = "uuid:%s" % session_id.upper()
    ET.SubElement(soap_header, "{%s}To" % wsa).text = endpoint
    ET.SubElement(
        soap_header, "{%s}MaxEnvelopeSize" % wsman, attrib={understand: "true"}
    ).text = "153600"
    # Every message gets its own freshly generated id.
    ET.SubElement(soap_header, "{%s}MessageID" % wsa).text = (
        "uuid:%s" % str(uuid.uuid4()).upper()
    )
    ET.SubElement(soap_header, "{%s}OperationTimeout" % wsman).text = "PT30S"
    reply_to = ET.SubElement(soap_header, "{%s}ReplyTo" % wsa)
    ET.SubElement(
        reply_to, "{%s}Address" % wsa, attrib={understand: "true"}
    ).text = "http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous"
    ET.SubElement(
        soap_header, "{%s}ResourceURI" % wsman, attrib={understand: "true"}
    ).text = "http://schemas.microsoft.com/wbem/wsman/1/windows/shell/cmd"

    for locale_tag in ["DataLocale", "Locale"]:
        ET.SubElement(
            soap_header,
            "{%s}%s" % (wsmv, locale_tag),
            attrib={understand: "false", "{%s}lang" % xml: "en-US"},
        )

    for set_value, name, option_name in [
        (selector_set, "SelectorSet", "Selector"),
        (option_set, "OptionSet", "Option"),
    ]:
        if not set_value:
            continue

        # Only the OptionSet element carries mustUnderstand="true".
        set_attrib = {understand: "true"} if name == "OptionSet" else {}
        set_element = ET.SubElement(
            soap_header, "{%s}%s" % (wsman, name), attrib=set_attrib
        )
        for key, value in set_value.items():
            # Values may be bool/int; they are sent as their str() form.
            ET.SubElement(
                set_element, "{%s}%s" % (wsman, option_name), Name=key
            ).text = str(value)

    envelope_body = ET.SubElement(envelope, "{%s}Body" % s)
    if body is not None:
        envelope_body.append(body)

    return ET.tostring(envelope, encoding="utf-8", method="xml")


def _wsman_encrypt_body(
    context: gssapi.raw.SecurityContext,
    content: bytes,
    boundary: str,
) -> tuple[str, bytes]:
    """Wrap *content* and frame it as a multipart encrypted HTTP body.

    Returns ``(content_type, body)`` for the HTTP request.
    """
    auth_protocol = "SPNEGO"
    protocol = "application/HTTP-%s-session-encrypted" % auth_protocol
    # auth_protocol is fixed to SPNEGO here, so the payload is sent as a
    # single chunk; the 16384-byte limit only applies to CredSSP.
    max_size = 16384 if auth_protocol == "CredSSP" else len(content)
    chunks = [content[i : i + max_size] for i in range(0, len(content), max_size)]

    encrypted_chunks = []
    for chunk in chunks:
        gss_header, wrapped_data, padding_length = wrap_winrm(context, chunk)
        # Payload layout: 4-byte little-endian header length, header, data.
        wrapped_data = (
            struct.pack("<i", len(gss_header)) + gss_header + wrapped_data
        )
        # The advertised length describes THIS chunk's plaintext (plus
        # padding), not the whole message.
        msg_length = str(len(chunk) + padding_length)
        mime_header = "\r\n".join(
            [
                "--%s" % boundary,
                "\tContent-Type: %s" % protocol,
                "\tOriginalContent: type=application/soap+xml;charset=UTF-8;Length=%s"
                % msg_length,
                "--%s" % boundary,
                "\tContent-Type: application/octet-stream",
                "",
            ]
        )
        encrypted_chunks.append(mime_header.encode() + wrapped_data)

    content_sub_type = (
        "multipart/encrypted"
        if len(encrypted_chunks) == 1
        else "multipart/x-multi-encrypted"
    )
    content_type = '%s;protocol="%s";boundary="%s"' % (
        content_sub_type,
        protocol,
        boundary,
    )
    encrypted_body = (
        b"".join(encrypted_chunks) + ("--%s--\r\n" % boundary).encode()
    )
    return content_type, encrypted_body


def _wsman_decrypt_body(
    context: gssapi.raw.SecurityContext,
    content: bytes,
    content_type: str,
) -> bytes:
    """Split a multipart encrypted HTTP body and return the joined plaintext."""
    boundary_match = re.search(r'boundary=[|\"](.*)[|\"]', content_type)
    if boundary_match is None:
        raise ValueError(
            "Encrypted response has no boundary in Content-Type: %r" % content_type
        )
    boundary = re.escape(boundary_match.group(1))

    parts = re.compile((r"--\s*%s\r\n" % boundary).encode()).split(content)
    parts = list(filter(None, parts))
    end_marker = re.compile((r"--\s*%s--\r\n$" % boundary).encode())
    data_prefix = b"\tContent-Type: application/octet-stream\r\n"

    plaintext = []
    # Parts alternate: MIME header describing the payload, then the payload.
    for i in range(0, len(parts), 2):
        mime_header = parts[i].strip()
        payload = parts[i + 1]

        expected_length = int(mime_header.split(b"Length=")[1])

        # remove the end MIME block if it exists
        payload = end_marker.sub(b"", payload)
        # Strip the part header only from the front so that encrypted bytes
        # further in the payload can never be altered.
        if payload.startswith(data_prefix):
            payload = payload[len(data_prefix) :]

        header_length = struct.unpack("<i", payload[:4])[0]
        gss_header = payload[4 : 4 + header_length]
        wrapped_data = payload[4 + header_length :]

        unwrapped_data = unwrap_winrm(context, gss_header, wrapped_data)
        if len(unwrapped_data) != expected_length:
            raise ValueError(
                "Decrypted length %d does not match the advertised length %d"
                % (len(unwrapped_data), expected_length)
            )
        plaintext.append(unwrapped_data)

    return b"".join(plaintext)


def wsman_envelope(
    action: str,
    http: requests.Session,
    selector_set: t.Optional[t.Dict[str, str]] = None,
    option_set: t.Optional[t.Dict[str, t.Any]] = None,
    body: t.Optional[ET.Element] = None,
) -> ET.Element:
    """Send one WSMan SOAP request and return the parsed reply envelope.

    When ``http.endpoint`` starts with ``http://`` the envelope is encrypted
    with the security context held by ``http.auth`` and sent as a multipart
    encrypted body; otherwise it is sent as plain ``application/soap+xml``.
    Replies whose Content-Type is ``multipart/encrypted`` or
    ``multipart/x-multi-encrypted`` are decrypted before parsing.

    Args:
        action: WSMan action URI placed in the ``Action`` header.
        http: Session with ``endpoint``, ``session_id`` and ``auth.context``
            attributes set.
        selector_set: Optional name/value pairs for the ``SelectorSet``.
        option_set: Optional name/value pairs for the ``OptionSet``; values
            are converted with ``str()``.
        body: Optional element appended to the SOAP ``Body``.

    Returns:
        The root element of the reply envelope.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        ValueError: If an encrypted reply has no boundary in its Content-Type
            or a decrypted part does not match its advertised length.
    """
    content = _wsman_build_xml(
        action, http.session_id, http.endpoint, selector_set, option_set, body
    )

    if http.endpoint.startswith("http://"):
        content_type, content = _wsman_encrypt_body(
            http.auth.context, content, "Encrypted Boundary"
        )
    else:
        content_type = "application/soap+xml;charset=UTF-8"

    headers = {
        "Content-Length": str(len(content)),
        "Content-Type": content_type,
    }
    request = http.prepare_request(
        requests.Request("POST", http.endpoint, data=content, headers=headers)
    )
    response = http.send(request)
    response.raise_for_status()

    reply = response.content
    reply_type = response.headers.get("content-type", "")
    if reply_type.startswith(
        ("multipart/encrypted;", "multipart/x-multi-encrypted;")
    ):
        reply = _wsman_decrypt_body(http.auth.context, reply, reply_type)

    return ET.fromstring(reply)
def main() -> None:
    """Command-line entry point.

    Usage: ``<script> <target_host> <command> [arguments...]``

    Builds an initiator security context for ``host@<target_host>`` with the
    Kerberos mechanism OID, runs the command through ``winrm_run``, echoes the
    remote stdout/stderr locally and exits with the remote return code.
    Exits with status 2 and a usage message when the host or command is
    missing.
    """
    if len(sys.argv) < 3:
        sys.stderr.write(
            "usage: %s <target_host> <command> [arguments...]\n" % sys.argv[0]
        )
        sys.exit(2)

    target_host = sys.argv[1]
    cmd = sys.argv[2]
    # Keep None (rather than an empty list) when no extra arguments are given.
    arguments = sys.argv[3:] or None

    kerberos = gssapi.OID.from_int_seq("1.2.840.113554.1.2.2")
    ctx = gssapi.SecurityContext(
        name=gssapi.Name(
            f"host@{target_host}", name_type=gssapi.NameType.hostbased_service
        ),
        creds=None,
        mech=kerberos,
        usage="initiate",
    )

    rc, stdout, stderr = winrm_run(ctx, target_host, cmd, arguments)
    sys.stdout.write(stdout)
    sys.stderr.write(stderr)
    sys.exit(rc)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment