Created
September 14, 2022 14:24
-
-
Save LewisGaul/263bd86e2af83fa251b2b71b60b508a5 to your computer and use it in GitHub Desktop.
Wrapper around subprocess.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__all__ = ("run_cmd",) | |
import contextlib | |
import os.path | |
import logging | |
import shlex | |
import subprocess | |
import tempfile | |
from typing import List | |
logger = logging.getLogger(__name__)


def _read_captured(fobj):
    """Rewind a temporary capture file and return everything written to it."""
    fobj.seek(0)
    return fobj.read()


def run_cmd(
    cmd: List[str], *, log_output: bool = False, **kwargs
) -> subprocess.CompletedProcess[str]:
    """
    Run a command, capturing stdout and stderr by default, and raising on error.

    Defaults of ``check=True``, ``text=True`` and ``encoding="utf-8"`` are
    applied unless overridden in ``kwargs``. If none of ``stdout``, ``stderr``
    or ``capture_output`` is given, or ``capture_output`` is true, both streams
    are captured. If only ``stderr=subprocess.STDOUT`` is given, stdout is
    captured with stderr merged into it. ``capture_output`` is consumed here
    and never forwarded to ``subprocess.run()``.

    Any stream requested as ``subprocess.PIPE`` is redirected to a temporary
    file and read back afterwards, then stored on the returned object (or on
    the raised exception) as ``stdout`` / ``stderr``.

    :param cmd:
        The command to run.
    :param log_output:
        Whether to log the output (at debug level) on success.
    :param kwargs:
        Passed through to subprocess.run().
    :raise subprocess.CalledProcessError:
        If the command returns non-zero exit status.
    :raise subprocess.TimeoutExpired:
        If timeout is given and the command times out.
    :return:
        Completed process object from subprocess.run().
    """
    logger.debug("Running command: %r", shlex.join(cmd))
    kwargs = {
        "check": True,
        "text": True,
        "encoding": "utf-8",
        **kwargs,
    }
    if not {"stdout", "stderr", "capture_output"}.intersection(
        kwargs
    ) or kwargs.pop("capture_output", False):
        kwargs["stdout"] = subprocess.PIPE
        kwargs["stderr"] = subprocess.PIPE
    elif (
        "stdout" not in kwargs
        and kwargs.get("stderr", None) == subprocess.STDOUT
    ):
        kwargs["stdout"] = subprocess.PIPE

    capture_stdout = kwargs.get("stdout", None) == subprocess.PIPE
    capture_stderr = kwargs.get("stderr", None) == subprocess.PIPE
    # Read the captured output back with the same decoding options the caller
    # asked for, so e.g. errors="replace" is honoured instead of raising
    # UnicodeDecodeError on undecodable bytes.
    encoding = kwargs.get("encoding") or "utf-8"
    errors = kwargs.get("errors")

    # Writing output to temporary files as using subprocess.PIPE sometimes
    # causes the command to hang when reading output.
    # This is very sad.
    with contextlib.ExitStack() as ctxs:
        stdout_file = None
        stderr_file = None
        if capture_stdout or capture_stderr:
            # Entered first so that it is removed last, after the files
            # inside it have been closed.
            tmpdir = ctxs.enter_context(tempfile.TemporaryDirectory())
        if capture_stdout:
            stdout_file = ctxs.enter_context(
                open(
                    os.path.join(tmpdir, "stdout"),
                    "w+",
                    encoding=encoding,
                    errors=errors,
                )
            )
            kwargs["stdout"] = stdout_file
            if kwargs.get("stderr", None) == subprocess.STDOUT:
                # Merge stderr into the same capture file as stdout.
                kwargs["stderr"] = stdout_file
        if capture_stderr:
            stderr_file = ctxs.enter_context(
                open(
                    os.path.join(tmpdir, "stderr"),
                    "w+",
                    encoding=encoding,
                    errors=errors,
                )
            )
            kwargs["stderr"] = stderr_file

        try:
            p: subprocess.CompletedProcess[str] = subprocess.run(cmd, **kwargs)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
            # Attach whatever was captured before the failure to the exception.
            if stdout_file:
                e.stdout = _read_captured(stdout_file)
            if stderr_file:
                e.stderr = _read_captured(stderr_file)
            if isinstance(e, subprocess.CalledProcessError):
                issue_desc = "failed"
                rc = e.returncode
            else:
                issue_desc = "timed out"
                rc = None
            # Either stream may be None when it was not captured; never let
            # the logging below mask the real exception with AttributeError.
            out_text = (e.stdout or "").strip("\n")
            err_text = (e.stderr or "").strip("\n")
            if e.stderr:
                logger.debug(
                    "Command %s with exit code %s, stdout:\n%s\nstderr:\n%s",
                    issue_desc,
                    rc,
                    out_text,
                    err_text,
                )
            elif e.stdout:
                logger.debug(
                    "Command %s with exit code %s, output:\n%s",
                    issue_desc,
                    rc,
                    out_text,
                )
            else:
                logger.debug("Command %s with exit code %s", issue_desc, rc)
            raise
        else:
            if stdout_file:
                p.stdout = _read_captured(stdout_file)
            if stderr_file:
                p.stderr = _read_captured(stderr_file)

    if log_output:
        logger.debug("Command stdout:\n%s", (p.stdout or "").strip("\n"))
        logger.debug("Command stderr:\n%s", (p.stderr or "").strip("\n"))
    return p
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment