@kevdoran
Created April 5, 2023 14:36
A Python 3 script for assisting in verifying Apache NiFi release candidates
#!/usr/bin/env python3
"""A Python 3 script for assisting in verifying Apache release candidates
"""
import argparse
import logging
import os
import subprocess
import sys

logger = logging.getLogger(__name__)

NIFI_KEYS_DEV_URL = "https://dist.apache.org/repos/dist/dev/nifi/KEYS"
NIFI_KEYS_REL_URL = "https://dist.apache.org/repos/dist/release/nifi/KEYS"

class Configuration:
    def __init__(self, bypass_cache=None, keys_url=None):
        if bypass_cache:
            self.bypass_cache = bypass_cache
        if keys_url:
            self.keys_url = keys_url

class HashUtil:
    @staticmethod
    def get_expected_hash_from_remote_file(hash_file_url):
        try:
            wget_out = subprocess.check_output(["wget", "-O-", "-q", hash_file_url])
            expected_hash = wget_out.decode('ascii').strip(' \t\n\r')
            return expected_hash
        except subprocess.CalledProcessError as err:
            # wget exits with code 8 when the server issued an error response (e.g. 404)
            if err.returncode == 8:
                raise FileNotFoundError("File not found at url='{}'".format(hash_file_url))
            else:
                raise err

    @staticmethod
    def md5(source_archive):
        # uses the BSD/macOS `md5` command; on Linux the equivalent is `md5sum`
        md5_out = subprocess.check_output(["md5", "-q", source_archive])
        md5_hash = md5_out.decode('ascii').strip(' \t\n\r')
        return md5_hash

    @staticmethod
    def sha1(source_archive):
        return HashUtil._shasum(source_archive, "1")

    @staticmethod
    def sha256(source_archive):
        return HashUtil._shasum(source_archive, "256")

    @staticmethod
    def sha512(source_archive):
        return HashUtil._shasum(source_archive, "512")

    @staticmethod
    def _shasum(source_archive, algorithm):
        algorithm_arg = "-a{}".format(algorithm)
        shasum_out = subprocess.check_output(["shasum", algorithm_arg, source_archive])
        sha_hash = shasum_out.decode('ascii').split()[0]
        return sha_hash

def verify_signature(source_tarball, source_tarball_url):
    logger.info("Verifying signature for {}".format(source_tarball))
    # import the latest signing keys from the dev and release KEYS files
    subprocess.run(["wget", NIFI_KEYS_DEV_URL])
    subprocess.run(["gpg", "--import", "KEYS"])
    os.remove('KEYS')
    subprocess.run(["wget", NIFI_KEYS_REL_URL])
    subprocess.run(["gpg", "--import", "KEYS"])
    os.remove('KEYS')
    # verify the detached signature against the downloaded source archive
    source_asc_url = source_tarball_url + ".asc"
    source_asc = source_tarball + ".asc"
    subprocess.run(["wget", source_asc_url])
    subprocess.run(["gpg", "--verify", "-v", source_asc])
    # cleanup
    os.remove(source_asc)

def get_hashes(algorithm, source_archive, source_archive_url):
    actual_hash = None
    expected_hash_url = None
    if algorithm == "md5":
        actual_hash = HashUtil.md5(source_archive)
        expected_hash_url = source_archive_url + ".md5"
    elif algorithm == "sha1":
        actual_hash = HashUtil.sha1(source_archive)
        expected_hash_url = source_archive_url + ".sha1"
    elif algorithm == "sha256":
        actual_hash = HashUtil.sha256(source_archive)
        expected_hash_url = source_archive_url + ".sha256"
    elif algorithm == "sha512":
        actual_hash = HashUtil.sha512(source_archive)
        expected_hash_url = source_archive_url + ".sha512"
    else:
        raise Exception("Invalid hash algorithm '{}'".format(algorithm))
    expected_hash = None
    try:
        expected_hash = HashUtil.get_expected_hash_from_remote_file(expected_hash_url)
    except FileNotFoundError:
        logger.debug("Hash file not found at %s", expected_hash_url)
    return actual_hash, expected_hash

def verify_hashes(source_tarball, source_tarball_url):
    logger.info("Verifying hashes for {}".format(source_tarball))
    for hash_algorithm in ["md5", "sha1", "sha256", "sha512"]:
        actual_hash, expected_hash = get_hashes(hash_algorithm, source_tarball, source_tarball_url)
        print("{} hash: {}".format(hash_algorithm, actual_hash))
        if expected_hash:
            print("Expected: {}".format(expected_hash))
            assert expected_hash == actual_hash
            print("Equality Check: PASSED")
        else:
            print("Remote hash file does not exist for {}. Cannot perform automated check.".format(hash_algorithm))
            print("Equality Check: UNKNOWN")

def download_source_tarball(source_tarball_url, bypass_cache=False):
    logger.info("Downloading {}".format(source_tarball_url))
    source_tarball = source_tarball_url.split("/")[-1]
    if os.path.isfile(source_tarball) and not bypass_cache:
        logger.info("Found local source tarball. Using local copy without re-downloading")
    else:
        subprocess.run(["wget", source_tarball_url])
    return source_tarball

def verify_rc(source_tarball_url, bypass_cache=False):
    source_tarball = download_source_tarball(source_tarball_url, bypass_cache)
    verify_hashes(source_tarball, source_tarball_url)
    verify_signature(source_tarball, source_tarball_url)

def init_logging(verbose=False):
    if verbose:
        log_level = logging.DEBUG
        logging.basicConfig(format="%(asctime)s %(levelname)s:\t%(message)s", level=log_level)
    else:
        log_level = logging.INFO
        logging.basicConfig(format="%(message)s", level=log_level)
    logger.debug("Logging initialized.")

def process_args(raw_args=None):
    parser = argparse.ArgumentParser(
        description="Verifies Apache release candidates.",
        epilog="As an alternative to the commandline, params can be placed in a file, one per line, and specified on the commandline like '%(prog)s @params.conf'.",
        fromfile_prefix_chars='@')
    # required, positional arguments
    parser.add_argument(
        "source_archive_url",
        help="The URL to the source tarball/zip file",
        metavar="SOURCE_ARCHIVE_URL")
    # optional, flag arguments
    parser.add_argument(
        "-v",
        "--verbose",
        help="increase output verbosity",
        action="store_true")
    parser.add_argument(
        "-n",
        "--no-cache",
        help="don't use cached archive",
        action="store_true")
    # parse the provided args rather than always falling back to sys.argv
    args = parser.parse_args(raw_args)
    return args

def main(raw_args=None):
    args = process_args(raw_args)
    init_logging(args.verbose)
    # honor the --no-cache flag when downloading the source archive
    verify_rc(args.source_archive_url, bypass_cache=args.no_cache)
    config = Configuration()  # constructed but currently unused

if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
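
The script can also be imported and driven from another Python script. A minimal sketch, assuming the file above is saved as verify_rc.py (hypothetical filename) and using a hypothetical staging URL; substitute the actual source release URL from the RC vote email:

import verify_rc  # hypothetical module name matching the saved filename

verify_rc.init_logging(verbose=True)
verify_rc.verify_rc(
    "https://dist.apache.org/repos/dist/dev/nifi/nifi-1.21.0/nifi-1.21.0-source-release.zip",
    bypass_cache=False,
)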