Skip to content

Instantly share code, notes, and snippets.

@ssbunyk
Created April 24, 2014 18:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ssbunyk/11264280 to your computer and use it in GitHub Desktop.
Save ssbunyk/11264280 to your computer and use it in GitHub Desktop.
def main():
host = '...'
passowrd = '...'
print run_cmd('http://%s:5985/wsman' % host, 'Administrator', password, 'powershell', ['-Command', '"(Get-WmiObject -class Win32_OperatingSystem).Caption"'])
def run_cmd(url, username, password, command, args=()):
protocol = Protocol(url, username=username, password=password)
shell_id = protocol.open_shell()
command_id = protocol.run_command(shell_id, command, args)
rs = protocol.get_command_output(shell_id, command_id)[0]
protocol.cleanup_command(shell_id, command_id)
protocol.close_shell(shell_id)
return rs
import base64
import uuid
import xml.etree.ElementTree as ET
import xmltodict
class Protocol(object):
"""
This is the main class that does the SOAP request/response logic. There are a few helper classes, but pretty
much everything comes through here first.
"""
def __init__(self, endpoint, username=None, password=None):
self.endpoint = endpoint
self.username = username
self.password = password
self.timeout = 10
def open_shell(self):
"""
Create a Shell on the destination host
@returns The ShellId from the SOAP response. This is our open shell instance on the remote machine.
@rtype string
"""
rq = {
'env:Envelope': self._get_soap_header(
resource_uri='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/cmd',
action='http://schemas.xmlsoap.org/ws/2004/09/transfer/Create'
)
}
header = rq['env:Envelope']['env:Header']
header['w:OptionSet'] = {
'w:Option': [
{
'@Name': 'WINRS_NOPROFILE',
'#text': 'FALSE',
},
{
'@Name': 'WINRS_CODEPAGE',
'#text': '437',
}
]
}
shell = rq['env:Envelope'].setdefault('env:Body', {}).setdefault('rsp:Shell', {})
shell['rsp:InputStreams'] = 'stdin'
shell['rsp:OutputStreams'] = 'stdout stderr'
rs = self.send_message(xmltodict.unparse(rq))
root = ET.fromstring(rs)
return next(node for node in root.findall('.//*') if node.get('Name') == 'ShellId').text
# Helper method for building SOAP Header
def _get_soap_header(self, action=None, resource_uri=None, shell_id=None, message_id=None):
if not message_id:
message_id = uuid.uuid4()
header = {
'@xmlns:xsd': 'http://www.w3.org/2001/XMLSchema',
'@xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance',
'@xmlns:env': 'http://www.w3.org/2003/05/soap-envelope',
'@xmlns:a': 'http://schemas.xmlsoap.org/ws/2004/08/addressing',
'@xmlns:b': 'http://schemas.dmtf.org/wbem/wsman/1/cimbinding.xsd',
'@xmlns:n': 'http://schemas.xmlsoap.org/ws/2004/09/enumeration',
'@xmlns:x': 'http://schemas.xmlsoap.org/ws/2004/09/transfer',
'@xmlns:w': 'http://schemas.dmtf.org/wbem/wsman/1/wsman.xsd',
'@xmlns:p': 'http://schemas.microsoft.com/wbem/wsman/1/wsman.xsd',
'@xmlns:rsp': 'http://schemas.microsoft.com/wbem/wsman/1/windows/shell',
'@xmlns:cfg': 'http://schemas.microsoft.com/wbem/wsman/1/config',
'env:Header': {
'a:To': 'http://windows-host:5985/wsman',
'a:ReplyTo': {
'a:Address': {
'@mustUnderstand': 'true',
'#text': 'http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous'
}
},
'w:MaxEnvelopeSize': {
'@mustUnderstand': 'true',
'#text': '153600'
},
'a:MessageID': 'uuid:{0}'.format(message_id),
'w:Locale': {
'@mustUnderstand': 'false',
'@xml:lang': 'en-US'
},
'p:DataLocale': {
'@mustUnderstand': 'false',
'@xml:lang': 'en-US'
},
'w:OperationTimeout': 'PT60S',
'w:ResourceURI': {
'@mustUnderstand': 'true',
'#text': resource_uri
},
'a:Action': {
'@mustUnderstand': 'true',
'#text': action
}
}
}
if shell_id:
header['env:Header']['w:SelectorSet'] = {
'w:Selector': {
'@Name': 'ShellId',
'#text': shell_id
}
}
return header
def send_message(self, message):
headers = {
'Content-Type' : 'application/soap+xml;charset=UTF-8',
'User-Agent' : 'Python WinRM client'
}
response = requests.post(
self.endpoint,
data=message,
headers=headers,
timeout=self.timeout,
auth=(self.username, self.password),
)
return response.text
def close_shell(self, shell_id):
"""
Close the shell
@param string shell_id: The shell id on the remote machine. See #open_shell
"""
message_id = uuid.uuid4()
rq = {
'env:Envelope': self._get_soap_header(
resource_uri='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/cmd',
action='http://schemas.xmlsoap.org/ws/2004/09/transfer/Delete',
shell_id=shell_id,
message_id=message_id
)
}
# SOAP message requires empty env:Body
rq['env:Envelope'].setdefault('env:Body', {})
rs = self.send_message(xmltodict.unparse(rq))
root = ET.fromstring(rs)
relates_to = next(node for node in root.findall('.//*') if node.tag.endswith('RelatesTo')).text
# TODO change assert into user-friendly exception
assert uuid.UUID(relates_to.replace('uuid:', '')) == message_id
def run_command(self, shell_id, command, arguments=(), console_mode_stdin=True, skip_cmd_shell=False):
"""
Run a command on a machine with an open shell
@param string shell_id: The shell id on the remote machine. See #open_shell
@param string command: The command to run on the remote machine
@param iterable of string arguments: An array of arguments for this command
@param bool console_mode_stdin: (default: True)
@param bool skip_cmd_shell: (default: False)
@return: The CommandId from the SOAP response. This is the ID we need to query in order to get output.
@rtype string
"""
rq = {'env:Envelope': self._get_soap_header(
resource_uri='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/cmd',
action='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Command',
shell_id=shell_id)}
header = rq['env:Envelope']['env:Header']
header['w:OptionSet'] = {
'w:Option': [
{
'@Name': 'WINRS_CONSOLEMODE_STDIN',
'#text': str(console_mode_stdin).upper()
},
{
'@Name': 'WINRS_SKIP_CMD_SHELL',
'#text': str(skip_cmd_shell).upper()
}
]
}
cmd_line = rq['env:Envelope'].setdefault('env:Body', {})\
.setdefault('rsp:CommandLine', {})
cmd_line['rsp:Command'] = {'#text': command}
if arguments:
cmd_line['rsp:Arguments'] = ' '.join(arguments)
rs = self.send_message(xmltodict.unparse(rq))
root = ET.fromstring(rs)
command_id = next(node for node in root.findall('.//*') if node.tag.endswith('CommandId')).text
return command_id
def cleanup_command(self, shell_id, command_id):
"""
Clean-up after a command. @see #run_command
@param string shell_id: The shell id on the remote machine. See #open_shell
@param string command_id: The command id on the remote machine. See #run_command
@returns: This should have more error checking but it just returns true for now.
@rtype bool
"""
message_id = uuid.uuid4()
rq = {
'env:Envelope': self._get_soap_header(
resource_uri='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/cmd',
action='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Signal',
shell_id=shell_id,
message_id=message_id
)
}
# Signal the Command references to terminate (close stdout/stderr)
signal = rq['env:Envelope'].setdefault('env:Body', {}).setdefault('rsp:Signal', {})
signal['@CommandId'] = command_id
signal['rsp:Code'] = \
'http://schemas.microsoft.com/wbem/wsman/1/windows/shell/signal/terminate'
rs = self.send_message(xmltodict.unparse(rq))
root = ET.fromstring(rs)
relates_to = next(node for node in root.findall('.//*') if node.tag.endswith('RelatesTo')).text
# TODO change assert into user-friendly exception
assert uuid.UUID(relates_to.replace('uuid:', '')) == message_id
def get_command_output(self, shell_id, command_id):
"""
Get the Output of the given shell and command
@param string shell_id: The shell id on the remote machine. See #open_shell
@param string command_id: The command id on the remote machine. See #run_command
#@return [Hash] Returns a Hash with a key :exitcode and :data. Data is an Array of Hashes where the cooresponding key
# is either :stdout or :stderr. The reason it is in an Array so so we can get the output in the order it ocurrs on
# the console.
"""
stdout_buffer, stderr_buffer = [], []
command_done = False
while not command_done:
stdout, stderr, return_code, command_done = \
self._raw_get_command_output(shell_id, command_id)
stdout_buffer.append(stdout)
stderr_buffer.append(stderr)
return ''.join(stdout_buffer), ''.join(stderr_buffer), return_code
def _raw_get_command_output(self, shell_id, command_id):
rq = {
'env:Envelope': self._get_soap_header(
resource_uri='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/cmd',
action='http://schemas.microsoft.com/wbem/wsman/1/windows/shell/Receive',
shell_id=shell_id
)
}
stream = rq['env:Envelope'].setdefault('env:Body', {}).setdefault('rsp:Receive', {})\
.setdefault('rsp:DesiredStream', {})
stream['@CommandId'] = command_id
stream['#text'] = 'stdout stderr'
rs = self.send_message(xmltodict.unparse(rq))
root = ET.fromstring(rs)
stream_nodes = [node for node in root.findall('.//*') if node.tag.endswith('Stream')]
stdout = stderr = ''
return_code = -1
for stream_node in stream_nodes:
if stream_node.text:
if stream_node.attrib['Name'] == 'stdout':
stdout += str(base64.b64decode(stream_node.text.encode('ascii')))
elif stream_node.attrib['Name'] == 'stderr':
stderr += str(base64.b64decode(stream_node.text.encode('ascii')))
# We may need to get additional output if the stream has not finished.
# The CommandState will change from Running to Done like so:
# @example
# from...
# <rsp:CommandState CommandId="..." State="http://schemas.microsoft.com/wbem/wsman/1/windows/shell/CommandState/Running"/>
# to...
# <rsp:CommandState CommandId="..." State="http://schemas.microsoft.com/wbem/wsman/1/windows/shell/CommandState/Done">
# <rsp:ExitCode>0</rsp:ExitCode>
# </rsp:CommandState>
command_done = len([node for node in root.findall('.//*') if node.get('State', '').endswith('CommandState/Done')]) == 1
if command_done:
return_code = int(next(node for node in root.findall('.//*') if node.tag.endswith('ExitCode')).text)
return stdout, stderr, return_code, command_done
import requests
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment