|
import logging |
|
import shlex |
|
import signal |
|
import subprocess |
|
import threading |
|
import time |
|
|
|
class SubpResult(): |
|
cmd, data, duration, exception = (None, None, None, None) |
|
out, err = (b'', b'') |
|
rc = -1 |
|
|
|
def __init__(self, **kwargs): |
|
bad = ','.join([k for k in kwargs if not hasattr(self, k)]) |
|
if bad: |
|
raise AttributeError( |
|
f"{self.__class__.__name__} object has no attributes: {bad}") |
|
for k, v in kwargs.items(): |
|
setattr(self, k, v) |
|
|
|
def __str__(self): |
|
def indent(name, data): |
|
if data is None: |
|
return "{name}: {data}" |
|
|
|
if hasattr(data, 'decode'): |
|
data = data.decode('utf-8', errors='ignore') |
|
|
|
return "%s: %s" % ( |
|
name, |
|
''.join([" " + line + "\n" for line in data.splitlines()])) |
|
|
|
return '\n'.join([ |
|
"cmd: %s" % '' if not self.cmd else shell_quote(self.cmd), |
|
f"rc: {self.rc}", |
|
"duration: %.2fs" % self.duration, |
|
"exc: %s" % self.exception, |
|
indent("stdout", self.out), |
|
indent("stderr", self.err), |
|
""]) |
|
|
|
|
|
class SubpError(Exception): |
|
def __init__(self, result, desc=None): |
|
super(SubpError, self).__init__(desc) |
|
self.desc = desc |
|
self.result = result |
|
|
|
def __str__(self): |
|
ret = "" if not self.desc else "desc: %s\n" % self.desc |
|
ret += "" if not self.result.exception else "exception: %s\n" % self.result.exception |
|
return ret + str(self.result) |
|
|
|
|
|
def shell_quote(cmd): |
|
return ' '.join(shlex.quote(a) for a in cmd) |
|
|
|
|
|
def subp(cmd, capture=True, data=None, rcs=(0,), timeout=None, killsig=signal.SIGTERM, |
|
logger=None): |
|
"""Execute a subprocess. |
|
|
|
cmd: string or list to execute. string is not split. |
|
capture: capture and return stdin and stdout in the SubpResult |
|
data: stdin to the process. |
|
rcs: list of exit codes that should not raise exception. |
|
timeout: maximum time in seconds for process to finish. |
|
killsig: the signal to send to the process. Standard Popen interface |
|
always sends SIGKILL, which does not allow process to clean up. |
|
|
|
Return is a SubpResult. |
|
If rcs is provided and exit code is not in the list of rcs, then |
|
a SubpError is raised. |
|
""" |
|
if isinstance(rcs, int): |
|
rcs = (rcs,) |
|
|
|
# allow user to pass in a string as data |
|
if hasattr(data, 'encode'): |
|
data = data.encode('utf-8') |
|
|
|
devnull_fp = None |
|
stdin = subprocess.PIPE |
|
stdout, stderr = (None, None) |
|
if capture: |
|
stdout, stderr = (subprocess.PIPE, subprocess.PIPE) |
|
|
|
result = SubpResult(cmd=cmd, rc=-1) |
|
start = time.time() |
|
try: |
|
if data is None: |
|
# devnull ensures any read gets null and wont ever block. |
|
devnull_fp = open(os.devnull, encoding="utf-8") |
|
stdin = devnull_fp |
|
|
|
if logger is not None: |
|
logger("Executing: %s", shell_quote(cmd)) |
|
sp = subprocess.Popen( |
|
cmd, stdout=stdout, stderr=stderr, stdin=stdin) |
|
|
|
def communicate(): |
|
try: |
|
(result.out, result.err) = sp.communicate(data) |
|
except Exception as e: |
|
result.exception = e |
|
|
|
if timeout is not None and killsig != signal.SIGKILL: |
|
thread = threading.Thread(target=communicate) |
|
thread.start() |
|
thread.join(timeout) |
|
if thread.is_alive(): |
|
sp.send_signal(killsig) |
|
thread.join() |
|
result.exception = subprocess.TimeoutExpired(cmd=cmd, timeout=timeout) |
|
else: |
|
communicate() |
|
|
|
result.rc = sp.returncode |
|
|
|
if not result.out: |
|
result.out = b'' |
|
if not result.err: |
|
result.err = b'' |
|
|
|
finally: |
|
result.duration = time.time() - start |
|
if devnull_fp: |
|
devnull_fp.close() |
|
|
|
if logger is not None: |
|
logger("returned %d took %.2fs", result.rc, result.duration) |
|
if rcs is None or result.rc in rcs: |
|
return result |
|
|
|
raise SubpError(result) |