Skip to content

Instantly share code, notes, and snippets.

@gallettilance
Created July 7, 2021 13:17
Show Gist options
  • Save gallettilance/72c1dafe324a2582a48ed8fe68df7078 to your computer and use it in GitHub Desktop.
Save gallettilance/72c1dafe324a2582a48ed8fe68df7078 to your computer and use it in GitHub Desktop.
# Requires
# OPM, operator-sdk v1.8.0 or higher
# grpcurl, podman or docker, skopeo, manifest-tool
import os
import re
import time
import json
import socket
import sqlite3
import subprocess
CONTAINER_TOOL="podman"
def run_cmd(cmd, params=None, strict=True, err_msg=None):
if not params:
params = {}
params.setdefault('universal_newlines', True)
params.setdefault('encoding', 'utf-8')
params.setdefault('stderr', subprocess.PIPE)
params.setdefault('stdout', subprocess.PIPE)
response = subprocess.run(cmd, **params)
if strict and response.returncode != 0:
raise RuntimeError(err_msg, " : ", response.stderr)
return response.stdout
def _copy_files_from_image(image, src_path, dest_path):
run_cmd([CONTAINER_TOOL, 'pull', image], err_msg="error pulling image " + image)
container_command = 'unused'
container_id = run_cmd([CONTAINER_TOOL, 'create', image, container_command])
run_cmd([CONTAINER_TOOL, 'cp', f'{container_id.strip()}:{src_path}', dest_path])
def _serve_index_registry(db_path):
# finding available port
sock = socket.socket()
sock.bind(('', 0))
port=sock.getsockname()[1]
sock.close()
cmd = ['opm', 'registry', 'serve', '-p', str(port), '-d', db_path, '-t', '/dev/null']
rpc_proc = subprocess.Popen(cmd)
time.sleep(5) # give OPM the time to serve the content
# query the service to see if it has started
output = run_cmd(['grpcurl', '-plaintext', f'localhost:{port}', 'list', 'api.Registry'])
if 'api.Registry.ListBundles' in output:
return port, rpc_proc
def skopeo_inspect(*args):
cmd = ['skopeo', 'inspect'] + list(args)
output = run_cmd(cmd, err_msg="error skopeo inspecting image ")
return json.loads(output)
def get_image_label(pull_spec, label):
if pull_spec.startswith('docker://'):
full_pull_spec = pull_spec
else:
full_pull_spec = f'docker://{pull_spec}'
return skopeo_inspect(full_pull_spec, '--config').get('config', {}).get('Labels', {}).get(label)
def _get_image_arches(pull_spec):
if pull_spec.startswith('docker://'):
full_pull_spec = pull_spec
else:
full_pull_spec = f'docker://{pull_spec}'
manifests = skopeo_inspect(full_pull_spec, '--raw').get('manifests', {})
return list(map(lambda m : m.get('platform', {}).get('architecture', ""), manifests))
def _get_index_database(from_index, base_dir):
db_path = get_image_label(from_index, 'operators.operatorframework.io.index.database.v1')
_copy_files_from_image(from_index, db_path, base_dir)
return os.path.join(base_dir, os.path.basename(db_path))
def _get_present_bundles(from_index, base_dir):
"""
Get a list of bundles already present in the index image.
:param str from_index: index image to inspect.
:param str base_dir: base directory to create temporary files in.
:return: list of unique present bundles as provided by the grpc query and a list of unique
bundle pull_specs
:rtype: list, list
:raises IIBError: if any of the commands fail.
"""
db_path = _get_index_database(from_index, base_dir)
port, rpc_proc = _serve_index_registry(db_path)
bundles = run_cmd(['grpcurl', '-plaintext', f'localhost:{port}', 'api.Registry/ListBundles'])
rpc_proc.kill()
# If no data is returned there are not bundles present
if not bundles:
return [], []
out_file = open(base_dir + "listBundles.json", "w+")
json.dump(bundles, out_file)
# Transform returned data to parsable json
unique_present_bundles = []
unique_present_bundles_pull_spec = []
present_bundles = [json.loads(bundle) for bundle in re.split(r'(?<=})\n(?={)', bundles)]
for bundle in present_bundles:
bundle_path = bundle['bundlePath']
if bundle_path in unique_present_bundles_pull_spec:
continue
unique_present_bundles.append(bundle)
unique_present_bundles_pull_spec.append(bundle_path)
return unique_present_bundles, db_path
def _rm_property_from_index(db_path, property):
"""
Rm a property from the index
:param str base_dir: the base directory where the database and index.Dockerfile are created
:param str bundle: the bundle path
:param dict property: a dict representing a property to be removed from the index.db
:raises IIBError: if the sql insertion fails
"""
delete = (
'DELETE FROM properties '
'WHERE type=? AND value=? AND operatorbundle_path=?;'
)
con = sqlite3.connect(db_path)
# Insert property
con.execute(delete, (
property["type"],
property["value"],
property["operatorbundle_path"]))
con.commit()
con.close()
# Check if the bundle is using deprecated APIs AND does not have the max OCP version set to 4.8
def _requires_max_ocp_version(bundle):
cmd = [
'operator-sdk',
'bundle',
'validate',
bundle,
'--select-optional',
'name=community',
'--output=json-alpha1',
'-b',
CONTAINER_TOOL
]
result = run_cmd(cmd, strict=False)
if result:
output = json.loads(result)
if output['passed'] == False:
for msg in output['outputs']:
if "olm.maxOpenShiftVersion" in msg['message']:
return True
return False
def get_binary_image(from_index):
binary_image = {
"registry.redhat.io" : "registry.redhat.io/openshift4/ose-operator-registry",
"registry.stage.redhat.io" : "registry.redhat.io/openshift4/ose-operator-registry"
}
registry = from_index.split("/")[0]
tag = from_index.split(":")[1]
if tag != "v4.8":
return binary_image[registry] + ":" + tag
return binary_image[registry] + ":v4.7"
def _generate_dockerfile(db_path, binary_image):
with open('index.Dockerfile', 'w+') as f:
f.write("FROM " + binary_image + "\n")
f.write("LABEL operators.operatorframework.io.index.database.v1=/database/index.db\n")
f.write("ADD " + db_path + " /database/index.db\n")
f.write("EXPOSE 50051\n")
f.write("ENTRYPOINT [\"/bin/opm\"]\n")
f.write("CMD [\"registry\", \"serve\", \"--database\", \"/database/index.db\"]\n")
def _build_index_image_multi_arch(target_image, arches):
with open("manifest.yaml", "w+") as m:
m.write("image: " + target_image + "\n")
m.write("manifests:\n")
shas = {}
for arch in arches:
run_cmd(["buildah", "bud", "--no-cache", "--override-arch", arch, "--iidfile", "iid", "-f", "index.Dockerfile"])
with open("iid", 'r') as f:
lines = f.readlines()
shas[arch] = lines[0]
run_cmd(["buildah", "push", shas[arch], "docker://" + target_image + "-" + arch], err_msg="error pushing arch-specific image")
m.write("- image: " + target_image + "-" + arch + "\n")
m.write(" platform:\n")
m.write(" architecture: " + arch + "\n")
m.write(" os: linux\n")
run_cmd(["manifest-tool", "push", "from-spec", "manifest.yaml"], err_msg="error pushing manifest list - is manifest-list using your docker config?")
def missing_property(bundle):
for prop in bundle["properties"]:
if prop["type"] == "olm.maxOpenShiftVersion":
return False
return True
def rm_properties(from_index, base_dir, target_image):
# Get all bundles in the index
print("Inspecting index image...")
bundles, db_path = _get_present_bundles(from_index, base_dir)
# Rm the max ocp version property
# These will have been added to all bundles that failed the sdk validation
# requiring the olm.maxOpenShiftVersion label be added to the index
for bndl in bundles:
if _requires_max_ocp_version(bndl['bundlePath']):
print("removing property for ", bndl['bundlePath'])
maxOpenShiftVersionProperty = {
"type": "olm.maxOpenShiftVersion",
"value": "4.8",
"operatorbundle_path": bndl['bundlePath'],
}
_rm_property_from_index(db_path, maxOpenShiftVersionProperty)
print("building multi arch image...")
binary_image = get_binary_image(from_index)
_generate_dockerfile(db_path, binary_image)
arches = _get_image_arches(from_index)
_build_index_image_multi_arch(target_image, arches)
# Change this variable for other indexes
INDEX_IMAGE="registry.redhat.io/redhat/redhat-operator-index:v4.8"
BASE_DIR="./"
TARGET_IMAGE="quay.io/galletti94/redhat-operator-index-test:v4.8-rm"
rm_properties(INDEX_IMAGE, BASE_DIR, TARGET_IMAGE)
print("confirming the manifest list was created...")
run_cmd([CONTAINER_TOOL, "pull", TARGET_IMAGE])
arches = _get_image_arches(TARGET_IMAGE)
print("arches in the manifest list ", TARGET_IMAGE, " are: ", arches)
print("Verifying bundles don't have the maxOpenShiftVersion property...")
allBundles, _ = _get_present_bundles(TARGET_IMAGE, "temp/")
bundlePaths = list(map(lambda b : b['bundlePath'], filter(lambda b : not missing_property(b), allBundles)))
print("The following bundles have the maxOpenShiftVersion Property set: ")
print(bundlePaths)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment