Skip to content

Instantly share code, notes, and snippets.

@fboender
Created July 2, 2021 07:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fboender/cdb4c1afae00e4a032b9611d10f8ba1e to your computer and use it in GitHub Desktop.
Save fboender/cdb4c1afae00e4a032b9611d10f8ba1e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import sys
import os
import subprocess
import json
class CmdError(Exception):
def __init__(self, msg, exitcode, stderr):
self.exitcode = exitcode
self.stderr = stderr
super().__init__(msg)
def cmd(cmd, input=None, env=None, raise_err=True, env_clean=False,
auto_decode=True):
"""
Run command `cmd` in a shell.
`input` (string) is passed in the process' STDIN.
If `env` (dict) is given, the environment is updated with it. If
`env_clean` is `True`, the subprocess will start with a clean environment
and not inherit the current process' environment. `env` is still applied.
If `raise_err` is `True` (default), a `CmdError` is raised when the return
code is not zero.
Returns a dictionary:
{
'stdout': <string>,
'stderr': <string>,
'exitcode': <int>
}
If `auto_decode` is True, both `stdout` and `stderr` are automatically
decoded from the system default encoding to unicode strings. This will fail
if the output is not in that encoding (e.g. it contains binary data).
Otherwise, stdout and stderr are of type `<bytes>`.
"""
p_env = {}
if env_clean is False:
p_env.update(os.environ)
if env is not None:
p_env.update(env)
p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=p_env)
stdout, stderr = p.communicate(input)
if auto_decode is True:
stdout = stdout.decode(sys.getdefaultencoding())
stderr = stderr.decode(sys.getdefaultencoding())
if p.returncode != 0 and raise_err is True:
msg = "Command '{}' returned with exit code {}".format(cmd,
p.returncode)
raise CmdError(msg, p.returncode, stderr)
return {
'stdout': stdout,
'stderr': stderr,
'exitcode': p.returncode
}
def docker_get_container_ids():
result = cmd("docker ps -a --format '{{.ID}}'")
return [line.strip() for line in result["stdout"].splitlines()]
def docker_get_inspect(container_ids):
result = cmd("docker inspect {}".format(container_ids))
return result["stdout"]
def json_flatten(doc, prefix=""):
if isinstance(doc, list):
for i, v in enumerate(doc):
json_flatten(v, "{}[{}]".format(prefix, i))
elif isinstance(doc, dict):
for k, v in doc.items():
json_flatten(v, "{}.{}".format(prefix, k))
else:
sys.stdout.write("{} = {}\n".format(prefix.lstrip(".").lower(), doc))
if __name__ == "__main__":
container_ids = " ".join(docker_get_container_ids())
container_info = json.loads(docker_get_inspect(container_ids))
for container in container_info:
json_flatten(container, "{}:{}".format(container["Id"][:12], container["Name"][1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment