Skip to content

Instantly share code, notes, and snippets.

@LewisGaul
Last active January 31, 2021 18:24
Show Gist options
  • Save LewisGaul/65aa7d4ff5a20abb1c8ea02cd0948486 to your computer and use it in GitHub Desktop.
Fetch files from GitHub using the REST API, emulating 'git archive', which is not supported natively by GitHub.
#!/usr/bin/env python3
"""
Fetch files from GitHub using the REST API, emulating 'git archive', which is
not supported natively by GitHub.
Supports Python 3.6+, Linux. Only external dependency is the 'curl' executable.
API docs are at https://docs.github.com/en/rest, we use the following:
- Get repo info
GET /repos/{owner}/{repo}
https://docs.github.com/en/rest/reference/repos#get-a-repository
- Get path info
GET /repos/{owner}/{repo}/contents/{path}
https://docs.github.com/en/rest/reference/repos#get-repository-content
- Get a 'tree' (list directory contents)
GET /repos/{owner}/{repo}/git/trees/{tree_sha}
GET /repos/{owner}/{repo}/git/trees/{ref}:{path}
https://docs.github.com/en/rest/reference/git#get-a-tree
- Get a 'blob' (file contents)
GET /repos/{owner}/{repo}/git/blobs/{file_sha}
GET /repos/{owner}/{repo}/git/blobs/{ref}:{path}
https://docs.github.com/en/rest/reference/git#get-a-blob
Limitations:
- Does not set mode of directories, only files.
- Does not set mtime of files/dirs.
"""
import argparse
import asyncio
import base64
import collections
import json
import logging
import os
import posixpath
import shlex
import subprocess
import sys
import tarfile
import tempfile
import urllib.parse
import zipfile
from typing import Coroutine, Dict, List, Mapping, Optional, Tuple, Union
# Module-scoped logger used throughout this script.
logger = logging.getLogger("gh-archive")
# Recursive type alias for a decoded JSON value.
# NOTE(review): 'float' appears to be missing from this union - confirm
# whether float JSON numbers are ever expected from the API.
Json = Union[str, int, bool, None, Dict[str, "Json"], List["Json"]]
# A top-level JSON document (object or array), as returned by json.loads().
JsonContainer = Union[Dict[str, Json], List[Json]]
# Globals declared here (annotations only, no values) and assigned in main()
# from CLI args / environment variables, then read by the API helpers below.
BASE_URL: str
REPO_IDENT: str
USER: str
TOKEN: Optional[str]
# One fetched file: repo path, git blob sha, permission bits (int), mtime
# (always None - see module docstring limitations) and decoded text contents.
File = collections.namedtuple("File", "path, sha, mode, mtime, contents")
# ------------------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------------------
async def _subproc(cmd, **kwargs) -> str:
    """
    Run a subprocess command, exiting on failure (similar to bash 'set -e'
    behaviour).

    :param cmd:
        Command to run, in list form.
    :return:
        Stdout from the command.
    """
    printable_cmd = " ".join(shlex.quote(part) for part in cmd)
    logger.debug("Running command: %s", printable_cmd)
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs
    )
    stdout, stderr = await proc.communicate()
    # Mirror bash 'set -e': any nonzero exit code aborts the whole script,
    # propagating the child's return code.
    if proc.returncode != 0:
        logger.critical("Command failed: %s\n%s", printable_cmd, stderr.decode())
        sys.exit(proc.returncode)
    return stdout.decode()
async def _run_curl(url: str) -> str:
    """
    Invoke curl for the given URL, adding basic auth when a token is set.

    :param url:
        The URL to fetch.
    :return:
        Stdout from curl (the response body).
    """
    curl_args = ["curl", "-L", "--fail", url]
    if TOKEN:
        # Basic auth using the module-level credentials.
        curl_args += ["--user", f"{USER}:{TOKEN}"]
    return await _subproc(curl_args)
def _parse_file_mode(mode: str) -> int:
return int(mode, base=8) % 0o1000
def _decode_file_contents(content: str) -> str:
return base64.b64decode(content).decode()
async def _fetch_repo_info() -> JsonContainer:
    """
    Fetch top-level info for the configured repo.

    GET /repos/{owner}/{repo}
    https://docs.github.com/en/rest/reference/repos#get-a-repository

    :return:
        Decoded JSON from the Github API.
    """
    url = posixpath.join(BASE_URL, "repos", REPO_IDENT).rstrip("/")
    response = await _run_curl(url)
    return json.loads(response)
async def _fetch_path_info(path: str, ref: Optional[str] = None) -> JsonContainer:
    """
    Fetch content info for a repo path (a list for a dir, a dict for a file).

    GET /repos/{owner}/{repo}/contents/{path}
    https://docs.github.com/en/rest/reference/repos#get-repository-content

    :param path:
        The repo path to get info for.
    :param ref:
        Optionally specify a branch/tag/commit.
    :return:
        Decoded JSON from the Github API.
    """
    url = posixpath.join(BASE_URL, "repos", REPO_IDENT, "contents", path).rstrip("/")
    if ref:
        url = f"{url}?ref={ref}"
    return json.loads(await _run_curl(url))
async def _fetch_tree_info(
    path: Optional[str] = None, ref: Optional[str] = None, *, sha: Optional[str] = None
) -> JsonContainer:
    """
    Fetch a git tree (directory listing).

    GET /repos/{owner}/{repo}/git/trees/{tree_sha}
    GET /repos/{owner}/{repo}/git/trees/{ref}:{path}
    https://docs.github.com/en/rest/reference/git#get-a-tree

    :param path:
        The repo path to get info for.
    :param ref:
        Specify the branch/tag/commit.
    :param sha:
        If given, path and ref should not be given.
    :return:
        Decoded JSON from the Github API.
    """
    if sha:
        # Addressing by SHA - path/ref must not also be passed.
        if path is not None or ref:
            raise ValueError("Expected either sha OR both path and ref")
        treeish = sha
    else:
        # Addressing by "{ref}:{path}" - both parts are required.
        if path is None or not ref:
            raise ValueError("Expected either sha OR both path and ref")
        treeish = f"{ref}:{urllib.parse.quote(path, safe='')}"
    url = posixpath.join(
        BASE_URL, "repos", REPO_IDENT, "git", "trees", treeish
    ).rstrip("/")
    return json.loads(await _run_curl(url))
async def _fetch_blob_info(
    path: Optional[str] = None, ref: Optional[str] = None, *, sha: Optional[str] = None
) -> JsonContainer:
    """
    Fetch a git blob (file contents, base64-encoded).

    GET /repos/{owner}/{repo}/git/blobs/{blob_sha}
    GET /repos/{owner}/{repo}/git/blobs/{ref}:{path}
    https://docs.github.com/en/rest/reference/git#get-a-blob

    :param path:
        The repo path to get info for.
    :param ref:
        Specify the branch/tag/commit.
    :param sha:
        If given, path and ref should not be given.
    :return:
        Decoded JSON from the Github API.
    """
    if sha:
        # Addressing by SHA - path/ref must not also be passed.
        if path is not None or ref:
            raise ValueError("Expected either sha OR both path and ref")
        blobish = sha
    else:
        # Addressing by "{ref}:{path}" - both parts are required.
        if path is None or not ref:
            raise ValueError("Expected either sha OR both path and ref")
        blobish = f"{ref}:{urllib.parse.quote(path, safe='')}"
    url = posixpath.join(
        BASE_URL, "repos", REPO_IDENT, "git", "blobs", blobish
    ).rstrip("/")
    return json.loads(await _run_curl(url))
# ------------------------------------------------------------------------------
# Main logic
# ------------------------------------------------------------------------------
def convert_repo_web_url(repo_url: str) -> Tuple[str, str]:
    """
    Convert a web browser repo URL to the base of an API URL and repo name.

    This includes handling for public (github.com) and enterprise URLs.

    :param repo_url:
        The repo URL to convert.
        Examples:
         - "https://github.com/{owner}/{repo}"
         - "https://private-enterprise-domain.com/{org}/{repo}"
    :return:
        The API base URL and org/owner + repo name segment.
        Examples:
         - ("https://api.github.com", "{owner}/{repo}")
         - ("https://private-enterprise-domain.com/api/v3", "{org}/{repo}")
    """
    split_url = urllib.parse.urlsplit(repo_url)
    if split_url.netloc == "github.com":
        # Public GitHub serves the API from a dedicated subdomain.
        api_netloc = f"api.{split_url.netloc}"
        api_path = ""
    else:
        # Enterprise instances serve the API under a path prefix.
        api_netloc = split_url.netloc
        api_path = "api/v3"
    api_base = urllib.parse.urlunsplit((split_url.scheme, api_netloc, api_path, "", ""))
    return api_base, split_url.path.strip("/")
async def fetch_path_contents(path: str, ref: str) -> List[File]:
    """
    Fetch all files and their content under the given path (file or dir).

    :param path:
        The path to fetch from.
    :param ref:
        The branch/tag/commit to fetch from.
    :return:
        A list of file tuples containing path, mode and contents.
    :raise RuntimeError:
        If the API returns a truncated tree listing.
    """
    # First check what the given path is (dir/file).
    info = await _fetch_path_info(path, ref)
    if isinstance(info, list):  # dir
        logger.info("Recursing into directory %r", path)
        # Breadth-first walk: each loop iteration fetches, in parallel, all
        # subdirectory listings discovered during the previous iteration.
        fetch_tree_tasks = {path: _fetch_tree_info(path, ref)}
        files = []
        while fetch_tree_tasks:
            base_paths = list(fetch_tree_tasks.keys())
            tree_results = await asyncio.gather(*fetch_tree_tasks.values())
            fetch_tree_tasks = dict()
            for base_path, tree in zip(base_paths, tree_results):
                if tree["truncated"]:
                    raise RuntimeError("Github API returned a truncated result")
                # 'entry' (renamed from 'info') avoids shadowing the outer
                # path-info variable.
                for entry in tree["tree"]:
                    # Fix: repo paths are always POSIX-style, so join with
                    # posixpath (os.path.join would use backslashes on
                    # Windows), consistent with the URL helpers above.
                    full_path = posixpath.join(base_path, entry["path"])
                    if entry["type"] == "tree":
                        fetch_tree_tasks[full_path] = _fetch_tree_info(sha=entry["sha"])
                    else:
                        files.append(
                            File(
                                full_path,
                                entry["sha"],
                                _parse_file_mode(entry["mode"]),
                                None,  # mtime not provided by the trees API
                                None,  # contents filled in below
                            )
                        )
        # Fetch all file contents in parallel once the listing is complete.
        blob_results = await asyncio.gather(
            *(_fetch_blob_info(sha=f.sha) for f in files)
        )
        for i, (file, blob) in enumerate(zip(files, blob_results)):
            files[i] = File(*file[:-1], _decode_file_contents(blob["content"]))
    else:  # file
        logger.info("Fetching file %r", path)
        # Get the file's mode by fetching the tree info of the parent dir.
        parent_tree = (await _fetch_tree_info(posixpath.dirname(path), ref))["tree"]
        tree_file_info = {x["path"]: x for x in parent_tree}[posixpath.basename(path)]
        files = [
            File(
                path,
                info["sha"],
                _parse_file_mode(tree_file_info["mode"]),
                None,
                _decode_file_contents(info["content"]),
            )
        ]
    return files
def write_files(files: List[File], *, dest: str = "./", fmt: str = "tgz") -> None:
    """
    Write fetched files to disk, optionally packaged into an archive.

    :param files:
        The File tuples (path, sha, mode, mtime, contents) to write out.
    :param dest:
        Output path - a directory (archive named 'archive.<fmt>' inside it)
        or an archive file path. Must be a directory for 'plain' format.
    :param fmt:
        One of "plain", "tar", "tgz", "zip".
    :raise ValueError:
        If the format is not one of the supported values.
    """

    def create_files(base_path: str):
        # Materialise each file under base_path, creating parent dirs and
        # applying the stored permission bits.
        for file in files:
            logger.debug("Creating file: %s", file[:-1])
            fullpath = os.path.join(base_path, file.path)
            os.makedirs(os.path.dirname(fullpath), exist_ok=True)
            with open(fullpath, "w") as f:
                f.write(file.contents)
            os.chmod(fullpath, file.mode)

    if fmt == "plain":
        logger.info("Writing files under %s", dest)
        create_files(dest)
        return
    # Validate the format up front so we fail before doing any work.
    if fmt not in ("tar", "tgz", "zip"):
        raise ValueError(f"Unsupported archive format {fmt!r}")
    if os.path.isdir(dest):
        dest = os.path.join(dest, f"archive.{fmt}")
    logger.info("Writing files and creating archive at %s", dest)
    with tempfile.TemporaryDirectory() as tmpdir:
        create_files(tmpdir)
        if fmt in ("tar", "tgz"):
            fmt_code = "gz" if fmt == "tgz" else ""
            with tarfile.open(dest, f"w:{fmt_code}") as tf:
                tf.add(tmpdir, arcname=".")
        else:  # zip
            # BUG FIX: unlike tarfile's add(), ZipFile.write() does not
            # recurse into a directory - the original call added only a
            # single (empty) directory entry, producing an effectively empty
            # zip. Walk the tree and add each file with a relative arcname.
            with zipfile.ZipFile(dest, "w") as zf:
                for root, _dirs, names in os.walk(tmpdir):
                    for name in names:
                        abspath = os.path.join(root, name)
                        zf.write(abspath, arcname=os.path.relpath(abspath, tmpdir))
def parse_args(argv):
    """
    Parse command-line arguments.

    :param argv:
        The argument list, excluding the program name.
    :return:
        The parsed argparse namespace.
    """
    arg_parser = argparse.ArgumentParser()
    # Positional arguments.
    arg_parser.add_argument("repo_url", help="URL to the repo (http or https)")
    arg_parser.add_argument("path", help="Path in the repo to archive")
    # Optional arguments.
    arg_parser.add_argument(
        "--ref", help="Ref-point to archive from (e.g. branch/commit)"
    )
    arg_parser.add_argument(
        "--user",
        help=(
            "GitHub username if auth is required - can also use GH_USER env var, "
            "defaults to USER env var"
        ),
    )
    arg_parser.add_argument(
        "--token",
        help="GitHub API token if auth is required - can also use GH_TOKEN env var",
    )
    arg_parser.add_argument(
        "--output",
        "-o",
        default="./",
        help="Output path for the archive, defaults to cwd",
    )
    arg_parser.add_argument(
        "--format",
        "-f",
        choices=["tar", "tgz", "zip", "plain"],
        help=(
            "The format to save the archive in, inferred from the output filename "
            "if possible, otherwise defaults to tgz"
        ),
    )
    arg_parser.add_argument(
        "--verbose", "-v", action="store_true", help="Include debug logs"
    )
    arg_parser.add_argument(
        "--quiet", "-q", action="store_true", help="Hide info-level logs"
    )
    args = arg_parser.parse_args(argv)
    # 'plain' writes a file hierarchy directly, so the output must be a dir.
    if args.format == "plain" and not os.path.isdir(args.output):
        arg_parser.error("When using 'plain' format the output path must be a directory")
    return args
def main(argv):
    """
    Script entry point: parse args, configure logging/globals, fetch and
    write the requested files.

    :param argv:
        Command-line arguments, excluding the program name.
    """
    # These module-level globals are written once here and read by the API
    # helper functions.
    global BASE_URL, REPO_IDENT, USER, TOKEN
    loop = asyncio.get_event_loop()
    args = parse_args(argv)
    # Logging setup.
    if args.verbose:
        log_level = logging.DEBUG
    elif args.quiet:
        log_level = logging.WARNING
    else:
        log_level = logging.INFO
    logging.basicConfig(format="%(levelname)5s: %(message)s")
    logger.setLevel(log_level)
    # No explicit format - infer it from the output filename suffix when the
    # output is not an existing directory, otherwise default to tgz.
    if args.format is None:
        if not os.path.isdir(args.output) and args.output[-3:] in ["tar", "tgz", "zip"]:
            args.format = args.output[-3:]
            logger.debug("Determined desired format to be %r", args.format)
        else:
            args.format = "tgz"
    # Set global variables.
    BASE_URL, REPO_IDENT = convert_repo_web_url(args.repo_url)
    logger.debug("Base API URL: %s, repo segment: %s", BASE_URL, REPO_IDENT)
    # Credentials: CLI flags take priority, then GH_* env vars, then USER.
    if args.user:
        USER = args.user
    elif os.environ.get("GH_USER"):
        USER = os.environ.get("GH_USER")
    else:
        USER = os.environ.get("USER", "")
    if args.token:
        TOKEN = args.token
    else:
        TOKEN = os.environ.get("GH_TOKEN")
    # No ref given - fall back to the repo's default branch (one extra API
    # call).
    if args.ref is None:
        args.ref = loop.run_until_complete(_fetch_repo_info())["default_branch"]
        logger.debug("Default branch: %s", args.ref)
    # Do the work.
    files = loop.run_until_complete(fetch_path_contents(args.path, args.ref))
    write_files(files, dest=args.output, fmt=args.format)
    logger.info("Success!")


if __name__ == "__main__":
    main(sys.argv[1:])
@LewisGaul
Copy link
Author

Now sets file modes correctly (but not mtimes or dir modes). May also be doing a better job of maximising async operations being triggered in parallel, but also requires extra API calls in some cases so not necessarily faster.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment