Skip to content

Instantly share code, notes, and snippets.

@slhck
Last active January 9, 2023 19:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save slhck/ff4c289753359f7ce211ef1c210b4b86 to your computer and use it in GitHub Desktop.
Save slhck/ff4c289753359f7ce211ef1c210b4b86 to your computer and use it in GitHub Desktop.
Docker Registry Cleanup
#!/usr/bin/env python3
#
# Cleanup Docker tags from GitLab registry
#
# Requirements:
#
# - skopeo
# - Python 3.9 or higher
# - Pip packages: `pip3 install loguru joblib packaging`
#
# Copyright (c) Werner Robitza
# License: MIT
import argparse
import json
import shlex
import subprocess
import sys
from multiprocessing import cpu_count
from typing import Literal
from joblib import Parallel, delayed
from loguru import logger
from packaging import version
# Type alias for the "<EMPTY>" marker string, which cleanup_repo uses as the
# prefix of tags that contain no "-" (i.e. tags without a customer prefix).
EmptyCustomer = Literal["<EMPTY>"]
def list_tags(repo: str, registry: str, include_latest: bool = False) -> list[str]:
    """
    Ask skopeo for the tags of a repository and return them as a list.

    Runs ``skopeo list-tags docker://<registry>/<repo>``, parses its output as
    JSON and reads the "Tags" property.

    Args:
        repo (str): The repository to list tags for
        registry (str): The registry hosting the repository
        include_latest (bool, optional): If False, every tag whose name ends
            with "latest" is dropped from the result. Defaults to False.

    Returns:
        list[str]: The tags, or an empty list if the skopeo command exited
            with a non-zero status (the error is logged).
    """
    image_ref = f"docker://{registry}/{repo}"
    try:
        raw_output = subprocess.check_output(["skopeo", "list-tags", image_ref])
    except subprocess.CalledProcessError as e:
        logger.error(f"error listing tags for {repo}: {e}")
        return []

    # Malformed JSON or a missing "Tags" key is not handled here and propagates.
    all_tags = json.loads(raw_output)["Tags"]
    if include_latest:
        return all_tags
    return [name for name in all_tags if not name.endswith("latest")]
def delete_tag(repo: str, registry: str, tag: str, dry_run: bool = False):
    """
    Delete a tag using skopeo.

    The command is always logged; it is only executed when dry_run is False.

    Args:
        repo (str): The repository to delete the tag from
        registry (str): The registry to delete the tag from
        tag (str): The tag itself
        dry_run (bool, optional): If True, don't actually delete anything. Defaults to False.

    Raises:
        subprocess.CalledProcessError: If skopeo exits with a non-zero status.
    """
    cmd = ["skopeo", "delete", f"docker://{registry}/{repo}:{tag}"]
    # Log the shell-quoted command line itself. The previous code wrapped it in
    # braces without an f-string prefix, which built a one-element set and
    # logged its repr ("{'skopeo delete ...'}") instead of the command.
    logger.info(shlex.join(cmd))
    if not dry_run:
        subprocess.check_call(cmd)
def _group_tags_by_prefix(tags: list[str]) -> dict[str, list[tuple[str, str]]]:
    """Group tags by customer prefix into lists of (version string, full tag)."""
    groups: dict[str | EmptyCustomer, list[tuple[str, str]]] = {}
    for tag in tags:
        # Split on the FIRST dash only, so "customer1-v1.0-rc1" keeps its whole
        # version part ("v1.0-rc1") instead of being truncated to "v1.0".
        prefix, dash, remainder = tag.partition("-")
        if dash:
            groups.setdefault(prefix, []).append((remainder, tag))
        else:
            # Tags without a dash have no customer prefix; they share one group.
            groups.setdefault("<EMPTY>", []).append((tag, tag))
    return groups


def cleanup_repo(
    repo: str, registry: str, tags_to_keep: int = 5, dry_run: bool = False
):
    """
    Cleanup a repository by removing all tags except the latest tags_to_keep.

    Tags may carry a customer prefix separated by a dash, e.g.
    "customer1-v1.6.2"; tags without a dash (e.g. "v1.6.2") form their own
    group. The newest tags_to_keep versions are kept for each group
    independently. Tags ending in "latest" are never considered. Tags whose
    version part cannot be parsed are left untouched and reported with a
    warning.

    Args:
        repo: The repository to cleanup
        registry: The registry to cleanup (e.g. registry.example.com)
        tags_to_keep: The number of tags to keep per prefix (default: 5)
        dry_run: If True, don't actually delete anything (default: False)
    """
    logger.info(f"{repo} -- starting cleanup")
    tags = list_tags(repo, registry, include_latest=False)
    logger.info(f"{repo} -- found {len(tags)} tags")
    logger.debug(f"{repo} -- tags: {tags}")

    # A negative count would make the slices below keep/remove the wrong end.
    keep_count = max(tags_to_keep, 0)

    for prefix, entries in _group_tags_by_prefix(tags).items():
        # Pair each parsed version with the exact tag name the registry
        # reported, so deletion never has to reconstruct the name.
        parsed = []
        for version_str, full_tag in entries:
            try:
                parsed.append((version.parse(version_str), full_tag))
            except version.InvalidVersion:
                # Never delete something we cannot order; keep it and move on.
                logger.warning(
                    f"{repo} -- prefix {prefix}, skipping tag {full_tag}: "
                    "not a valid version"
                )

        # Newest first
        parsed.sort(key=lambda item: item[0], reverse=True)

        tags_kept = [full_tag for _, full_tag in parsed[:keep_count]]
        # Oldest first, so the oldest tags are deleted first
        tags_to_remove = [full_tag for _, full_tag in reversed(parsed[keep_count:])]

        logger.debug(f"{repo} -- prefix {prefix}, removing tags: {tags_to_remove}")
        logger.debug(f"{repo} -- prefix {prefix}, keeping tags: {tags_kept}")

        logger.info(f"{repo} -- prefix {prefix}, removing {len(tags_to_remove)} tags")
        for tag in tags_to_remove:
            try:
                delete_tag(repo, registry, tag, dry_run=dry_run)
            except subprocess.CalledProcessError as e:
                # Best effort: a failed deletion must not stop the others.
                logger.warning(
                    f"{repo} -- prefix {prefix}, failed to remove tag {tag}: {e}"
                )
def check_skopeo_login(registry: str):
    """
    Verify the skopeo login for a registry by running ``skopeo login``.

    The command runs with stdin, stdout and stderr all attached to the null
    device, so it cannot prompt and produces no output.

    Args:
        registry (str): The registry to check login for

    Raises:
        subprocess.CalledProcessError: If skopeo exits with a non-zero status.
    """
    null_device = subprocess.DEVNULL
    login_cmd = ["skopeo", "login", registry]
    subprocess.check_call(
        login_cmd,
        stdin=null_device,
        stdout=null_device,
        stderr=null_device,
    )
def main():
    """
    Command-line entry point.

    Parses the arguments, configures logging, collects the repositories to
    clean (from --repositories, or from --repositories-file which takes
    precedence when given), verifies the skopeo login and then runs
    cleanup_repo for every repository, either sequentially or in parallel.

    Exits with status 1 if no repositories were specified or if the skopeo
    login check fails.
    """
    parser = argparse.ArgumentParser(
        prog="docker-registry-cleanup",
        description="Cleanup docker registry",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("--registry", default="registry.example.com")
    parser.add_argument(
        "-r",
        "--repositories",
        nargs="+",
    )
    parser.add_argument(
        "--repositories-file",
        help="File containing a list of repositories to cleanup",
    )
    parser.add_argument("-t", "--tags-to-keep", default=5, type=int)
    parser.add_argument(
        "--sequential", action="store_true", help="Run sequentially instead of parallel"
    )
    parser.add_argument("-n", "--dry-run", action="store_true")
    parser.add_argument("-v", "--verbose", action="store_true")
    args = parser.parse_args()

    # Replace loguru's default handler with one using our format and level.
    logger.remove()
    log_fmt = "<level>{level: <8}</level> {message}"
    logger.add(sys.stdout, format=log_fmt, level="DEBUG" if args.verbose else "INFO")

    if args.repositories_file:
        with open(args.repositories_file) as f:
            # Strip first, then filter: this skips blank lines (which would
            # otherwise become empty repository names) and comment lines even
            # when the "#" is preceded by whitespace.
            stripped_lines = (line.strip() for line in f)
            args.repositories = [
                line
                for line in stripped_lines
                if line and not line.startswith("#")
            ]

    if not args.repositories:
        logger.error("No repositories specified")
        sys.exit(1)

    logger.info("Checking registry login")
    try:
        check_skopeo_login(args.registry)
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        # CalledProcessError: skopeo reported a failed login;
        # FileNotFoundError: the skopeo executable could not be found.
        logger.error(f"skopeo login check failed for {args.registry}: {e}")
        sys.exit(1)

    logger.info("Cleanup starting")
    if args.sequential:
        for repo in args.repositories:
            cleanup_repo(
                repo,
                args.registry,
                tags_to_keep=args.tags_to_keep,
                dry_run=args.dry_run,
            )
    else:
        # One worker per CPU; each job cleans a single repository.
        Parallel(n_jobs=cpu_count(), backend="multiprocessing")(
            delayed(cleanup_repo)(
                repo,
                args.registry,
                tags_to_keep=args.tags_to_keep,
                dry_run=args.dry_run,
            )
            for repo in args.repositories
        )


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment