Skip to content

Instantly share code, notes, and snippets.

@arxcruz
Created October 6, 2020 15:41
Show Gist options
  • Save arxcruz/08c50464712a8730388a9a6f59af2489 to your computer and use it in GitHub Desktop.
Save arxcruz/08c50464712a8730388a9a6f59af2489 to your computer and use it in GitHub Desktop.
from podman import ApiConnection, system, images
from base64 import b64encode
import json
import re
import requests
# Address of the Podman REST API, reached through a Unix domain socket.
# NOTE(review): the path looks like the per-user (rootless) socket of UID 1000 -- confirm for the target host.
podman_uri = "unix://localhost/run/user/1000/podman/podman.sock"
# Alternative Zuul endpoint, kept for reference:
# zuul_api = "https://review.rdoproject.org/zuul/api/builds?job_name={}"
# Zuul builds query; get_jobs_url() appends the job name directly to this string.
zuul_api = "https://zuul.openstack.org/api/builds?job_name="
def prune(api, **kwargs):
    """Ask the Podman service to prune images and return its decoded reply.

    Args:
        api: Connection object exposing ``join(path, query)`` and
            ``request(method, url)``.
            NOTE(review): presumably a ``podman.ApiConnection`` -- confirm.
        **kwargs: Passed unchanged to ``api.join`` as the query parameters
            of the ``/images/prune`` endpoint.

    Returns:
        The value decoded from the JSON response body, or ``None`` when the
        request or the decoding failed (the error is printed, not raised).
    """
    try:
        response = api.request("POST", api.join("/images/prune", kwargs))
        return json.loads(response.read())
    except Exception as e:
        # Deliberate best effort: report the problem and let the caller go on.
        print("error: {}".format(e))
        return None
def pull(api, image, *, registry="trunk.registry.rdoproject.org/tripleomaster", **kwargs):
    """Pull ``image`` from ``registry`` through the Podman service.

    Args:
        api: Connection object exposing ``join(path, query)`` and
            ``request(method, url)``.
            NOTE(review): presumably a ``podman.ApiConnection`` -- confirm.
        image: Image name, optionally with a tag, relative to ``registry``
            (for example ``"centos-binary-aodh-api:current-tripleo"``).
        registry: Registry and namespace prefix placed in front of ``image``.
            Defaults to the location that used to be hard-coded here.
        **kwargs: Extra query parameters for ``/images/pull``. A
            caller-supplied ``reference`` is overwritten.

    Returns:
        The value decoded from the JSON response body, or ``None`` when the
        request or the decoding failed (the error is printed, not raised).
    """
    try:
        kwargs["reference"] = "{}/{}".format(registry.rstrip("/"), image)
        # Registry authentication was sketched but is not sent:
        # user_pass = "{}:{}".format(username, token) # .encode("ascii")
        # user_pass = b64encode(user_pass).decode("ascii")
        # kwargs["credentials"] = user_pass
        response = api.request("POST", api.join("/images/pull", kwargs))
        return json.loads(response.read())
    except Exception as e:
        # Deliberate best effort: report the problem and let the caller go on.
        print("error: {}".format(e))
        return None
def remove(api, image, **kwargs):
    """Delete ``image`` through the Podman service and return its reply.

    The request is sent to the joined URL, the same way ``prune`` and
    ``pull`` build theirs. Previously the joined URL was only printed while
    the bare ``/images/<name>`` path was requested, and ``kwargs`` was
    ignored.

    Args:
        api: Connection object exposing ``quote(value)``,
            ``join(path, query)`` and ``request(method, url)``.
            NOTE(review): presumably a ``podman.ApiConnection`` -- confirm.
        image: Name of the image to delete; it is passed through
            ``api.quote`` before being placed in the path.
        **kwargs: Query parameters for the delete request.

    Returns:
        The value decoded from the JSON response body, or ``None`` when the
        request or the decoding failed (the error is printed, not raised).
    """
    try:
        path = "/images/{}".format(api.quote(image))
        endpoint = api.join(path, kwargs)
        # Show exactly what is about to be requested.
        print(endpoint)
        response = api.request("DELETE", endpoint)
        return json.loads(response.read())
    except Exception as e:
        # Deliberate best effort: report the problem and let the caller go on.
        print("error: {}".format(e))
        return None
def get_jobs_url(job_name, timeout=30):
    """Return the log URLs of the builds Zuul reports for ``job_name``.

    Args:
        job_name: Job name appended verbatim to the module-level
            ``zuul_api`` query URL.
        timeout: Seconds to wait for the Zuul server before ``requests``
            gives up; without it a stalled server blocks the script forever.

    Returns:
        A list with the ``log_url`` of every returned build that has one,
        in the order of the response. Empty when the HTTP status is not 200.
    """
    zuul_job_url = "{}{}".format(zuul_api, job_name)
    resp = requests.get(zuul_job_url, timeout=timeout)
    if resp.status_code != 200:
        return []
    # Skip builds whose log_url is missing or null: callers concatenate a
    # path onto each entry, which cannot work without a real URL.
    return [build["log_url"] for build in resp.json() if build.get("log_url")]
def read_containers_built(url, timeout=30):
    """Download a build's container log and extract (name, tag) pairs.

    Args:
        url: Base log URL of a build; a trailing slash is optional.
        timeout: Seconds to wait for the log server before ``requests``
            gives up.

    Returns:
        A list of ``(name, tag)`` tuples, one per line of
        ``logs/containers-successfully-built.log`` that matches the pattern
        below. Empty when the HTTP status is not 200.
    """
    # Join with exactly one slash so both "…/build" and "…/build/" work.
    built_log = "{}/{}".format(url.rstrip("/"), "logs/containers-successfully-built.log")
    response = requests.get(built_log, timeout=timeout)
    if response.status_code != 200:
        return []
    # Group 1: a run of ASCII letters and hyphens; group 2: the next run of
    # word characters after whitespace. At least one further word or
    # whitespace character must follow group 2.
    # NOTE(review): group 1 cannot contain digits, so a name with a digit in
    # it is captured only from after that digit -- confirm this is intended.
    # NOTE(review): assumes the log has whitespace-separated columns with
    # the image name first and the tag second -- verify against a real log.
    exp = re.compile(r"([a-zA-Z-]+)\s+(\w+)[\w\s]+")
    matches = (exp.search(line) for line in response.text.split('\n'))
    return [found.group(1, 2) for found in matches if found]
if __name__ == "__main__":
    # Script flow: look up the builds of one Zuul job, read the container
    # list of the first build, remove every local image whose first tag
    # mentions "tripleo", then pull one image again.
    job_name = "tripleo-build-containers-centos-8"
    print("Downloading containers log file")
    log_urls = get_jobs_url(job_name)
    if not log_urls:
        # Indexing an empty list used to end the script with an IndexError.
        raise SystemExit("No builds found for job {}".format(job_name))
    url = log_urls[0]
    print("Reading containers")
    containers = read_containers_built(url)
    if containers:
        print("Container sample: {}".format(containers[0]))
    else:
        # The list is informational only at this point, so keep going.
        print("No containers found in {}".format(url))
    with ApiConnection(podman_uri) as api:
        images_list = images.list_images(api)
        for image in images_list:
            repo_tags = image["RepoTags"]
            if not repo_tags:
                # No tags (None or empty): there is no name to match.
                continue
            full_name = repo_tags[0]
            if 'tripleo' in full_name:
                # Keep only the part after the last "/" (name:tag).
                name = full_name[full_name.rfind('/')+1:]
                print("Removing image {}".format(name))
                remove(api, name)
        # NOTE(review): the source lost its indentation; the pull is assumed
        # to run once, after the removal loop, inside the connection.
        print("Pulling")
        pull(api, 'centos-binary-aodh-api:current-tripleo')
        # for container in containers:
        #     container_name = "{}:{}".format(container[0], container[1])
        #     print("Pulling container {}".format(container_name))
        #     pull(api, container_name)
        # print(images.list_images(api))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment