Skip to content

Instantly share code, notes, and snippets.

@pjaudiomv
Created November 3, 2023 17:28
Show Gist options
  • Save pjaudiomv/8e3398b160858a41dac75a668ffe0c53 to your computer and use it in GitHub Desktop.
Save pjaudiomv/8e3398b160858a41dac75a668ffe0c53 to your computer and use it in GitHub Desktop.
Get All Images in a Cluster
import yaml
import re
from kubernetes import client, config
def parse_name(name):
    """Return *name* as a fully qualified container image reference.

    Image references that omit the registry are expanded the way Docker
    resolves them:

    * ``nginx``            -> ``docker.io/library/nginx``
    * ``bitnami/redis``    -> ``docker.io/bitnami/redis``
    * ``gcr.io/proj/app``  -> unchanged (first component is a registry)

    The first path component is treated as a registry host when it contains
    a ``.`` (domain name) or a ``:`` (host:port), or when it is exactly
    ``localhost``. Any tag or digest suffix is preserved untouched.

    Args:
        name: Image reference as written in a container spec.

    Returns:
        The reference prefixed with ``docker.io/`` or ``docker.io/library/``
        when no registry was given, otherwise *name* unchanged.
    """
    registry, slash, _ = name.partition("/")

    # No "/" at all: an official image living under the "library" namespace.
    if not slash:
        return f"docker.io/library/{name}"

    # A dot or colon can only appear in the first component if it is a host
    # (repository path components may not contain ":"; a tag's colon is
    # always after the last "/"). A bare "localhost" is also a host.
    has_registry = "." in registry or ":" in registry or registry == "localhost"
    if has_registry:
        return name

    return f"docker.io/{name}"
def get_pods():
    """Collect the image of every container in every pod of the cluster.

    Lists pods across all namespaces through the module-level ``CoreV1``
    client, normalizes each container image with ``parse_name`` and appends
    it to the module-level ``images`` list. Both regular and init containers
    are included. Returns nothing.

    A failure whose ``reason`` is exactly ``"Not Found"`` is ignored silently;
    any other failure is printed. In both cases no images are added.
    """
    try:
        pods = CoreV1.list_pod_for_all_namespaces(watch=False)
    except Exception as e:
        # Not every exception carries a ``reason`` attribute, and it may be
        # None, so read it defensively and compare for equality rather than
        # using a substring test (which also matched "" and partial text).
        if getattr(e, "reason", None) != "Not Found":
            print(e)
        return

    for pod in pods.items:
        # init_containers may be None when the pod declares none.
        init_containers = pod.spec.init_containers or []
        for container in [*init_containers, *pod.spec.containers]:
            images.append(parse_name(container.image))
def get_jobs():
    """Collect the image of every container in every Job's pod template.

    Lists Jobs across all namespaces through the module-level ``BatchV1``
    client, normalizes each container image with ``parse_name`` and appends
    it to the module-level ``images`` list. Both regular and init containers
    of the pod template are included. Returns nothing.

    A failure whose ``reason`` is exactly ``"Not Found"`` is ignored silently;
    any other failure is printed. In both cases no images are added.
    """
    try:
        jobs = BatchV1.list_job_for_all_namespaces(watch=False)
    except Exception as e:
        # Not every exception carries a ``reason`` attribute, and it may be
        # None, so read it defensively and compare for equality rather than
        # using a substring test (which also matched "" and partial text).
        if getattr(e, "reason", None) != "Not Found":
            print(e)
        return

    for job in jobs.items:
        pod_spec = job.spec.template.spec
        # init_containers may be None when the template declares none.
        init_containers = pod_spec.init_containers or []
        for container in [*init_containers, *pod_spec.containers]:
            images.append(parse_name(container.image))
def get_cron_jobs():
    """Collect the image of every container in every CronJob's pod template.

    Lists CronJobs across all namespaces through the module-level ``BatchV1``
    client, normalizes each container image with ``parse_name`` and appends
    it to the module-level ``images`` list. Both regular and init containers
    of the job's pod template are included. Returns nothing.

    A failure whose ``reason`` is exactly ``"Not Found"`` is ignored silently;
    any other failure is printed. In both cases no images are added.
    """
    try:
        cronjobs = BatchV1.list_cron_job_for_all_namespaces(watch=False)
    except Exception as e:
        # Not every exception carries a ``reason`` attribute, and it may be
        # None, so read it defensively and compare for equality rather than
        # using a substring test (which also matched "" and partial text).
        if getattr(e, "reason", None) != "Not Found":
            print(e)
        return

    for cronjob in cronjobs.items:
        pod_spec = cronjob.spec.job_template.spec.template.spec
        # init_containers may be None when the template declares none.
        init_containers = pod_spec.init_containers or []
        for container in [*init_containers, *pod_spec.containers]:
            images.append(parse_name(container.image))
if __name__ == "__main__":
    # Script entry point: gather every image referenced by pods, jobs and
    # cron jobs, then print the unique, sorted set as a YAML document.
    config.load_kube_config()

    # The collector functions read these three names as module-level globals,
    # so they must be bound here before any collector is called.
    images = []
    CoreV1 = client.CoreV1Api()
    BatchV1 = client.BatchV1Api()

    get_pods()
    get_jobs()
    get_cron_jobs()

    # Drop duplicates, then order the names for stable output.
    images = list(set(images))
    images.sort()

    image_dump = {"images": [{"name": image} for image in images]}
    print(yaml.dump(image_dump))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment