Skip to content

Instantly share code, notes, and snippets.

@st1vms
Last active February 4, 2024 18:27
Show Gist options
  • Save st1vms/1e7c0fc573807402926e7f5b77a5d034 to your computer and use it in GitHub Desktop.
Save st1vms/1e7c0fc573807402926e7f5b77a5d034 to your computer and use it in GitHub Desktop.
Git API utility wrappers
"""Look for the latest commit containing a file with a specific string in it"""
from base64 import b64decode
from typing import List, Dict, Optional

from requests import get as http_get
def get_commit_history(
    owner: str, repo: str, access_token: str, quiet: bool = False
) -> List[Dict]:
    """Fetch the full commit list of a GitHub repository, page by page.

    Pages of 100 commits are requested from the repository's ``commits``
    endpoint until an empty page comes back.

    Args:
        owner: Account that owns the repository.
        repo: Repository name.
        access_token: Token sent as a ``Bearer`` authorization header.
        quiet: When true, no progress line is printed.

    Returns:
        All commit objects, in the order the API returned them.

    Raises:
        RuntimeError: If any page request answers with a status other than 200.
    """
    endpoint = f"https://api.github.com/repos/{owner}/{repo}/commits"
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    page_size = 100  # Adjust page size as needed
    collected = []
    page_number = 1
    while True:
        reply = http_get(
            endpoint,
            headers=auth_headers,
            params={"per_page": page_size, "page": page_number},
            timeout=10,
        )
        if reply.status_code != 200:
            raise RuntimeError(
                f"Failed to retrieve commit history. Status code: {reply.status_code}"
            )
        batch = reply.json()
        if not batch:
            # An empty page means every commit has been collected.
            return collected
        collected.extend(batch)
        if not quiet:
            # Carriage returns keep the progress counter on a single line.
            print(f"Retrieved {len(collected)} commits...", end="\r" * 30)
        page_number += 1
def find_file_in_commit(
    commit_sha: str, owner: str, repo: str, file_path: str, access_token: str
) -> Optional[str]:
    """Look up a file among the files changed by a commit.

    Args:
        commit_sha: SHA of the commit to inspect.
        owner: Account that owns the repository.
        repo: Repository name.
        file_path: Path to match against each changed file's ``filename``.
        access_token: Token sent as a ``Bearer`` authorization header.

    Returns:
        The ``contents_url`` of the matching changed file, or ``None`` when
        the commit details list no file with that path.

    Raises:
        RuntimeError: If the commit request answers with a status other
            than 200.
    """
    commit_url = f"https://api.github.com/repos/{owner}/{repo}/commits/{commit_sha}"
    headers = {"Authorization": f"Bearer {access_token}"}
    response = http_get(commit_url, headers=headers, timeout=10)
    # Fail loudly like the sibling helpers: an error reply (bad token, rate
    # limit, unknown commit) has no "files" key and would otherwise be
    # indistinguishable from "file not changed in this commit".
    if response.status_code != 200:
        raise RuntimeError(
            f"Failed to retrieve commit details. Status code: {response.status_code}"
        )
    # The 'files' key may be absent from the commit details.
    for file_changed in response.json().get("files") or []:
        if file_changed["filename"] == file_path:
            return file_changed["contents_url"]
    return None
def get_file_content(file_content_url: str, access_token: str) -> str:
    """Download a file through a contents URL and return its text.

    The JSON reply's ``content`` field is base64-decoded and then decoded
    as UTF-8.

    Args:
        file_content_url: URL to request the file from.
        access_token: Token sent as a ``Bearer`` authorization header.

    Returns:
        The decoded file text.

    Raises:
        RuntimeError: If the request answers with a status other than 200.
    """
    reply = http_get(
        file_content_url,
        headers={
            "Accept": "application/vnd.github+json",
            "Authorization": f"Bearer {access_token}",
        },
        timeout=10,
    )
    # Anything but success is reported to the caller.
    if reply.status_code != 200:
        raise RuntimeError(
            f"Failed to retrieve file content. Status code: {reply.status_code}"
        )
    encoded = reply.json()["content"]
    return b64decode(encoded).decode("utf-8")
def git_find_commit(
    repo_owner: str,
    repo_name: str,
    rel_file_path: str,
    match_str: str,
    access_token: str,
    n_results: int = 1,
    quiet: bool = False,
) -> List[str]:
    """Find commits whose version of a file contains a given string.

    The repository's commit history is walked in the order returned by
    `get_commit_history`. Only commits that list `rel_file_path` among their
    changed files are inspected; for those, the file content is downloaded
    and searched for `match_str`.

    Args:
        repo_owner: Account that owns the repository.
        repo_name: Repository name.
        rel_file_path: File path relative to the repository root.
        match_str: String that must occur in the file content.
        access_token: Token passed to every API helper.
        n_results: Stop once this many matching commits have been found.
        quiet: When true, no progress output is printed.

    Returns:
        The SHAs of the matching commits, in the order they were found.
        The list is shorter than `n_results` when the history runs out.
    """

    def _print(*args, **kwargs) -> None:
        # Progress output is suppressed entirely in quiet mode.
        if not quiet:
            print(*args, **kwargs)

    _print(
        f"\nSearching for the latest commit in repository '{repo_owner}/{repo_name}'"
        f"\nLooking for latest commits with a file named '{rel_file_path}' "
        f"having the string '{match_str}' in it...\n"
    )
    commits = get_commit_history(repo_owner, repo_name, access_token, quiet=quiet)
    results = []
    for commit in commits:
        commit_sha = commit["sha"]
        # Carriage returns keep the progress output on a single line.
        _print(f"Checking commit: {commit_sha}", end="\r" * 50)
        blob_url = find_file_in_commit(
            commit_sha, repo_owner, repo_name, rel_file_path, access_token
        )
        if blob_url is None:
            # This commit did not touch the file.
            continue
        if match_str not in get_file_content(blob_url, access_token):
            continue
        results.append(commit_sha)
        _print(f"\nField '{match_str}' found in commit {commit_sha}")
        # Compare against the collected count instead of decrementing the
        # caller's argument; the loop stops at the same point.
        if len(results) >= n_results:
            break
    return results
if __name__ == "__main__":
    # --- Configuration: fill these in before running the script ---
    # Git authentication token
    API_TOKEN = ""
    # Repository coordinates
    REPO_OWNER = ""
    REPO_NAME = ""
    # File path, relative to the repository tree
    REL_FILE_PATH = ""
    # Text that must appear inside the file content
    MATCH_STR = ""
    # How many of the latest matching commits to collect
    N_RESULTS = 2

    matching_commits = git_find_commit(
        repo_owner=REPO_OWNER,
        repo_name=REPO_NAME,
        rel_file_path=REL_FILE_PATH,
        match_str=MATCH_STR,
        access_token=API_TOKEN,
        n_results=N_RESULTS,
    )

    print(f"\nFound {len(matching_commits)} latest commits...")
    # Print a browsable link to the file as of each matching commit.
    for sha in matching_commits:
        print(
            f"\nhttps://github.com/{REPO_OWNER}/{REPO_NAME}/blob/{sha}/{REL_FILE_PATH}"
        )
"""Github API utility module"""
from requests import get as http_get
def latest_git_repo_tag(owner: str, repo_name: str, timeout: float = 3) -> str | None:
    """Retrieve the tag name of the latest release of a GitHub repository.

    Args:
        owner: Account that owns the repository.
        repo_name: Repository name.
        timeout: Seconds to wait for the HTTP request.

    Returns:
        The ``tag_name`` of the release returned by the ``releases/latest``
        endpoint.

    Raises:
        RuntimeError: If the request answers with a status other than 200.
    """
    response = http_get(
        f"https://api.github.com/repos/{owner}/{repo_name}/releases/latest",
        timeout=timeout,
    )
    if response.status_code != 200:
        raise RuntimeError(
            f"Got HTTP error when retrieving release tag: {response.status_code}"
        )
    # "tag_name" is the Git tag of the release. "name" is only its display
    # title, which can differ from the tag or be null.
    return str(response.json()["tag_name"])
A collection of Git API utility wrappers written in Python
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment