Skip to content

Instantly share code, notes, and snippets.

@LewisGaul
Created September 14, 2022 14:24
Show Gist options
  • Save LewisGaul/263bd86e2af83fa251b2b71b60b508a5 to your computer and use it in GitHub Desktop.
Save LewisGaul/263bd86e2af83fa251b2b71b60b508a5 to your computer and use it in GitHub Desktop.
Wrapper around subprocess.run()
__all__ = ("run_cmd",)
import contextlib
import os.path
import logging
import shlex
import subprocess
import tempfile
from typing import List
logger = logging.getLogger(__name__)
def run_cmd(
cmd: List[str], *, log_output: bool = False, **kwargs
) -> subprocess.CompletedProcess[str]:
"""
Run a command, capturing stdout and stderr by default, and raising on error.
:param cmd:
The command to run.
:param log_output:
Whether to log the output.
:param kwargs:
Passed through to subprocess.run().
:raise subprocess.CalledProcessError:
If the command returns non-zero exit status.
:raise subprocess.TimeoutExpired:
If timeout is given and the command times out.
:return:
Completed process object from subprocess.run().
"""
logger.debug("Running command: %r", shlex.join(cmd))
kwargs = {
"check": True,
"text": True,
"encoding": "utf-8",
**kwargs,
}
if not {"stdout", "stderr", "capture_output"}.intersection(
kwargs
) or kwargs.pop("capture_output", False):
kwargs["stdout"] = subprocess.PIPE
kwargs["stderr"] = subprocess.PIPE
elif (
"stdout" not in kwargs
and kwargs.get("stderr", None) == subprocess.STDOUT
):
kwargs["stdout"] = subprocess.PIPE
# Writing output to temporary files as using subprocess.PIPE sometimes
# causes the command to hang when reading output.
# This is very sad.
with contextlib.ExitStack() as ctxs:
stdout_file = None
stderr_file = None
if any(kwargs.get(s, None) for s in ("stdout", "stderr")):
tmpdir = ctxs.enter_context(tempfile.TemporaryDirectory())
if kwargs.get("stdout", None) == subprocess.PIPE:
stdout_file = ctxs.enter_context(
open(
os.path.join(tmpdir, "stdout"), "w+", encoding="utf-8"
)
)
kwargs["stdout"] = stdout_file
if kwargs.get("stderr", None) == subprocess.STDOUT:
kwargs["stderr"] = stdout_file
if kwargs.get("stderr", None) == subprocess.PIPE:
stderr_file = ctxs.enter_context(
open(
os.path.join(tmpdir, "stderr"), "w+", encoding="utf-8"
)
)
kwargs["stderr"] = stderr_file
try:
p: subprocess.CompletedProcess[str] = subprocess.run(cmd, **kwargs)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
if stdout_file:
stdout_file.seek(0)
e.stdout = stdout_file.read()
if stderr_file:
stderr_file.seek(0)
e.stderr = stderr_file.read()
if isinstance(e, subprocess.CalledProcessError):
issue_desc = "failed"
rc = e.returncode
else:
issue_desc = "timed out"
rc = None
if e.stderr:
logger.debug(
"Command %s with exit code %s, stdout:\n%s\nstderr:\n%s",
issue_desc,
rc,
e.stdout.strip("\n"),
e.stderr.strip("\n"),
)
elif e.stdout:
logger.debug(
"Command %s with exit code %s, output:\n%s",
issue_desc,
rc,
e.stdout.strip("\n"),
)
else:
logger.debug("Command %s with exit code %s", issue_desc, rc)
raise
else:
if stdout_file:
stdout_file.seek(0)
p.stdout = stdout_file.read()
if stderr_file:
stderr_file.seek(0)
p.stderr = stderr_file.read()
if log_output:
logger.debug("Command stdout:\n%s", (p.stdout or "").strip("\n"))
logger.debug("Command stderr:\n%s", (p.stderr or "").strip("\n"))
return p
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment