Skip to content

Instantly share code, notes, and snippets.

@nathan815
Last active February 8, 2022 06:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nathan815/4c55674afc0446ea57450052732973af to your computer and use it in GitHub Desktop.
Save nathan815/4c55674afc0446ea57450052732973af to your computer and use it in GitHub Desktop.
Call PowerShell from Python with support for color output and exception handling - using PTY to simulate a TTY
import subprocess
import json
from json.decoder import JSONDecodeError
import errno
import os
import pty
import select
import subprocess
import re
from typing import Tuple, Any, Union
def run_powershell(
    script: str,
    function: str = '',
    print_stdout: bool = True,
    print_stderr: bool = True,
    output: str = 'json',
) -> Tuple[Any, list]:
    """Invoke a PowerShell script, or a function defined in it, via ``pwsh``.

    The script is dot-sourced and ``function`` is then executed inside a
    PowerShell ``try``/``catch``. The child runs through
    ``_run_cmd_tty_stream``, and its output is echoed line by line while it
    is being collected.

    Parameters:
        script: path to the PS script. It is interpolated verbatim into the
            generated PowerShell code.
        function: PS function to call; arguments may be appended, e.g.
            ``'myPsFunction x y'``. Also interpolated verbatim.
        print_stdout: echo each PS stdout line to the console.
        print_stderr: echo each PS stderr line to the console.
        output: ``'json'`` | ``'raw'``
            json: parse the last non-blank stdout line as JSON and return
                the parsed value. That line MUST be a single line of JSON.
            raw: return all collected stdout lines without parsing.

    Returns:
        ``(output, err)`` where ``output`` is the parsed JSON value or the
        list of stdout lines, and ``err`` is the list of stderr lines.
        Collected lines keep their line terminator and have ANSI escape
        sequences removed.

    Raises:
        ValueError: ``output`` is neither ``'json'`` nor ``'raw'``.
        PowerShellException: the process did not exit with code 0, or
            ``output='json'`` and the last stdout line is not valid JSON.

    Example:
        my_script.ps1
            function myPsFunction($a, $b) {
                $hosts = 'esx01', 'esx02', $a, $b
                return $hosts | ConvertTo-Json -Compress
            }
        test.py
            data, err = run_powershell('my_script.ps1', function='myPsFunction x y')
            print(data)
        output:
            ['esx01', 'esx02', 'x', 'y']
    """
    if output not in ('json', 'raw'):
        raise ValueError('invalid value for argument output')

    caught_error_key = '_PsUnhandledException_'

    # Import the PS script and run the specified function. Any error is
    # serialized as one JSON line keyed by `caught_error_key` so it can be
    # recognized below.
    # NOTE(review): `script` and `function` are inserted unquoted, so they
    # must come from trusted input; paths containing spaces need quoting by
    # the caller.
    ps_code = f'''
$ErrorActionPreference = "stop";
try {{
. {script};
{function};
}} catch {{
$e = $_ | Select-Object ErrorDetails, ErrorRecord, CategoryInfo, ScriptStackTrace
$errorJson = @{{ {caught_error_key} = $e }} | ConvertTo-Json -Compress -Depth 10
Write-Host $errorJson
exit 1
}}
'''
    cmd = ['pwsh', '-command', ps_code]
    print(f'[PowerShell => {script} {function}]')

    stream_names = ('stdout', 'stderr')
    echo = {'stdout': print_stdout, 'stderr': print_stderr}
    output_lines = {name: [] for name in stream_names}
    pending = {name: b'' for name in stream_names}  # bytes of the unfinished line
    ansi_escape_re = re.compile(r'(?:\x1b[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')

    def record_line(name, raw_line):
        # Decode per complete line so a multi-byte character split across
        # two read chunks is reassembled first; undecodable bytes are
        # replaced rather than aborting the whole run.
        line = raw_line.decode('utf-8', errors='replace')
        if echo[name]:
            # Echo with the escape sequences intact to keep the colors.
            print(line, end='')
        output_lines[name].append(ansi_escape_re.sub('', line).replace('\x1b=', ''))

    exit_code = None
    # The loop variable must not be called `output`: that would shadow the
    # `output` argument that selects the return mode further down.
    for event in _run_cmd_tty_stream(cmd):
        returncode = event.get('returncode')
        if returncode is not None:
            exit_code = returncode
            continue
        data = event['data']
        if not data:  # empty chunk, nothing to record
            continue
        name = event['name']
        # A chunk may hold several lines or end in the middle of one, so
        # split on every newline and carry the unfinished tail forward.
        pieces = (pending[name] + data).split(b'\n')
        pending[name] = pieces.pop()
        for piece in pieces:
            record_line(name, piece + b'\n')
    # Whatever is still in `pending` never received a newline. It is left
    # unrecorded so an unterminated fragment cannot displace the JSON result
    # line. NOTE(review): presumably such a tail is only terminal control
    # sequences written at shutdown - confirm.

    stdout = output_lines['stdout']
    stderr = output_lines['stderr']

    # Parse the last non-blank stdout line. `parsed_ok` is tracked separately
    # because valid JSON may itself be falsy ([], 0, false, null).
    last_line = next((line for line in reversed(stdout) if line.strip()), None)
    last_line_json = None
    parsed_ok = False
    if last_line is not None:
        try:
            last_line_json = json.loads(last_line)
            parsed_ok = True
        except JSONDecodeError:
            pass

    if exit_code != 0:
        # Check if an exception occurred in PS script
        msg = f'PowerShell exited with non-zero code {exit_code}'
        if isinstance(last_line_json, dict) and caught_error_key in last_line_json:
            error = last_line_json[caught_error_key]
            info = (error.get('CategoryInfo') if isinstance(error, dict) else None) or {}
            raise PowerShellException(f"{msg} Exception: {info.get('Reason')} {info.get('TargetName')}", error)
        raise PowerShellException(msg)

    if output == 'raw':
        return stdout, stderr
    if not parsed_ok:
        raise PowerShellException('JSON not detected on last line of PS stdout', stdout)
    return last_line_json, stderr
def _run_cmd_tty_stream(cmd, bytes_input=b''):
    """Stream the output of cmd, with stdin, stdout and stderr as TTYs.

    ``bytes_input`` is written to the child's stdin once, right after the
    process has been started.

    Each yield from this function is a dict:
        { "name": "stdout/stderr", "data": b"", "returncode": None }
    for every chunk read (at most 512 bytes), followed by one final item
        { "name": None, "data": None, "returncode": <exit code> }
    after both output streams have reached EOF and the process was reaped.

    If the generator is closed before it is exhausted, the descriptors are
    closed and a still-running child is killed; no final item is produced.

    Adapted from https://stackoverflow.com/a/52954716/507629
    and https://gist.github.com/hayd/4f46a68fc697ba8888a7b517a414583e
    """
    # provide tty to enable line-buffering
    masters, slaves = [], []
    try:
        for _ in range(3):  # order: stdout, stderr, stdin
            master_fd, slave_fd = pty.openpty()
            masters.append(master_fd)
            slaves.append(slave_fd)
        mo, me, mi = masters
        so, se, si = slaves
        p = subprocess.Popen(
            cmd,
            stdin=si, stdout=so, stderr=se,
            close_fds=True)
    except BaseException:
        # Opening a PTY or starting the process failed (e.g. the executable
        # does not exist): release every descriptor opened so far.
        for fd in masters + slaves:
            os.close(fd)
        raise

    # The child holds its own copies of the slave ends; the parent must
    # close its copies or the masters would never report EOF.
    for fd in slaves:
        os.close(fd)

    timeout = 0.04  # seconds
    readable = [mo, me]
    fd_name = {mo: 'stdout', me: 'stderr'}
    try:
        # Inside the try so a failed write is cleaned up as well.
        os.write(mi, bytes_input)
        while readable:
            ready, _, _ = select.select(readable, [], [], timeout)
            for fd in ready:
                try:
                    data = os.read(fd, 512)
                except OSError as e:
                    if e.errno != errno.EIO:
                        raise
                    # EIO means EOF on some systems
                    readable.remove(fd)
                else:
                    if not data:  # EOF
                        readable.remove(fd)
                    yield {'name': fd_name[fd], 'data': data, 'returncode': None}
    finally:
        for fd in masters:
            os.close(fd)
        if p.poll() is None:
            p.kill()
        p.wait()
    # Deliberately outside the `finally`: yielding while the generator is
    # being closed would raise RuntimeError.
    yield {'name': None, 'data': None, 'returncode': p.returncode}
class PowerShellException(Exception):
    """Raised when a PowerShell invocation fails or yields unusable output.

    Raised either as ``PowerShellException(message)`` or as
    ``PowerShellException(message, details)``, where ``details`` carries
    extra context such as the error record reported by PowerShell or the
    captured stdout lines.

    Attributes:
        message: first positional argument, or ``''`` when none was given.
        details: second positional argument, or ``None`` when absent.

    ``args`` is left exactly as passed, so existing handlers that inspect
    ``e.args`` keep working. ``str(e)`` returns only the message rather
    than the tuple repr of ``(message, details)``.
    """

    def __init__(self, *args):
        super().__init__(*args)
        self.message = args[0] if args else ''
        self.details = args[1] if len(args) > 1 else None

    def __str__(self):
        return str(self.message)
# Demo function: writes colored host output, an error record and a raw
# stderr line, then emits a four-element array as one line of compact JSON.
function myPsFunction($a, $b) {
    Write-Host "This should be green" -ForegroundColor green

    # Emits an error record. NOTE(review): under a caller that sets
    # $ErrorActionPreference = "stop" this presumably terminates the
    # function here - confirm.
    Write-Error "Test error!"

    Write-Host "This should be magenta" -ForegroundColor magenta

    # Writes straight to the host's error output, bypassing the error stream.
    $host.ui.WriteErrorLine("test raw stderror output")

    Write-Host "No color"

    # Two fixed values followed by both arguments; the JSON string is the
    # function's only pipeline output.
    $items = @('abc', 'def', $a, $b)
    $items | ConvertTo-Json -Compress
}
from powershell import run_powershell, PowerShellException
# Demo driver: call myPsFunction from test.ps1 and print the parsed result
# one element per line, or report the PowerShell failure.
try:
    data, err = run_powershell('test.ps1', function='myPsFunction arg1 arg2')
except PowerShellException as e:
    print('PS Exception: ', e)
else:
    print('parsed array from PS: ', data)
    for item in data:
        print(item)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment