Skip to content

Instantly share code, notes, and snippets.

@smoser
Created June 28, 2022 18:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save smoser/492356f8f6863aedbcd84d965f889e7e to your computer and use it in GitHub Desktop.
Save smoser/492356f8f6863aedbcd84d965f889e7e to your computer and use it in GitHub Desktop.
subprocess python wrapper named 'subp'.

subp - a python subprocess wrapper

This is just a wrapper around python subprocess that I like to use. It originated during cloud-init development.

Some things that I like about it:

  • SubpResult prints well.
  • Times how long subprocesses take; the duration is available in the SubpResult.
  • Takes a timeout and a signal to send to the subprocess. The Python standard library will only ever send SIGKILL, which does not give the process time to clean up.
  • capture - capture stdout and stderr.
  • It can be used with or without exception raising. If you pass rcs=None, no exceptions will be raised and you can do your own checking of the returned 'rc'.

Some things to note:

  • If you want to use 'sh' to split up your command, rather than 'shell=True' as you'd pass to subprocess.Popen, you can use ('sh', '-c', your_string)
import logging
import shlex
import signal
import subprocess
import threading
import time
class SubpResult():
    """Record of a single subprocess execution.

    Attributes (each may be supplied as a keyword argument):
      cmd: the command that was executed.
      data: defaults to None.
      duration: run time in seconds, or None if it was never measured.
      exception: exception recorded for the run, or None.
      out, err: captured stdout and stderr; default to empty bytes.
      rc: exit code; defaults to -1.
    """

    # The class-level defaults double as the list of accepted keyword names:
    # __init__ rejects any keyword that is not already an attribute.
    cmd, data, duration, exception = (None, None, None, None)
    out, err = (b'', b'')
    rc = -1

    def __init__(self, **kwargs):
        """Set the given attributes; raise AttributeError for unknown names."""
        bad = ','.join([k for k in kwargs if not hasattr(self, k)])
        if bad:
            raise AttributeError(
                f"{self.__class__.__name__} object has no attributes: {bad}")
        for k, v in kwargs.items():
            setattr(self, k, v)

    def __str__(self):
        """Return a multi-line, human-readable report of the result."""
        def indent(name, data):
            # Render one labelled stream, one space-prefixed line per line.
            if data is None:
                return f"{name}: {data}"
            if hasattr(data, 'decode'):
                data = data.decode('utf-8', errors='ignore')
            return "%s: %s" % (
                name,
                ''.join([" " + line + "\n" for line in data.splitlines()]))

        # duration stays None until a run has been timed; '%.2f' would raise
        # TypeError on it, so fall back to the plain value.
        if self.duration is None:
            duration = "duration: None"
        else:
            duration = "duration: %.2fs" % self.duration

        return '\n'.join([
            # Parenthesized so that the "cmd: " label is kept in both cases
            # ('%' binds tighter than the conditional expression).
            "cmd: %s" % ('' if not self.cmd else shell_quote(self.cmd)),
            f"rc: {self.rc}",
            duration,
            "exc: %s" % self.exception,
            indent("stdout", self.out),
            indent("stderr", self.err),
            ""])
class SubpError(Exception):
    """Exception that carries the result object of a failed command.

    result: the result object; its 'exception' attribute and its str() form
            are used when the error is rendered.
    desc: optional free-form description shown first in the rendering.
    """

    def __init__(self, result, desc=None):
        super().__init__(desc)
        self.result = result
        self.desc = desc

    def __str__(self):
        """Render the optional desc and exception lines, then the result."""
        pieces = []
        if self.desc:
            pieces.append("desc: %s\n" % self.desc)
        failure = self.result.exception
        if failure:
            pieces.append("exception: %s\n" % failure)
        pieces.append(str(self.result))
        return "".join(pieces)
def shell_quote(cmd):
    """Return cmd as a single shell-quoted string suitable for display.

    cmd: a sequence of string arguments, or a single string. A single string
         is treated as one argument (it is not split), so it is quoted as a
         whole rather than character by character.
    """
    if isinstance(cmd, str):
        cmd = (cmd,)
    return ' '.join(shlex.quote(a) for a in cmd)
def subp(cmd, capture=True, data=None, rcs=(0,), timeout=None, killsig=signal.SIGTERM,
         logger=None):
    """Execute a subprocess.

    cmd: string or list to execute. string is not split.
    capture: capture and return stdout and stderr in the SubpResult.
    data: stdin to the process. A str is encoded as utf-8. If None, the
          process reads from the null device.
    rcs: list of exit codes that should not raise exception. A single int
         is accepted. None disables the check entirely.
    timeout: maximum time in seconds for process to finish.
    killsig: the signal to send to the process when timeout expires.
             Standard Popen interface always sends SIGKILL, which does not
             allow process to clean up.
    logger: optional callable invoked as logger(format, *args) before and
            after execution.

    Return is a SubpResult.

    If rcs is provided and exit code is not in the list of rcs, then
    a SubpError is raised.

    After a timeout, killsig is sent and the call then waits, without a
    limit, for the process to exit; result.exception is set to
    subprocess.TimeoutExpired. Errors raised while starting the process
    propagate unchanged.
    """
    if isinstance(rcs, int):
        rcs = (rcs,)

    # allow user to pass in a string as data
    if hasattr(data, 'encode'):
        data = data.encode('utf-8')

    # devnull ensures any read gets null and wont ever block.
    stdin = subprocess.DEVNULL if data is None else subprocess.PIPE
    stdout, stderr = (None, None)
    if capture:
        stdout, stderr = (subprocess.PIPE, subprocess.PIPE)

    result = SubpResult(cmd=cmd, rc=-1)
    start = time.time()
    try:
        if logger is not None:
            logger("Executing: %s", shell_quote(cmd))

        sp = subprocess.Popen(
            cmd, stdout=stdout, stderr=stderr, stdin=stdin)

        try:
            try:
                (result.out, result.err) = sp.communicate(data, timeout=timeout)
            except subprocess.TimeoutExpired:
                # communicate() does not kill the child on timeout. Send the
                # caller's signal, then retry to collect the output; the
                # retry must not pass the input again.
                sp.send_signal(killsig)
                (result.out, result.err) = sp.communicate()
                result.exception = subprocess.TimeoutExpired(cmd=cmd, timeout=timeout)
        except Exception as e:
            # Best effort: record the failure on the result instead of raising.
            result.exception = e

        result.rc = sp.returncode

        # Without capture communicate() yields None; normalize to bytes.
        if not result.out:
            result.out = b''
        if not result.err:
            result.err = b''
    finally:
        result.duration = time.time() - start

    if logger is not None:
        logger("returned %d took %.2fs", result.rc, result.duration)

    if rcs is None or result.rc in rcs:
        return result

    raise SubpError(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment