fetch-content
#!/usr/bin/python3 -u
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import argparse
import bz2
import concurrent.futures
import gzip
import hashlib
import json
import lzma
import multiprocessing
import os
import pathlib
import random
import ssl
import subprocess
import sys
import tempfile
import time
import urllib.request
import zipfile

try:
    import zstandard
except ImportError:
    zstandard = None

try:
    import certifi
except ImportError:
    certifi = None

CONCURRENCY = multiprocessing.cpu_count()


def log(msg):
    print(msg, file=sys.stderr)
    sys.stderr.flush()


class IntegrityError(Exception):
    """Represents an integrity error when downloading a URL."""


# The following is copied from
# https://github.com/mozilla-releng/redo/blob/6d07678a014e0c525e54a860381a165d34db10ff/redo/__init__.py#L15-L85
def retrier(attempts=5, sleeptime=10, max_sleeptime=300, sleepscale=1.5, jitter=1):
    """
    A generator function that sleeps between retries and handles exponential
    backoff and jitter. The action you are retrying is meant to run after
    retrier yields.

    At each iteration, we sleep for sleeptime + random.randint(-jitter, jitter).
    Afterwards sleeptime is multiplied by sleepscale for the next iteration.

    Args:
        attempts (int): maximum number of times to try; defaults to 5
        sleeptime (float): how many seconds to sleep between tries; defaults
                           to 10s
        max_sleeptime (float): the longest we'll sleep, in seconds; defaults to
                               300s (five minutes)
        sleepscale (float): how much to multiply the sleep time by each
                            iteration; defaults to 1.5
        jitter (int): random jitter to introduce to sleep time each iteration.
                      the amount is chosen at random between [-jitter, +jitter];
                      defaults to 1

    Yields:
        The sleep time for that iteration, a maximum of `attempts` number
        of times.

    Example:
        >>> n = 0
        >>> for _ in retrier(sleeptime=0, jitter=0):
        ...     if n == 3:
        ...         # We did the thing!
        ...         break
        ...     n += 1
        >>> n
        3

        >>> n = 0
        >>> for _ in retrier(sleeptime=0, jitter=0):
        ...     if n == 6:
        ...         # We did the thing!
        ...         break
        ...     n += 1
        ... else:
        ...     print("max tries hit")
        max tries hit
    """
    jitter = jitter or 0  # py35 barfs on the next line if jitter is None
    if jitter > sleeptime:
        # To prevent negative sleep times
        raise Exception(
            'jitter ({}) must be less than sleep time ({})'.format(jitter, sleeptime))

    sleeptime_real = sleeptime
    for _ in range(attempts):
        log("attempt %i/%i" % (_ + 1, attempts))

        yield sleeptime_real
        if jitter:
            sleeptime_real = sleeptime + random.randint(-jitter, jitter)
            # our jitter should scale along with the sleeptime
            jitter = int(jitter * sleepscale)
        else:
            sleeptime_real = sleeptime

        sleeptime *= sleepscale
        if sleeptime_real > max_sleeptime:
            sleeptime_real = max_sleeptime

        # Don't need to sleep the last time
        if _ < attempts - 1:
            log("sleeping for %.2fs (attempt %i/%i)" % (
                sleeptime_real, _ + 1, attempts))
            time.sleep(sleeptime_real)


def stream_download(url, sha256=None, size=None):
    """Download a URL to a generator, optionally with content verification.

    If ``sha256`` or ``size`` are defined, the downloaded URL will be
    validated against those requirements and ``IntegrityError`` will be
    raised if expectations do not match.

    Because verification cannot occur until the file is completely
    downloaded, it is recommended that consumers not do anything meaningful
    with the data while content verification is pending. To securely handle
    retrieved content, it should be streamed to a file or memory and only
    operated on after the generator is exhausted without raising.
    """
    log('Downloading %s' % url)

    h = hashlib.sha256()
    length = 0

    t0 = time.time()
    if certifi:
        # urlopen()'s cafile argument was removed in Python 3.12; pass an
        # explicit SSL context built from the certifi bundle instead.
        context = ssl.create_default_context(cafile=certifi.where())
        res = urllib.request.urlopen(url, context=context)
    else:
        res = urllib.request.urlopen(url)

    with res as fh:
        if not url.endswith('.gz') and fh.info().get('Content-Encoding') == 'gzip':
            fh = gzip.GzipFile(fileobj=fh)

        while True:
            chunk = fh.read(65536)
            if not chunk:
                break

            h.update(chunk)
            length += len(chunk)

            yield chunk

    duration = time.time() - t0
    digest = h.hexdigest()

    log('%s resolved to %d bytes with sha256 %s in %.3fs' % (
        url, length, digest, duration))

    if size:
        if size == length:
            log('Verified size of %s' % url)
        else:
            raise IntegrityError('size mismatch on %s: wanted %d; got %d' % (
                url, size, length))

    if sha256:
        if digest == sha256:
            log('Verified sha256 integrity of %s' % url)
        else:
            raise IntegrityError('sha256 mismatch on %s: wanted %s; got %s' % (
                url, sha256, digest))


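# Example usage of stream_download() (hypothetical URL and digest): stream
# the response to disk and only trust the file once the generator is
# exhausted without raising IntegrityError.
#
#   with open('node.tar.xz', 'wb') as fh:
#       for chunk in stream_download('https://example.com/node.tar.xz',
#                                    sha256='aabbcc...', size=12345678):
#           fh.write(chunk)

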
def download_to_path(url, path, sha256=None, size=None):
    """Download a URL to a filesystem path, possibly with verification."""
    # We download to a temporary file and rename at the end so there's
    # no chance of the final file being partially written or containing
    # bad data.
    try:
        path.unlink()
    except FileNotFoundError:
        pass

    for _ in retrier(attempts=5, sleeptime=60):
        try:
            tmp = path.with_name('%s.tmp' % path.name)

            log('Downloading %s to %s' % (url, tmp))

            try:
                with tmp.open('wb') as fh:
                    for chunk in stream_download(url, sha256=sha256, size=size):
                        fh.write(chunk)
            except IntegrityError:
                tmp.unlink()
                raise

            log('Renaming to %s' % path)
            tmp.rename(path)
            return
        except IntegrityError:
            raise
        except Exception as e:
            log("Download failed: {}".format(e))
            continue

    raise Exception("Download failed, no more retries!")


def gpg_verify_path(path: pathlib.Path, public_key_data: bytes,
                    signature_data: bytes):
    """Verify that a filesystem path verifies using GPG.

    Takes a Path defining a file to verify. ``public_key_data`` contains
    bytes with GPG public key data. ``signature_data`` contains a signed
    GPG document to use with ``gpg --verify``.
    """
    log('Validating GPG signature of %s' % path)
    log('GPG key data:\n%s' % public_key_data.decode('ascii'))

    with tempfile.TemporaryDirectory() as td:
        try:
            # --batch since we're running unattended.
            gpg_args = ['gpg', '--homedir', td, '--batch']

            log('Importing GPG key...')
            subprocess.run(gpg_args + ['--import'],
                           input=public_key_data,
                           check=True)

            log('Verifying GPG signature...')
            subprocess.run(gpg_args + ['--verify', '-', '%s' % path],
                           input=signature_data,
                           check=True)

            log('GPG signature verified!')
        finally:
            # There is a race between the agent self-terminating and
            # shutil.rmtree() from the temporary directory cleanup that can
            # lead to exceptions. Kill the agent before cleanup to prevent this.
            env = dict(os.environ)
            env['GNUPGHOME'] = td
            subprocess.run(['gpgconf', '--kill', 'gpg-agent'], env=env)


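# Example usage of gpg_verify_path() (hypothetical names), mirroring what
# command_static_url() does below: the public key comes from an environment
# variable and the signed document is fetched from a URL.
#
#   key = os.environb[b'GPG_KEY']
#   sig = b''.join(stream_download('https://example.com/SHA256SUMS.asc'))
#   gpg_verify_path(pathlib.Path('SHA256SUMS'), key, sig)

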
def archive_type(path: pathlib.Path):
    """Attempt to identify a path as an extractable archive."""
    if path.suffixes[-2:-1] == ['.tar']:
        return 'tar'
    elif path.suffix == '.zip':
        return 'zip'
    else:
        return None


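# archive_type() keys off file suffixes only; a sketch of what it returns
# (hypothetical filenames). Note that a bare .tar with no compression
# suffix is not detected, since suffixes[-2:-1] is empty for it.
#
#   archive_type(pathlib.Path('clang.tar.xz'))  # -> 'tar'
#   archive_type(pathlib.Path('grcov.zip'))     # -> 'zip'
#   archive_type(pathlib.Path('README.txt'))    # -> None (file left in place)

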
def extract_archive(path, dest_dir, typ):
    """Extract an archive to a destination directory."""
    # Resolve paths to absolute variants.
    path = path.resolve()
    dest_dir = dest_dir.resolve()

    # We pipe input to the decompressor program so that we can apply
    # custom decompressors that the program may not know about.
    if typ == 'tar':
        if path.suffix == '.bz2':
            ifh = bz2.open(str(path), 'rb')
        elif path.suffix == '.gz':
            ifh = gzip.open(str(path), 'rb')
        elif path.suffix == '.xz':
            ifh = lzma.open(str(path), 'rb')
        elif path.suffix == '.zst':
            if not zstandard:
                raise ValueError('zstandard Python package not available')
            dctx = zstandard.ZstdDecompressor()
            ifh = dctx.stream_reader(path.open('rb'))
        elif path.suffix == '.tar':
            ifh = path.open('rb')
        else:
            raise ValueError('unknown archive format for tar file: %s' % path)

        args = ['tar', 'xf', '-']
    elif typ == 'zip':
        # unzip from stdin has wonky behavior. We don't use a pipe for it;
        # extract in-process with zipfile instead.
        with zipfile.ZipFile(str(path)) as archive:
            archive.extractall(str(dest_dir))
        return
    else:
        raise ValueError('unknown archive format: %s' % path)

    log('Extracting %s to %s using %r' % (path, dest_dir, args))
    t0 = time.time()

    with ifh, subprocess.Popen(args, cwd=str(dest_dir), bufsize=0,
                               stdin=subprocess.PIPE) as p:
        while True:
            chunk = ifh.read(131072)
            if not chunk:
                break

            p.stdin.write(chunk)

    # Popen's context manager closes stdin and waits for the process to
    # exit, so the return code is available once the with block exits.
    if p.returncode:
        raise Exception('%r exited %d' % (args, p.returncode))

    log('%s extracted in %.3fs' % (path, time.time() - t0))


def fetch_and_extract(url, dest_dir, extract=True, sha256=None, size=None):
    """Fetch a URL and extract it to a destination path.

    If the downloaded URL is an archive, it is extracted automatically
    and the archive is deleted. Otherwise the file remains in place in
    the destination directory.
    """
    basename = url.split('/')[-1]
    dest_path = dest_dir / basename

    download_to_path(url, dest_path, sha256=sha256, size=size)

    if not extract:
        return

    typ = archive_type(dest_path)
    if typ:
        extract_archive(dest_path, dest_dir, typ)
        log('Removing %s' % dest_path)
        dest_path.unlink()


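# Example usage of fetch_and_extract() (hypothetical values): the archive
# is downloaded into dest_dir, extracted there, then removed.
#
#   fetch_and_extract('https://example.com/sccache.tar.xz',
#                     pathlib.Path('/builds/worker/fetches'),
#                     sha256='aabbcc...', size=4321)

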
def fetch_urls(downloads):
    """Fetch multiple URLs concurrently.

    ``downloads`` is an iterable of tuples of positional arguments for
    ``fetch_and_extract``.
    """
    with concurrent.futures.ThreadPoolExecutor(CONCURRENCY) as e:
        fs = []

        for download in downloads:
            fs.append(e.submit(fetch_and_extract, *download))

        for f in fs:
            f.result()


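# Example usage of fetch_urls() (hypothetical values): each entry is a
# (url, dest_dir, extract) tuple, fetched with up to CONCURRENCY threads.
#
#   fetch_urls([
#       ('https://example.com/a.tar.xz', pathlib.Path('fetches'), True),
#       ('https://example.com/b.zip', pathlib.Path('fetches'), True),
#   ])

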
def command_static_url(args):
    gpg_sig_url = args.gpg_sig_url
    gpg_env_key = args.gpg_key_env

    if bool(gpg_sig_url) != bool(gpg_env_key):
        print('--gpg-sig-url and --gpg-key-env must both be defined')
        return 1

    if gpg_sig_url:
        gpg_signature = b''.join(stream_download(gpg_sig_url))
        gpg_key = os.environb[gpg_env_key.encode('ascii')]

    dest = pathlib.Path(args.dest)
    dest.parent.mkdir(parents=True, exist_ok=True)

    try:
        download_to_path(args.url, dest, sha256=args.sha256, size=args.size)

        if gpg_sig_url:
            gpg_verify_path(dest, gpg_key, gpg_signature)
    except Exception:
        try:
            dest.unlink()
        except FileNotFoundError:
            pass

        raise


def api(root_url, service, version, path):
    # taskcluster-lib-urls is not available when this script runs, so
    # simulate its behavior:
    if root_url == 'https://taskcluster.net':
        return 'https://{service}.taskcluster.net/{version}/{path}'.format(
            service=service, version=version, path=path)
    return '{root_url}/api/{service}/{version}/{path}'.format(
        root_url=root_url, service=service, version=version, path=path)


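# A sketch of the two URL shapes api() produces (hypothetical root URL and
# task ID):
#
#   api('https://taskcluster.net', 'queue', 'v1', 'task/abc/artifacts/x')
#   # -> 'https://queue.taskcluster.net/v1/task/abc/artifacts/x'
#   api('https://tc.example.com', 'queue', 'v1', 'task/abc/artifacts/x')
#   # -> 'https://tc.example.com/api/queue/v1/task/abc/artifacts/x'

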
def command_task_artifacts(args):
    fetches = json.loads(os.environ['MOZ_FETCHES'])
    downloads = []
    for fetch in fetches:
        extdir = pathlib.Path(args.dest)
        if 'dest' in fetch:
            extdir = extdir.joinpath(fetch['dest'])
        extdir.mkdir(parents=True, exist_ok=True)

        root_url = os.environ['TASKCLUSTER_ROOT_URL']
        if fetch['artifact'].startswith('public/'):
            path = 'task/{task}/artifacts/{artifact}'.format(
                task=fetch['task'], artifact=fetch['artifact'])
            url = api(root_url, 'queue', 'v1', path)
        else:
            url = '{proxy_url}/api/queue/v1/task/{task}/artifacts/{artifact}'.format(
                proxy_url=os.environ['TASKCLUSTER_PROXY_URL'],
                task=fetch['task'],
                artifact=fetch['artifact'])

        downloads.append((url, extdir, fetch['extract']))

    fetch_urls(downloads)


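# The MOZ_FETCHES JSON consumed above looks roughly like this (hypothetical
# task IDs and artifact paths; 'dest' is optional):
#
#   [
#       {"task": "abc123", "artifact": "public/build/target.tar.bz2",
#        "extract": true},
#       {"task": "def456", "artifact": "project/secret/thing.zip",
#        "extract": false, "dest": "tools"}
#   ]

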
def main():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(title='sub commands')

    url = subparsers.add_parser('static-url', help='Download a static URL')
    url.set_defaults(func=command_static_url)
    url.add_argument('--sha256', required=True,
                     help='SHA-256 of downloaded content')
    url.add_argument('--size', required=True, type=int,
                     help='Size of downloaded content, in bytes')
    url.add_argument('--gpg-sig-url',
                     help='URL containing signed GPG document validating '
                          'URL to fetch')
    url.add_argument('--gpg-key-env',
                     help='Environment variable containing GPG key to validate')
    url.add_argument('url', help='URL to fetch')
    url.add_argument('dest', help='Destination path')

    artifacts = subparsers.add_parser('task-artifacts',
                                      help='Fetch task artifacts')
    artifacts.set_defaults(func=command_task_artifacts)
    artifacts.add_argument('-d', '--dest', default=os.environ.get('MOZ_FETCHES_DIR'),
                           help='Destination directory which will contain all '
                                'artifacts (defaults to $MOZ_FETCHES_DIR)')

    args = parser.parse_args()

    if not args.dest:
        parser.error('no destination directory specified, either pass in --dest '
                     'or set $MOZ_FETCHES_DIR')

    return args.func(args)


if __name__ == '__main__':
    sys.exit(main())
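

# Example invocations (hypothetical values; assumes the script is installed
# on PATH as fetch-content):
#
#   fetch-content static-url \
#       --sha256 aabbcc... --size 12345678 \
#       https://example.com/node.tar.xz /builds/worker/fetches/node.tar.xz
#
#   MOZ_FETCHES_DIR=/builds/worker/fetches \
#   TASKCLUSTER_ROOT_URL=https://tc.example.com \
#   MOZ_FETCHES='[{"task": "abc123", "artifact": "public/build/target.tar.bz2", "extract": true}]' \
#       fetch-content task-artifacts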