Skip to content

Instantly share code, notes, and snippets.

@Roffild
Created February 6, 2024 10:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Roffild/ab27f4f8d05649d5fe666748f7d27f8f to your computer and use it in GitHub Desktop.
Save Roffild/ab27f4f8d05649d5fe666748f7d27f8f to your computer and use it in GitHub Desktop.
Utilities for creating a docker image.
"""
Utilities for creating a docker image.
"""
import fnmatch
import json
import pathlib
import re
import stat
import subprocess
import tarfile
import time
def shellCmd(cmd: str, input: str = None) -> (int, str, str):
"""
/bin/sh `cmd` < `input`
:param cmd: shell command
:param input: data
:return: (exitcode, stdout, stderr)
"""
with subprocess.Popen(args=cmd, shell=True, text=True,
stdin=(input and -1), stdout=-1, stderr=-1) as proc:
if input and proc.stdin:
proc._stdin_write(input)
return (proc.wait(), proc.stdout.read(), proc.stderr.read())
def cmdImageBuild(dockerfile_text: str, workdir: str or pathlib.Path="") -> str:
"""
docker image build -f - `workdir` < `dockerfile_text`
:param dockerfile_text: code
:param workdir: workdir for ADD and COPY
:return: ID
"""
workdir = str(workdir or "")
if len(workdir):
workdir = f"f - {workdir}"
(excode, pout, perr) = shellCmd(f"docker image build -{workdir}", dockerfile_text)
if excode == 0:
return pout.rsplit("Successfully built", 1)[-1].strip()
if excode == 1 and "TLS handshake timeout" in perr:
raise ConnectionError("TLS handshake timeout")
print(f"{pout}\nExitCode={excode}\n{perr}")
def cmdImageImport(tar_path, tag: str, change: str or list=None) -> str:
"""
docker image import [--change `change`, ...] `tar_path` [`tag`]
:param tar_path: path of image
:param tag: tag
:param change: one or list of --change
:return: ID
"""
if tag is None:
tag = ""
chng = ""
if change is not None:
if isinstance(change, str):
chng = " ".join("--change '" + c.replace("'", "\\'") + "'" for c in change.splitlines(False))
else:
chng = " ".join("--change '" + c.replace("'", "\\'") + "'" for c in change)
chng = chng.replace("--change ''", "")
(excode, pout, perr) = shellCmd(f"docker image import {chng} {tar_path} {tag}")
if excode == 0:
return pout.split(":", 1)[-1].strip()
print(f"{pout}\nExitCode={excode}\n{perr}")
def dockerImageSaveAnalizer(path: pathlib.Path or str) -> dict:
"""
Returns config and information on files in all layers of the exported image.
:param path: path to image
:return: {"manifest": manifest, "config": config, "layers": layers}
"""
with tarfile.open(path, "r") as img:
tinfo = dict([(_.name, _) for _ in img.getmembers()])
manifest = json.load(img.extractfile(tinfo["manifest.json"]))[0]
config = json.load(img.extractfile(tinfo[manifest["Config"]]))
layers = []
for lrs in manifest["Layers"]:
with tarfile.open(fileobj=img.extractfile(lrs)) as l:
offset = tinfo[lrs].offset_data
members = l.getmembers()
for m in members:
m.offset += offset
m.offset_data += offset
layers.append((json.load(img.extractfile(tinfo[lrs[:-9] + "json"])), members))
return {"manifest": manifest, "config": config, "layers": layers}
def mergeLayers(dst: tarfile.TarFile, src: str, include: tuple or list = ("*",),
exclude: tuple or list = None):
"""
Copying files from all layers of the image to the archive.
Format for `include` and `exclude` = fnmatch. But "*" is added to the beginning if not startswith("/").
:param dst: open tarball
:param src: path to image (for `dockerImageSaveAnalizer(src)`)
:param include: list with fnmatch or re.compile()
:param exclude: list with fnmatch or re.compile()
"""
def _convertFnmatch(match: str) -> re.Pattern:
if not isinstance(match, str):
return match
if not match.startswith(("/", "*")):
match = f"*{match}"
return re.compile(fnmatch.translate(match))
flist = {}
if include and not isinstance(include, (tuple, list)):
include = (include,)
if exclude and not isinstance(exclude, (tuple, list)):
exclude = (exclude,)
include = (include and tuple(map(_convertFnmatch, include))) or tuple()
exclude = (exclude and tuple(map(_convertFnmatch, exclude))) or tuple()
name = ""
exbreak = False
for cfg, lrs in dockerImageSaveAnalizer(src)["layers"]:
for l in lrs:
name = f"/{l.name}"
exbreak = False
for mtch in exclude:
if mtch.match(name):
exbreak = True
break
if exbreak:
continue
for mtch in include:
if mtch.match(name):
flist[l.name] = l
break
with tarfile.open(src, "r") as tin:
for tinfo in flist.values():
dst.addfile(tinfo,
fileobj=(tin.extractfile(tinfo) if not (tinfo.islnk() or tinfo.issym()) else None))
def repairTar(tar_path, force=False) -> list:
"""
Removes duplicates from the tarball that appear after adding to the end of the archive.
Docker cannot unpack such tar.
:param tar_path: path
:param force: repack the archive with sorted paths always.
:return: list of duplicates
"""
duplicate = []
flist = {}
with tarfile.open(tar_path, "r") as ftar:
for m in ftar.getmembers():
if m.name in flist:
duplicate.append(m.name)
flist[m.name] = m
if len(duplicate) > 0 or force:
tmpfile = f"{tar_path}_tempRepairTar"
with tarfile.open(tmpfile, "w") as ftmp:
with tarfile.open(tar_path, "r") as ftar:
for name in sorted(flist.keys()):
tinfo = flist[name]
ftmp.addfile(tinfo, fileobj=(
ftar.extractfile(tinfo) if not (tinfo.islnk() or tinfo.issym()) else None))
pathlib.Path(tmpfile).replace(tar_path)
return duplicate
def dumpImageInfo(tofile: pathlib.Path or str, imageSaveAnalizer: dict):
"""
Dump config and list of files (`ls -l`) from all layers of the image.
:param tofile: path
:param imageSaveAnalizer: return of `dockerImageSaveAnalizer()`
"""
with open(tofile, "w", encoding="utf-8") as out:
out.write("===== MANIFEST =====\n")
json.dump(imageSaveAnalizer["manifest"], out, indent=2)
out.write("\n===== CONFIG =====\n")
json.dump(imageSaveAnalizer["config"], out, indent=2)
out.write("\n===== LAYERS =====\n")
for cfg, lrs in imageSaveAnalizer["layers"]:
out.write(f"LayerID: {cfg['id']}\n")
json.dump(cfg, out, indent=2)
out.write("\n")
lst = []
for l in lrs:
tm = time.gmtime(l.mtime)
if l.uid == 0 and len(l.uname) == 0:
l.uname = "root"
if l.gid == 0 and len(l.gname) == 0:
l.gname = "root"
lst.append((
f"{tm.tm_year}-{tm.tm_mon:0>2d}-{tm.tm_mday:0>2d} " +
f"{tm.tm_hour:0>2d}:{tm.tm_min:0>2d}:{tm.tm_sec:0>2d}",
stat.filemode(l.mode),
f"{l.uname}({l.uid})",
f"{l.gname}({l.gid})",
str(l.size or ""),
"| " + l.name + (f" -> {l.linkname}" if len(l.linkname) else "")
))
if len(lst) == 0:
continue
maxlen = [len(l) for l in lst[0]]
for l in lst:
maxlen = [max(maxlen[x], len(d)) for x, d in enumerate(l)]
for l in lst:
out.write(" ".join(
[(f"{d: <{maxlen[x]}}" if x != 4 else f"{d: >{maxlen[x]}}") for x, d in enumerate(l)]
).strip() + "\n")
out.write("\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment