Last active
October 7, 2022 05:49
-
-
Save hrz6976/fd3034f48a14ec66272ead7829aa030d to your computer and use it in GitHub Desktop.
python subprocess wrapper & color print
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# run command and color print | |
import subprocess | |
import os | |
import sys | |
import logging | |
from typing import Dict, Optional, TextIO, Tuple | |
from io import StringIO | |
def run_command(command: str, | |
env: Optional[Dict[str, str]]=None, | |
logfile: Optional[TextIO]=None, | |
realtime_output: bool=True) -> Tuple[int, str]: | |
""" | |
Run command, log, returns, and see output in realtime (python 3.5+) | |
param command: command to run | |
param logfile: log stdout & stderr to logfile | |
param env: environment variables to set | |
param realtime_output: if True, output will be printed to stdout and stderr | |
return: return code & stdout of command | |
""" | |
_decoder = 'gbk' if sys.platform == 'win32' else 'utf-8' | |
_env = os.environ.copy() | |
if env: | |
_env.update(env) | |
_cmd = [x for x in command.split(" ") if x != ""] | |
if logfile: | |
f_log: TextIO = open(logfile, "w+", encoding='utf-8') | |
_stdout = StringIO() | |
try: | |
# walkaround windows encoding | |
with subprocess.Popen( | |
args=_cmd, | |
env=_env, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, | |
bufsize=0, | |
) as p: | |
char = p.stdout.read(1) | |
while char != b'': | |
# print to screen | |
if realtime_output: | |
sys.stdout.buffer.write(char) | |
try: | |
_decoded = char.decode(_decoder) | |
except UnicodeDecodeError: | |
_decoded = "" | |
# print to return value | |
print(_decoded, end='', flush=True, file=_stdout) | |
if logfile: | |
# print to log | |
print(_decoded, end='', flush=True, file=f_log) | |
char = p.stdout.read(1) | |
# check return code (may not work on windows) | |
if p.returncode != 0: | |
logging.error("command %s failed with return code %d", command, p.returncode) | |
return p.returncode, _stdout.getvalue() | |
except Exception as e: | |
logging.error(f"Error running command {command}: {e}") | |
raise e | |
finally: | |
_stdout.close() | |
if logfile: | |
f_log.close() | |
# color print | |
def print_color(text: str, color=""): | |
colors = { | |
'red': '\033[91m', | |
'green': '\033[92m', | |
'yellow': '\033[93m', | |
'blue': '\033[94m', | |
'magenta': '\033[95m', | |
'cyan': '\033[96m', | |
'white': '\033[97m', | |
'end': '\033[0m' | |
} | |
if color in colors: | |
print(colors[color] + text + colors['end']) | |
else: | |
print(text) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment