Created
January 7, 2022 12:13
-
-
Save mattyclarkson/11c129b972eff71465ec2fe2572b618d to your computer and use it in GitHub Desktop.
A Python download script for downloading, verifying and unpacking an archive.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from __future__ import annotations | |
import hashlib | |
import tarfile | |
from inspect import signature | |
from logging import getLogger, Logger, basicConfig | |
from binascii import hexlify | |
from base64 import standard_b64decode as b64decode | |
from argparse import ( | |
Action, | |
ArgumentParser, | |
RawDescriptionHelpFormatter, | |
SUPPRESS, | |
Namespace, | |
ArgumentError, | |
) | |
from sys import exit, argv | |
from pathlib import Path | |
from typing import ( | |
Iterable, | |
Union, | |
Protocol, | |
Final, | |
TypeVar, | |
Sequence, | |
Any, | |
Text, | |
Optional, | |
cast, | |
) | |
from urllib.parse import urlsplit, SplitResult, urlunsplit | |
from urllib.request import urlopen | |
from urllib.error import HTTPError | |
from dataclasses import dataclass | |
from tempfile import NamedTemporaryFile, TemporaryDirectory | |
from shutil import move | |
class Log(Protocol):
    """Structural type describing the logging calls this module relies on.

    Any object offering these four methods can be used as the module logger.
    """

    def debug(self, msg: str, *args: object) -> None:
        """Record ``msg`` at debug severity; ``args`` are its format arguments."""

    def info(self, msg: str, *args: object) -> None:
        """Record ``msg`` at info severity; ``args`` are its format arguments."""

    def warning(self, msg: str, *args: object) -> None:
        """Record ``msg`` at warning severity; ``args`` are its format arguments."""

    def error(self, msg: str, *args: object) -> None:
        """Record ``msg`` at error severity; ``args`` are its format arguments."""
#: The logger to use for the module
LOG: Final[Log] = getLogger(__name__)

#: Generic type variable available to annotations in this module
T = TypeVar("T")
class VerbosityAction(Action):
    """Parses :code:`--verbose`/:code:`--quiet` arguments and converts that to logging levels

    The action stores an integer verbosity on the namespace under ``dest``:

    - an option created with ``const`` (e.g. ``--quiet`` with ``const=-1``) sets the verbosity to that constant
    - an option created without ``const`` (e.g. ``--verbose``) increments the verbosity on every occurrence

    Mixing the two kinds of option on one command line is rejected with an :class:`ArgumentError`.
    After every occurrence the level of the module logger is updated to match the verbosity.
    """

    def __init__(
        self,
        option_strings: Sequence[str],
        dest: str,
        const: Optional[int] = None,
        default: int = 0,
        help: Optional[str] = None,
    ):
        """
        :param option_strings: the command line flags that trigger the action
        :param dest: the namespace attribute that holds the verbosity
        :param const: the fixed verbosity to store, or :code:`None` to count occurrences
        :param default: the verbosity when no flag is provided
        :param help: the help text for the flags
        """
        # ``nargs=0`` makes the option a pure flag: it never consumes a value
        super().__init__(
            option_strings,
            dest,
            nargs=0,
            const=const,
            default=default,
            type=None,
            choices=None,
            required=False,
            help=help,
            metavar=None,
        )

    def __call__(
        self,
        parser: ArgumentParser,
        namespace: Namespace,
        values: Union[str, Sequence[Any], None],
        option_string: Optional[Text] = None,
    ) -> None:
        """
        Updates the verbosity stored on the namespace and the module logger level.

        :param parser: the parser that invoked the action
        :param namespace: the namespace that receives the verbosity
        :param values: unused, the flags take no values
        :param option_string: the flag that was seen on the command line
        :raises ArgumentError: when counting and constant verbosity flags are combined
        """
        # Fall back to zero so a suppressed default cannot cause an ``AttributeError``
        current = getattr(namespace, self.dest, 0)
        conflict = f"Cannot combine '{option_string}' with other verbosity options."

        if self.const is not None:
            # Constant flag: only valid on its own (repeating the same flag is harmless)
            if current not in {0, self.const}:
                raise ArgumentError(self, conflict)
            level = self.const
        else:
            # Counting flag: a negative value means a constant (quiet) flag came first
            if current < 0:
                raise ArgumentError(self, conflict)
            level = current + 1
        setattr(namespace, self.dest, level)

        # Derive the logging level from the value just stored under ``self.dest``
        # rather than from a hard-coded attribute name, so any ``dest`` works
        logger = cast(Logger, LOG)
        if level < 0:
            logger.setLevel("ERROR")
        elif level == 0:
            logger.setLevel("WARN")
        elif level == 1:
            logger.setLevel("INFO")
        else:
            logger.setLevel("DEBUG")
@dataclass(frozen=True)
class Url:
    """An immutable URL that exposes its split components.

    The URL can be created from a string or from an already split result; a
    string is split once on construction.
    """

    value: Union[SplitResult, str]

    def __post_init__(self) -> None:
        """Normalise a string ``value`` into its split form."""
        raw = self.value
        if isinstance(raw, str):
            # The dataclass is frozen, so the field has to be replaced through ``object``
            object.__setattr__(self, "value", urlsplit(raw))

    @property
    def parts(self) -> SplitResult:
        """The split components of the URL."""
        split = self.value
        assert not isinstance(split, str)
        return split

    @property
    def scheme(self) -> str:
        """The scheme component of the URL."""
        split = self.parts
        return split.scheme

    @property
    def netloc(self) -> str:
        """The network location component of the URL."""
        split = self.parts
        return split.netloc

    @property
    def path(self) -> str:
        """The path component of the URL."""
        split = self.parts
        return split.path

    @property
    def query(self) -> str:
        """The query component of the URL."""
        split = self.parts
        return split.query

    @property
    def fragment(self) -> str:
        """The fragment component of the URL."""
        split = self.parts
        return split.fragment

    def __str__(self) -> str:
        """Reassemble the components into the URL string."""
        split = self.parts
        return urlunsplit(split)
@dataclass(frozen=True)
class Integrity:
    """
    Represents subresource integrity_.

    .. _integrity: https://w3c.github.io/webappsec-subresource-integrity/#integrity-metadata-description
    """

    #: The metadata of the subresource integrity of format :code:`<alg>-<digest>[-<opt>]`
    metadata: str

    @property
    def algorithm(self) -> str:
        """The hash algorithm name: the text before the first hyphen."""
        name, *_ = self.metadata.split("-")
        return name

    @property
    def base64(self) -> str:
        """The base64 encoded digest: the second hyphen separated field."""
        fields = self.metadata.split("-")
        return fields[1]

    @property
    def digest(self) -> bytes:
        """The digest as raw bytes, decoded from :attr:`base64`."""
        encoded = self.base64
        return b64decode(encoded)

    @property
    def hexdigest(self) -> str:
        """The digest as a lowercase hexadecimal string."""
        raw = self.digest
        return raw.hex()

    def __str__(self) -> str:
        """The metadata exactly as it was provided."""
        return self.metadata
# Number of bytes read per iteration when hashing or downloading an archive
_CHUNK_SIZE = 64 * 1024


def _digest_matches(path: Path, integrity: Integrity) -> bool:
    """Returns whether the file at ``path`` hashes to the digest of ``integrity``."""
    hasher = hashlib.new(integrity.algorithm)
    with open(path, "rb") as stream:
        while buffer := stream.read(_CHUNK_SIZE):
            hasher.update(buffer)
    return hasher.digest() == integrity.digest


def _fetch(url: Url, integrity: Integrity, archive: Path) -> None:
    """Downloads ``url`` to ``archive``, only caching it when the integrity matches."""
    # Always hash with a fresh hasher: it must only ever see the downloaded bytes
    hasher = hashlib.new(integrity.algorithm)
    partial: Optional[Path] = None
    try:
        # ``delete=False`` so the file can be closed before it is moved into the cache
        with NamedTemporaryFile("wb", delete=False) as dst:
            partial = Path(dst.name)
            with urlopen(f"{url}") as src:
                while buffer := src.read(_CHUNK_SIZE):
                    hasher.update(buffer)
                    dst.write(buffer)
        if hasher.digest() != integrity.digest:
            raise ValueError(
                f"File integrity mismatch for {url}, expected {integrity.hexdigest} got {hasher.hexdigest()}"
            )
        archive.parent.mkdir(parents=True, exist_ok=True)
        move(str(partial), archive)
    finally:
        # Removes the partial download on failure; a no-op once it has been moved
        if partial is not None:
            partial.unlink(missing_ok=True)
    LOG.info("Archive cached %s", archive)


def _ensure_cached(url: Url, integrity: Integrity, archive: Path) -> None:
    """Reuses a valid cached ``archive`` or (re)downloads it from ``url``."""
    try:
        valid = _digest_matches(archive, integrity)
    except FileNotFoundError:
        # Nothing cached yet
        pass
    else:
        if valid:
            LOG.info("Archive cache hit %s", archive)
            return
        # The cached archive is corrupt or stale: discard it and download again
        archive.unlink()
        LOG.warning("Archive cache miss %s", archive)
    _fetch(url, integrity, archive)


def _unpack(archive: Path, strip_prefix: Path, check: str, output: Path) -> None:
    """Unpacks ``archive`` and moves its ``strip_prefix`` folder to ``output``."""
    with TemporaryDirectory() as scratch:
        # Extract into a child folder so the managed temporary directory itself is never moved
        root = Path(scratch) / "unpacked"
        root.mkdir()
        with tarfile.open(archive, "r:*") as tar:
            for tarinfo in tar:
                member = Path(tarinfo.name)
                if member.is_absolute():
                    raise ValueError(
                        f"{member} is an invalid absolute filepath in {archive}"
                    )
                # Any parent reference could place the member outside of the extraction folder
                if ".." in member.parts:
                    raise ValueError(
                        f"{member} is an invalid relative filepath in {archive}"
                    )
                # NOTE(review): link targets are not validated here; consider the
                # :mod:`tarfile` extraction filters where the Python version provides them
                tar.extract(tarinfo, root)
        unpacked = root / strip_prefix
        # Record what was unpacked so later runs can detect an up-to-date folder
        with open(unpacked / ".integrity", "w", encoding="utf-8") as iw:
            iw.write(check)
        output.parent.mkdir(parents=True, exist_ok=True)
        move(str(unpacked), output)


def download(
    urls: Iterable[Url],
    integrity: Integrity,
    output: Path,
    strip_prefix: Path = Path("."),
    cache: Path = Path.home() / ".cache/bazel-toolchain-download",
    retries: int = 1,
) -> Path:
    """
    Downloads, verifies and unpacks a tar archive.

    The archive is looked up in ``cache`` first and only downloaded when it is missing or fails the integrity
    check. Each URL is attempted in order until one provides the archive. The unpacked folder receives an
    ``.integrity`` marker file so that an already unpacked ``output`` is detected and reused.

    :param urls: the locations to attempt to download the archive from, in order
    :param integrity: the subresource integrity the archive must match
    :param output: the folder the archive is unpacked to
    :param strip_prefix: the folder within the archive that becomes ``output``
    :param cache: the folder that downloaded archives are stored in
    :param retries: the number of attempts for each URL, at least one
    :returns: the ``output`` path
    :raises ValueError: for an invalid ``retries``, an existing ``output`` that does not match, a downloaded file
                        that fails the integrity check or an archive member with an unsafe path
    :raises NotImplementedError: when the archive is not a tar archive
    :raises FileNotFoundError: when no URL provided the archive
    :raises HTTPError: for HTTP failures other than 404 and 5xx
    """
    if retries < 1:
        raise ValueError(f"Invalid {retries} retries value")

    # Check if already unpacked
    check = f"{strip_prefix}/{integrity}"
    try:
        with open(output / ".integrity", "r", encoding="utf-8") as ir:
            found = ir.read()
    except FileNotFoundError:
        if output.exists():
            raise ValueError(
                f"Unpacked folder had no integrity file. Remove {output}"
            ) from None
    else:
        if check != found:
            raise ValueError(
                f"Unpacked folder integrity mismatch for {output}, expected {check} got {found}. Remove {output}"
            )
        LOG.info("Unpacked integrity matched %s", found)
        return output

    # Download archive to cache
    archive: Optional[Path] = None
    for url in urls:
        LOG.info("Downloading %s", url)
        candidate = cache / f"{integrity}-{Path(url.path).name}"
        # TODO: implement `.zip` unpacking
        # Slicing tolerates names with fewer than two suffixes, such as `.zip` or a plain `.tar`
        if ".tar" not in candidate.suffixes[-2:]:
            raise NotImplementedError(f"No support for {candidate.suffixes}")
        for _ in range(retries):
            try:
                _ensure_cached(url, integrity, candidate)
                break
            except HTTPError as e:
                if e.code == 404 or 500 <= e.code < 600:
                    LOG.debug("Retrying %s HTTP request", e.code)
                    continue
                raise
        if candidate.exists():
            archive = candidate
            break
        LOG.warning("Failed %s", url)
    if archive is None:
        raise FileNotFoundError("Unable to download the archive from any URL")

    # Unpack archive
    LOG.info("Unpacking %s", archive.name)
    _unpack(archive, strip_prefix, check, output)
    return output
def parser(exe: Path) -> ArgumentParser:
    """
    Builds the bare argument parser for the command line interface::

        from download import parser, arguments
        from pathlib import Path
        from sys import argv

        prsr = parser(Path(argv[0]))
        arguments(prsr)

    :param exe: the path to the executable entry point
    :returns: a :code:`ArgumentParser` class ready for adding :func:`arguments`
    """
    options = {
        "prog": str(exe),
        "description": "Downloads an archive and unpacks it",
        "formatter_class": RawDescriptionHelpFormatter,
        # Options that are not provided are left off the parsed namespace
        "argument_default": SUPPRESS,
    }
    return ArgumentParser(**options)
def arguments(prsr: ArgumentParser) -> None:
    """
    Adds the downloader arguments to an argument parser::

        from download import parser, arguments
        from pathlib import Path
        from sys import argv

        prsr = parser(Path(argv[0]))
        arguments(prsr)

    Can add arguments to a subparser if combining the tool into a subcommand parser as part of a suite of CLI tooling.

    :param prsr: the parser to add the arguments to
    """
    # Both verbosity flags share one destination so that they can detect each other
    prsr.add_argument(
        "-v",
        "--verbose",
        help="increases the detail in the output",
        action=VerbosityAction,
        default=0,
        dest="verbosity",
    )
    prsr.add_argument(
        "-q",
        "--quiet",
        action=VerbosityAction,
        const=-1,
        default=0,
        dest="verbosity",
        help="only show errors",
    )
    # Required: without it the download has nothing to fetch and would fail
    # with a missing argument error rather than a usage message
    prsr.add_argument(
        "-u",
        "--url",
        dest="urls",
        type=Url,
        metavar="URL",
        nargs="+",
        required=True,
        help="A collection of URLs to attempt to download the archive from",
    )
    prsr.add_argument(
        "-s",
        "--strip-prefix",
        type=Path,
        metavar="PATH",
        default=Path("."),
        help="The path prefix of the extracted archive to remove",
    )
    prsr.add_argument(
        "-i",
        "--integrity",
        type=Integrity,
        required=True,
        help="The subresource integrity for the downloaded archive",
    )
    prsr.add_argument(
        "-o",
        "--output",
        type=Path,
        required=True,
        help="The location to unpack the archive",
    )
    # No default: when omitted the option is absent from the parsed namespace
    prsr.add_argument(
        "-c",
        "--cache",
        metavar="PATH",
        type=Path,
        help="A folder location to store downloaded archives",
    )
    prsr.add_argument(
        "-r",
        "--retries",
        type=int,
        default=3,
        help="Determines the number of retries for a URL",
    )
def main(exe: Path, *args: str) -> int:
    """
    Entry point for the command line interface::

        from download import main
        from pathlib import Path
        from sys import argv, exit

        if __name__ == "__main__":
            exit(main(Path(argv[0]), *argv[1:]))

    :param exe: the path to the executable entry point
    :param args: command line arguments
    :return: the exit code for the executable
    """
    prsr = parser(exe)
    arguments(prsr)

    try:
        parsed = prsr.parse_args(args)
    except SystemExit as e:
        # The parser exits for usage errors and for ``--help``; convert that
        # into a return code without relying on ``assert``, which is stripped
        # when running optimised
        code = e.code
        if code is None:
            return 0
        if isinstance(code, int):
            return code
        return 1

    # Only forward the parsed values that the download function accepts
    accepted = signature(download).parameters
    kwargs = {k: v for k, v in vars(parsed).items() if k in accepted}

    try:
        print(download(**kwargs))
        return 0
    except KeyboardInterrupt:  # coverage: ignore
        return 130
def entry() -> None:
    """
    Runs the command line interface as a script entry point.

    Configures logging, then hands the program path and arguments from
    :data:`sys.argv` to :func:`main`.

    :raises SystemExit: raised with the return code for the CLI
    """
    basicConfig()
    program = Path(argv[0])
    status = main(program, *argv[1:])
    exit(status)


if __name__ == "__main__":
    entry()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment