|
import subprocess |
|
import json |
|
from json.decoder import JSONDecodeError |
|
import subprocess |
|
from typing import Union, Any |
|
from termcolor import colored |
|
import shlex |
|
|
|
|
|
def run_pwsh( |
|
script: str = None, |
|
command: str = None, |
|
args: Union[list, dict] = [], |
|
print_output: bool = True, |
|
parse_json: bool = True, |
|
) -> Union[str, int, dict, list]: |
|
''' |
|
Invokes a PowerShell command with an optional script import |
|
|
|
Parameters: |
|
command: PS command/function to run |
|
If not specified, script is required and will be ran directly. |
|
args: list or dictionary of arguments (optional) |
|
script: path to the PS script to import (optional) |
|
parse_json: Try to parse the last line of output as JSON and return it. |
|
The last output line from the PS script/function must be a single line of JSON. |
|
If false or last line is not valid JSON, the full command output will be returned. |
|
|
|
Returns: |
|
parsed json output (string/int/list/dict) or raw output (string) |
|
''' |
|
|
|
if not script and not command: |
|
raise ValueError('at least one of arguments script or command is required') |
|
|
|
caught_error_key = '_PsUnhandledException_' |
|
|
|
if command: |
|
ps_args = [command] |
|
if type(args) == dict: |
|
for key, value in args.items(): |
|
ps_args.append(f'-{key}') |
|
ps_args.append(str(value)) |
|
elif type(args) == list: |
|
for arg in args: |
|
ps_args.append(str(arg)) |
|
ps_cmd = shlex.join(ps_args) |
|
else: |
|
ps_cmd = '' |
|
|
|
ps_code = f''' |
|
try {{ |
|
# Import script if set |
|
{f'. {script}' if script else ''} |
|
{ps_cmd} |
|
}} catch {{ |
|
$_ # prints the error formatted with colors |
|
$e = $_ | Select-Object ErrorDetails, ErrorRecord, CategoryInfo, ScriptStackTrace |
|
$errorJson = @{{ {caught_error_key} = $e }} | ConvertTo-Json -Compress -Depth 10 |
|
Write-Host $errorJson |
|
exit 1 |
|
}} |
|
''' |
|
# print(ps_code) |
|
|
|
print(colored(f'> PS [{script}] {command}', 'blue')) |
|
cmd = ['pwsh', '-command', ps_code] |
|
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) |
|
output = [] |
|
while True: |
|
line = process.stdout.readline().decode('utf-8') |
|
if process.poll() is not None and line == '': |
|
break |
|
if line: |
|
if line.strip(): |
|
output.append(line.strip()) |
|
if print_output: |
|
print(line.strip()) |
|
|
|
exit_code = process.poll() |
|
|
|
last_line_json = None |
|
try: |
|
last_line = output[-1] |
|
last_line_json = json.loads(last_line) |
|
except (KeyError, JSONDecodeError): |
|
pass |
|
|
|
if exit_code != 0: |
|
msg = f'PowerShell exited with non-zero code {exit_code}' |
|
# Check if an exception occurred in PS script |
|
if last_line_json and caught_error_key in last_line_json: |
|
error = last_line_json[caught_error_key] |
|
info = error.get('CategoryInfo', {}) |
|
raise PowerShellException(f"{msg}: {info.get('Reason')} {info.get('TargetName')}", error) |
|
raise PowerShellException(msg) |
|
|
|
if parse_json: |
|
if last_line_json: |
|
return last_line_json |
|
else: |
|
print(colored('WARNING: JSON not detected on last line of PowerShell output', 'yellow')) |
|
return '\n'.join(output) |
|
else: |
|
return '\n'.join(output) |
|
|
|
class PowerShellException(Exception): |
|
pass |