Created
December 13, 2023 22:43
-
-
Save frederikstroem/3cafaa3e3daef6826682245b2354f50d to your computer and use it in GitHub Desktop.
2023-12-13 snapshot of automated GitHub cloning tool. Clone or pull all GitHub repositories (submodules included) and Gists across all GitHub organizations and personal account every x hours. Discord webhook on errors. Script is hacked together and unstable!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[Unit]
Description=Automated GitHub cloning tool
[Service]
Type=simple
Environment=DATA_DIR=/tank/automated_github_cloner
EnvironmentFile=%h/automated_github_cloner/.env
ExecStart=/usr/bin/python3 %h/automated_github_cloner/main.py
Restart=always
RestartSec=5
[Install]
WantedBy=default.target
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import os
import subprocess
import sys
import time
from pathlib import Path

import schedule
from discord_webhook import DiscordWebhook
from github import Github, Auth
RUN_INTERVAL = 8  # hours; interval handed to the scheduler and shown in the "next run" log line
def check_gh_auth_status():
    """Verify that the GitHub CLI is installed and logged in; exit otherwise.

    Runs ``gh auth status`` through ``run_command`` and looks for the
    "Logged in to github.com" marker in its output. When the command fails
    or the marker is missing, the captured output is printed and the process
    exits with status 1.

    Raises:
        SystemExit: with code 1 when authentication cannot be confirmed.
    """
    returncode, stdout, stderr = run_command(['gh', 'auth', 'status'])

    # Depending on the gh release, the status report is written to stdout or
    # to stderr, so the marker is searched for in both streams.
    combined_output = f"{stdout}\n{stderr}"
    if returncode == 0 and 'Logged in to github.com' in combined_output:
        print("gh cli is authenticated.", flush=True)
        return

    print("Unable to check gh auth status. Please ensure the gh cli is installed and authenticated.", flush=True)
    print(f"stdout: {stdout}", flush=True)
    print(f"stderr: {stderr}", flush=True)
    # sys.exit() is used instead of the exit() helper, which is only
    # installed by the `site` module for interactive use.
    sys.exit(1)
def report_error_to_discord(error_message, context=""):
    """Post an error report to the configured Discord webhook.

    Args:
        error_message: Description of what went wrong.
        context: Optional text placed on the line(s) before the error
            message; omitted from the report when empty.

    The webhook URL is read from the ``DISCORD_WEBHOOK_URL`` environment
    variable. When it is unset a notice is printed and nothing is sent.
    Exceptions raised while building or sending the webhook are printed,
    never propagated, so reporting problems cannot break the caller.
    """
    full_error_message = f"{context}\n{error_message}" if context else error_message

    webhook_url = os.getenv('DISCORD_WEBHOOK_URL')
    if not webhook_url:
        print("Error: DISCORD_WEBHOOK_URL environment variable is not set.", flush=True)
        return

    # Discord limits message content to 2000 characters; long command output
    # would make the whole report be rejected. Keep the head of the message
    # (it names the failing command) and stay comfortably below the limit.
    max_content_length = 1900
    if len(full_error_message) > max_content_length:
        truncation_marker = "\n... (truncated)"
        keep = max_content_length - len(truncation_marker)
        full_error_message = full_error_message[:keep] + truncation_marker

    try:
        webhook = DiscordWebhook(url=webhook_url, content=full_error_message)
        webhook.execute()
    except Exception as e:
        print(f"Failed to send error report to Discord: {e}", flush=True)
def run_command(command, cwd=None, context=""):
    """Run an external command and hand back its exit status and output.

    Args:
        command: Argument list passed to ``subprocess.run``.
        cwd: Working directory for the command, or None for the current one.
        context: Extra text forwarded to ``report_error_to_discord`` when
            the command fails.

    Returns:
        A ``(returncode, stdout, stderr)`` tuple. If running the command
        raises, the tuple is ``(1, "", str(exception))``.

    A non-zero exit status or an exception is printed and reported to
    Discord; neither is raised to the caller.
    """
    try:
        proc = subprocess.run(command, capture_output=True, text=True, cwd=cwd)
        status = proc.returncode
        if status != 0:
            failure = f"Command '{' '.join(command)}' failed with error:\n{proc.stderr}"
            print(failure, flush=True)
            report_error_to_discord(failure, context)
        return status, proc.stdout, proc.stderr
    except Exception as exc:
        failure = f"Exception occurred while running command '{' '.join(command)}':\n{exc}"
        print(failure, flush=True)
        report_error_to_discord(failure, context)
        return 1, "", str(exc)
def get_main_branch_name(git_dir):
    """Return the default branch of ``origin`` for the repository in *git_dir*.

    The name is taken from the "HEAD branch" line printed by
    ``git remote show origin``.

    Args:
        git_dir: Path of the local repository to query.

    Returns:
        The branch name, or None when the command fails or its output does
        not name exactly one branch.
    """
    returncode, stdout, _ = run_command(['git', 'remote', 'show', 'origin'], cwd=git_dir)
    if returncode != 0:
        return None

    for line in stdout.splitlines():
        label, separator, value = line.partition(':')
        if not separator or 'HEAD branch' not in label:
            continue
        branch = value.strip()
        # git leaves the value empty when the remote HEAD is ambiguous and
        # prints a parenthesised placeholder when it has no branch to report;
        # neither is usable as a ref name.
        if not branch or branch in ('(unknown)', '(not queried)'):
            return None
        return branch
    return None
def create_backup_and_hard_reset(git_dir, git_url):
    """Keep the current local commit on a backup branch, then reset to origin.

    Args:
        git_dir: Path of the local repository.
        git_url: Remote URL, used only in log and error messages.

    A branch named ``backup-<timestamp>`` is created at the current commit
    and the checked-out branch is then hard-reset to ``origin/<main branch>``.
    Only committed state is preserved; uncommitted working-tree changes are
    discarded by the reset. If the main branch cannot be determined, or the
    backup branch cannot be created, no reset is performed.
    """
    context = f"Repository URL: {git_url}\nDirectory: {git_dir}"

    main_branch = get_main_branch_name(git_dir)
    if main_branch is None:
        print(f"Unable to determine the main branch for {git_url}.", flush=True)
        return

    timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    backup_branch = f"backup-{timestamp}"

    # `git branch` records the current commit without switching to it.
    # `git checkout -b` would move HEAD onto the backup branch, so the reset
    # below would overwrite the backup and leave the clone on a branch that
    # has no upstream to pull from.
    returncode, _, _ = run_command(['git', 'branch', backup_branch], cwd=git_dir, context=context)
    if returncode != 0:
        # Without a backup the reset could destroy the only reference to
        # the local commits, so stop here.
        print(f"Unable to create backup branch {backup_branch} for {git_url}; skipping hard reset.", flush=True)
        return

    returncode, _, _ = run_command(['git', 'reset', '--hard', f'origin/{main_branch}'], cwd=git_dir, context=context)
    if returncode != 0:
        print(f"Hard reset to origin/{main_branch} failed for {git_url}; backup branch: {backup_branch}", flush=True)
        return

    print(f"Backup branch created and hard reset to origin/{main_branch}: {backup_branch}", flush=True)
def clone_or_update_git_entity(git_url, git_dir, is_repo=True):
    """Clone *git_url* into *git_dir*, or update the clone already there.

    Args:
        git_url: Remote URL of the repository or gist.
        git_dir: Local directory that holds (or will hold) the clone.
        is_repo: True for a repository (all remotes are pulled and
            submodules are updated), False for a gist (plain pull/clone).

    Failures are printed (and reported by ``run_command``); nothing is
    raised for a failing git command. When a pull fails with a conflict,
    ``create_backup_and_hard_reset`` is invoked.
    """
    context = f"Repository URL: {git_url}\nDirectory: {git_dir}"
    kind = 'repository' if is_repo else 'gist'

    # The directory alone does not prove a clone exists: it is created below
    # before cloning and stays behind if that clone fails. Checking for the
    # .git entry lets the next run retry the clone instead of pulling inside
    # a directory that is not a repository.
    if os.path.exists(os.path.join(git_dir, '.git')):
        print(f"Updating {kind}: {git_url}", flush=True)
        pull_cmd = ['git', 'pull', '--all'] if is_repo else ['git', 'pull']
        returncode, stdout, stderr = run_command(pull_cmd, cwd=git_dir, context=context)
        if returncode != 0:
            # Conflict details may be written to stdout rather than stderr,
            # so both streams are inspected.
            if 'conflict' in f"{stdout}\n{stderr}".lower():
                print("Merge conflict detected.", flush=True)
                create_backup_and_hard_reset(git_dir, git_url)
            else:
                print(f"Error encountered during pull: {stderr}", flush=True)
        if is_repo:
            # Update submodules for repositories
            print(f"Updating submodules for: {git_url}", flush=True)
            run_command(['git', 'submodule', 'update', '--init', '--recursive', '--remote'], cwd=git_dir, context=context)
        return

    print(f"Cloning {kind}: {git_url}", flush=True)
    os.makedirs(git_dir, exist_ok=True)
    if is_repo:
        clone_cmd = ['git', 'clone', '--recurse-submodules', '-j8', git_url, '.']
    else:
        clone_cmd = ['git', 'clone', git_url, '.']
    returncode, _, stderr = run_command(clone_cmd, cwd=git_dir, context=context)
    if returncode != 0:
        print(f"Error cloning {kind}: {stderr}", flush=True)
def main():
    """Run one full pass: clone or update every repository and gist.

    Reads ``AUTH_TOKEN`` and ``DATA_DIR`` from the environment, confirms the
    gh CLI is authenticated, then walks the repositories and gists returned
    for the authenticated user. Repositories go under ``<DATA_DIR>/repos``
    and gists under ``<DATA_DIR>/gists``.

    Raises:
        SystemExit: with code 1 when a required environment variable is
            missing (or when the gh authentication check fails).
    """
    print("-----------------------------", flush=True)
    print(f"Starting GitHub cloner at {datetime.datetime.now()}\n", flush=True)

    # Check required environment variables
    auth_token = os.getenv('AUTH_TOKEN')
    data_dir = os.getenv('DATA_DIR')
    if not auth_token or not data_dir:
        print("Error: Required environment variables AUTH_TOKEN or DATA_DIR are not set.", flush=True)
        # sys.exit() rather than the site-provided exit() helper, which is
        # meant for interactive sessions.
        sys.exit(1)

    # Check `gh auth status` to ensure user can clone private repositories etc.
    check_gh_auth_status()

    # Create a GitHub instance; the user handle is shared by both listings.
    g = Github(auth=Auth.Token(auth_token))
    user = g.get_user()
    print(f"Cloning or updating repositories and gists to: {data_dir}", flush=True)

    # Repositories
    repos_path = os.path.join(data_dir, "repos")
    os.makedirs(repos_path, exist_ok=True)
    print("Cloning or updating repositories...", flush=True)
    for repo in user.get_repos():
        # NOTE(review): directories are keyed by the bare repository name, so
        # two repositories with the same name under different owners would
        # share one directory. Confirm whether that can happen for this account.
        repo_dir = os.path.join(repos_path, repo.name)
        clone_or_update_git_entity(repo.clone_url, repo_dir, is_repo=True)

    # Gists
    gists_path = os.path.join(data_dir, "gists")
    os.makedirs(gists_path, exist_ok=True)
    print("Cloning or updating gists...", flush=True)
    for gist in user.get_gists():
        gist_dir = os.path.join(gists_path, gist.id)
        clone_or_update_git_entity(gist.git_pull_url, gist_dir, is_repo=False)

    # One timestamp for both values keeps the two times in the message consistent.
    finished_at = datetime.datetime.now()
    next_run_at = finished_at + datetime.timedelta(hours=RUN_INTERVAL)
    print(f"\nGitHub cloner finished at {finished_at}, next run in {RUN_INTERVAL} hours at {next_run_at}", flush=True)
    print("-----------------------------", flush=True)
if __name__ == "__main__":
    # First pass happens right away, so a fresh start does not wait a full
    # interval before doing any work.
    main()

    # Later passes are registered with the `schedule` library only after the
    # first pass has returned.
    schedule.every(RUN_INTERVAL).hours.do(main)

    # Poll for due jobs once a minute; the loop has no exit condition.
    while True:
        schedule.run_pending()
        time.sleep(60)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
PyGithub
schedule
discord-webhook
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment