Skip to content

Instantly share code, notes, and snippets.

@mattyclarkson
Created January 7, 2022 12:13
Show Gist options
  • Save mattyclarkson/11c129b972eff71465ec2fe2572b618d to your computer and use it in GitHub Desktop.
Save mattyclarkson/11c129b972eff71465ec2fe2572b618d to your computer and use it in GitHub Desktop.
A Python download script for downloading, verifying and unpacking an archive.
#!/usr/bin/env python3
from __future__ import annotations
import hashlib
import tarfile
from inspect import signature
from logging import getLogger, Logger, basicConfig
from binascii import hexlify
from base64 import standard_b64decode as b64decode
from argparse import (
Action,
ArgumentParser,
RawDescriptionHelpFormatter,
SUPPRESS,
Namespace,
ArgumentError,
)
from sys import exit, argv
from pathlib import Path
from typing import (
Iterable,
Union,
Protocol,
Final,
TypeVar,
Sequence,
Any,
Text,
Optional,
cast,
)
from urllib.parse import urlsplit, SplitResult, urlunsplit
from urllib.request import urlopen
from urllib.error import HTTPError
from dataclasses import dataclass
from tempfile import NamedTemporaryFile, TemporaryDirectory
from shutil import move
class Log(Protocol):
    """Structural type for logger-like objects.

    Any object providing ``debug``, ``info``, ``warning`` and ``error``
    methods with the signatures below satisfies this protocol.
    """

    def debug(self, msg: str, *args: object) -> None:
        """Signature of a debug-level logging call."""

    def info(self, msg: str, *args: object) -> None:
        """Signature of an info-level logging call."""

    def warning(self, msg: str, *args: object) -> None:
        """Signature of a warning-level logging call."""

    def error(self, msg: str, *args: object) -> None:
        """Signature of an error-level logging call."""
# Module-wide logger obtained from ``getLogger(__name__)`` and annotated with
# the structural ``Log`` protocol rather than the concrete ``Logger`` class.
LOG: Final[Log] = getLogger(__name__)
"""
The logger to use for the module
"""
# Generic type variable; used to annotate the ``const`` parameter of
# ``VerbosityAction.__init__``.
T = TypeVar("T")
class VerbosityAction(Action):
    """Parses :code:`--verbose`/:code:`--quiet` arguments and converts that to logging levels.

    The action takes no command line values (``nargs=0``). It operates in one
    of two modes depending on ``const``:

    * ``const`` is ``None`` (e.g. ``--verbose``): every occurrence increments
      the value stored under ``dest`` by one.
    * ``const`` is set (e.g. ``--quiet`` with ``const=-1``): the value stored
      under ``dest`` is replaced with ``const``.

    Mixing the two modes on one command line is rejected with an
    :class:`argparse.ArgumentError`. After each occurrence the module logger
    level is updated: negative -> ``ERROR``, ``0`` -> ``WARNING``,
    ``1`` -> ``INFO``, anything higher -> ``DEBUG``.
    """

    def __init__(
        self,
        option_strings: Sequence[str],
        dest: str,
        const: Optional[T] = None,
        default: int = 0,
        help: Optional[str] = None,
    ):
        """
        :param option_strings: the flags that trigger the action
        :param dest: the namespace attribute that stores the verbosity
        :param const: a fixed verbosity to store, or :code:`None` to count occurrences
        :param default: the verbosity when the flag is not provided
        :param help: the help text for the flag
        """
        super().__init__(
            option_strings,
            dest,
            nargs=0,
            const=const,
            default=default,
            type=None,
            choices=None,
            required=False,
            help=help,
            metavar=None,
        )

    def __call__(
        self,
        parser: ArgumentParser,
        namespace: Namespace,
        values: Union[str, Sequence[Any], None],
        option_string: Optional[Text] = None,
    ) -> None:
        """
        Updates the verbosity stored on the namespace and the logger level.

        :param parser: the parser that invoked the action (unused)
        :param namespace: the namespace holding the verbosity under ``dest``
        :param values: always empty because the action uses ``nargs=0`` (unused)
        :param option_string: the flag that triggered the action
        :raises ArgumentError: when counting and fixed verbosity flags are combined
        """
        current = getattr(namespace, self.dest)
        if self.const is not None:
            # Fixed verbosity: only valid on an untouched namespace or when
            # the same fixed flag is repeated.
            if current not in {0, self.const}:
                raise ArgumentError(
                    self,
                    f"Cannot combine '{option_string}' with other verbosity options.",
                )
            verbosity = self.const
        else:
            # Counting verbosity: a negative value means a fixed "quiet" flag
            # has already been processed.
            if current < 0:
                raise ArgumentError(
                    self,
                    f"Cannot combine '{option_string}' with other verbosity options.",
                )
            verbosity = current + 1
        setattr(namespace, self.dest, verbosity)

        # Derive the level from the value stored under ``self.dest`` so the
        # action works for any destination name, not only ``verbosity``.
        logger = cast(Logger, LOG)
        if verbosity < 0:
            logger.setLevel("ERROR")
        elif verbosity == 0:
            logger.setLevel("WARNING")
        elif verbosity == 1:
            logger.setLevel("INFO")
        else:
            logger.setLevel("DEBUG")
@dataclass(frozen=True)
class Url:
    """
    An immutable URL.

    Accepts either a string or an already split URL; a string is split with
    :func:`urllib.parse.urlsplit` during construction so that :attr:`value`
    holds a :class:`urllib.parse.SplitResult` afterwards.
    """

    value: Union[SplitResult, str]

    def __post_init__(self) -> None:
        raw = self.value
        if isinstance(raw, str):
            # The dataclass is frozen, so bypass its ``__setattr__`` guard.
            object.__setattr__(self, "value", urlsplit(raw))

    @property
    def parts(self) -> SplitResult:
        """The split representation of the URL."""
        split = self.value
        assert not isinstance(split, str)
        return split

    @property
    def scheme(self) -> str:
        """The scheme component of the URL."""
        return self.parts.scheme

    @property
    def netloc(self) -> str:
        """The network location component of the URL."""
        return self.parts.netloc

    @property
    def path(self) -> str:
        """The path component of the URL."""
        return self.parts.path

    @property
    def query(self) -> str:
        """The query component of the URL."""
        return self.parts.query

    @property
    def fragment(self) -> str:
        """The fragment component of the URL."""
        return self.parts.fragment

    def __str__(self) -> str:
        # ``SplitResult.geturl()`` recombines the parts via ``urlunsplit``.
        return self.parts.geturl()
@dataclass(frozen=True)
class Integrity:
    """
    Represents subresource integrity_.

    .. _integrity: https://w3c.github.io/webappsec-subresource-integrity/#integrity-metadata-description
    """

    #: The metadata of the subresource integrity of format :code:`<alg>-<digest>[-<opt>]`
    metadata: str

    @property
    def algorithm(self) -> str:
        """The text before the first ``-`` of the metadata."""
        fields = self.metadata.split("-")
        return fields[0]

    @property
    def base64(self) -> str:
        """The text between the first and second ``-`` of the metadata."""
        fields = self.metadata.split("-")
        return fields[1]

    @property
    def digest(self) -> bytes:
        """The :attr:`base64` field decoded to raw bytes."""
        encoded = self.base64
        return b64decode(encoded)

    @property
    def hexdigest(self) -> str:
        """The :attr:`digest` as a lowercase hexadecimal string."""
        return self.digest.hex()

    def __str__(self) -> str:
        return self.metadata
# Number of bytes read per iteration when hashing or downloading an archive.
_CHUNK_SIZE = 64 * 1024

# Single-suffix spellings of compressed tar archives.
_TAR_SHORTHANDS = (".tgz", ".tbz2", ".txz")


def _is_unpacked(output: Path, check: str) -> bool:
    """Returns whether ``output`` already holds an unpack recorded as ``check``."""
    try:
        with open(output / ".integrity", "r", encoding="utf-8") as ir:
            expected = ir.read()
    except FileNotFoundError:
        if output.exists():
            raise ValueError(f"Unpacked folder had no integrity file. Remove {output}")
        return False
    if check != expected:
        raise ValueError(
            f"Unpacked folder integrity mismatch for {output}, expected {check} got {expected}. Remove {output}"
        )
    LOG.info("Unpacked integrity matched %s", expected)
    return True


def _is_tar_archive(archive: Path) -> bool:
    """Returns whether the filename of ``archive`` looks like a (compressed) tarball."""
    suffixes = archive.suffixes
    if ".tar" in suffixes[-2:]:
        return True
    return bool(suffixes) and suffixes[-1] in _TAR_SHORTHANDS


def _is_cached(archive: Path, integrity: Integrity) -> bool:
    """Returns whether ``archive`` exists in the cache with the expected digest, removing stale entries."""
    hasher = hashlib.new(integrity.algorithm)
    try:
        with open(archive, "rb") as a:
            while buffer := a.read(_CHUNK_SIZE):
                hasher.update(buffer)
    except FileNotFoundError:
        return False
    if hasher.digest() != integrity.digest:
        archive.unlink()
        LOG.warning("Archive cache miss %s", archive)
        return False
    LOG.info("Archive cache hit %s", archive)
    return True


def _fetch(url: Url, integrity: Integrity, archive: Path) -> None:
    """Downloads ``url`` into ``archive``, verifying the digest before the file enters the cache."""
    # A fresh hasher is required: reusing one that already consumed a stale
    # cache entry would make every re-download fail verification.
    hasher = hashlib.new(integrity.algorithm)
    # ``delete=False`` so the closed file can be moved into the cache; the
    # ``finally`` below removes it on any failure.
    dst = NamedTemporaryFile("wb", delete=False)
    partial = Path(dst.name)
    try:
        with dst, urlopen(f"{url}") as src:
            while buffer := src.read(_CHUNK_SIZE):
                hasher.update(buffer)
                dst.write(buffer)
        if hasher.digest() != integrity.digest:
            raise ValueError(
                f"File integrity mismatch for {url}, expected {integrity.hexdigest} got {hasher.hexdigest()}"
            )
        archive.parent.mkdir(parents=True, exist_ok=True)
        move(str(partial), archive)
        LOG.info("Archive cached %s", archive)
    finally:
        if partial.exists():
            partial.unlink()


def _unpack(archive: Path, strip_prefix: Path, check: str, output: Path) -> None:
    """Extracts ``archive`` to a temporary folder and moves ``strip_prefix`` within it to ``output``."""
    with TemporaryDirectory() as d:
        staging = Path(d)
        with tarfile.open(archive, "r:*") as tar:
            for tarinfo in tar:
                member = Path(tarinfo.name)
                if tarinfo.name.startswith("/") or member.is_absolute():
                    raise ValueError(
                        f"{member} is an invalid absolute filepath in {archive}"
                    )
                # Any ``..`` component could place the member outside of the
                # staging folder.
                if ".." in member.parts:
                    raise ValueError(
                        f"{member} is an invalid relative filepath in {archive}"
                    )
                # NOTE(review): link targets are not inspected here; consider
                # ``tarfile`` extraction filters where available.
                tar.extract(tarinfo, staging)
        unpacked = staging / strip_prefix
        with open(unpacked / ".integrity", "w", encoding="utf-8") as iw:
            iw.write(check)
        output.parent.mkdir(parents=True, exist_ok=True)
        move(str(unpacked), output)


def download(
    urls: Iterable[Url],
    integrity: Integrity,
    output: Path,
    strip_prefix: Path = Path("."),
    cache: Path = Path.home() / ".cache/bazel-toolchain-download",
    retries: int = 1,
) -> Path:
    """
    Downloads an archive, verifies its integrity and unpacks it.

    The archive is stored in ``cache`` under a name derived from ``integrity``
    and the URL filename, so a verified archive is never downloaded twice. The
    unpacked folder records ``<strip_prefix>/<integrity>`` in a ``.integrity``
    file which allows later calls to return early.

    :param urls: the locations to attempt to download the archive from, in order
    :param integrity: the expected subresource integrity of the archive
    :param output: the folder to unpack into; must not exist unless it is a matching previous unpack
    :param strip_prefix: the folder within the archive that becomes ``output``
    :param cache: the folder that stores downloaded archives
    :param retries: the number of attempts per URL, at least one
    :returns: ``output``
    :raises ValueError: for an invalid ``retries``, an ``output`` that holds something else, a downloaded
        file that fails verification or an archive member that escapes the unpack folder
    :raises NotImplementedError: when a URL does not name a tar archive
    :raises FileNotFoundError: when no URL provided the archive
    :raises urllib.error.HTTPError: for HTTP failures other than 404 and 5xx
    """
    if retries < 1:
        raise ValueError(f"Invalid {retries} retries value")

    # Check if already unpacked
    check = f"{strip_prefix}/{integrity}"
    if _is_unpacked(output, check):
        return output

    # Download archive to cache
    archive: Optional[Path] = None
    for url in urls:
        LOG.info("Downloading %s", url)
        candidate = cache / f"{integrity}-{Path(url.path).name}"
        # TODO: implement `.zip` unpacking
        if not _is_tar_archive(candidate):
            raise NotImplementedError(f"No support for {candidate.suffixes}")
        for _ in range(retries):
            try:
                if not _is_cached(candidate, integrity):
                    _fetch(url, integrity, candidate)
                break
            except HTTPError as e:
                if e.code == 404 or 500 <= e.code < 600:
                    LOG.debug("Retrying %s HTTP request", e.code)
                    continue
                raise
        if candidate.exists():
            archive = candidate
            break
        LOG.warning("Failed %s", url)
    if archive is None:
        raise FileNotFoundError(
            f"Unable to download an archive for {integrity} from any of the provided URLs"
        )

    # Unpack archive
    LOG.info("Unpacking %s", archive.name)
    _unpack(archive, strip_prefix, check, output)
    return output
def parser(exe: Path) -> ArgumentParser:
    """
    Builds the bare argument parser for the command line interface::

        from download import parser, arguments
        from pathlib import Path
        from sys import argv

        prsr = parser(Path(argv[0]))
        arguments(prsr)

    The parser suppresses unspecified arguments (:code:`argument_default=SUPPRESS`),
    so options without an explicit default are absent from the parsed namespace.

    :param exe: the path to the executable entry point
    :returns: a :code:`ArgumentParser` class ready for adding :func:`arguments`
    """
    settings = {
        "description": "Downloads an archive and unpacks it",
        "prog": str(exe),
        "formatter_class": RawDescriptionHelpFormatter,
        "argument_default": SUPPRESS,
    }
    return ArgumentParser(**settings)
def arguments(prsr: ArgumentParser) -> None:
    """
    Adds the downloader arguments to an argument parser::

        from download import parser, arguments
        from pathlib import Path
        from sys import argv

        prsr = parser(Path(argv[0]))
        arguments(prsr)

    Can add arguments to a subparser if combining the tool into a subcommand parser as part of a suite of CLI tooling.

    The :code:`--url`, :code:`--integrity` and :code:`--output` options are required.

    :param prsr: the parser to add the arguments to
    """
    prsr.add_argument(
        "-v",
        "--verbose",
        help="increases the detail in the output",
        action=VerbosityAction,
        default=0,
        dest="verbosity",
    )
    prsr.add_argument(
        "-q",
        "--quiet",
        action=VerbosityAction,
        const=-1,
        default=0,
        dest="verbosity",
        help="only show errors",
    )
    prsr.add_argument(
        "-u",
        "--url",
        dest="urls",
        type=Url,
        metavar="URL",
        nargs="+",
        # Without at least one URL there is nothing to download; requiring it
        # reports a usage error instead of failing later on a missing value.
        required=True,
        help="A collection of URLs to attempt to download the archive from",
    )
    prsr.add_argument(
        "-s",
        "--strip-prefix",
        type=Path,
        metavar="PATH",
        default=Path("."),
        help="The path prefix of the extracted archive to remove",
    )
    prsr.add_argument(
        "-i",
        "--integrity",
        type=Integrity,
        required=True,
        help="The subresource integrity for the downloaded archive",
    )
    prsr.add_argument(
        "-o",
        "--output",
        type=Path,
        required=True,
        help="The location to unpack the archive",
    )
    prsr.add_argument(
        "-c",
        "--cache",
        metavar="PATH",
        type=Path,
        help="A folder location to store downloaded archives",
    )
    prsr.add_argument(
        "-r",
        "--retries",
        type=int,
        default=3,
        help="Determines the number of retries for a URL",
    )
def main(exe: Path, *args: str) -> int:
    """
    Runs the command line interface::

        from download import main
        from pathlib import Path
        from sys import argv, exit

        if __name__ == "__main__":
            exit(main(Path(argv[0]), *argv[1:]))

    On success the path returned by :func:`download` is printed.

    :param exe: the path to the executable entry point
    :param args: command line arguments
    :return: the exit code for the executable
    """
    prsr = parser(exe)
    arguments(prsr)

    # ``parse_args`` exits on its own (usage errors, ``--help``); turn that
    # into a return value.
    try:
        parsed = prsr.parse_args(args)
    except SystemExit as e:
        assert e.code is not None
        return e.code

    try:
        # Forward only the parsed values that ``download`` accepts.
        accepted = signature(download).parameters
        forwarded = {}
        for key, value in vars(parsed).items():
            if key in accepted:
                forwarded[key] = value
        print(download(**forwarded))
    except KeyboardInterrupt:  # coverage: ignore
        return 130
    else:
        return 0
def entry() -> None:
    """
    A generic entry point for the command line interface.

    Configures logging with :func:`logging.basicConfig`, then runs :func:`main`
    with the program path and arguments read from :data:`sys.argv`.

    :raises SystemExit: raised with the return code for the CLI
    """
    basicConfig()
    program, *cli_args = argv
    status = main(Path(program), *cli_args)
    exit(status)
# Run the command line interface only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    entry()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment