Skip to content

Instantly share code, notes, and snippets.

@thevickypedia
Last active April 9, 2023 15:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thevickypedia/586fd187c1f1ee82163766822eb5de1b to your computer and use it in GitHub Desktop.
Save thevickypedia/586fd187c1f1ee82163766822eb5de1b to your computer and use it in GitHub Desktop.
Clean up releases and tags in GitHub
import logging
import os
from threading import Thread
from typing import List, Tuple, Union
import requests
# Module-wide logger that writes every record (DEBUG and above) to a stream handler.
LOGGER = logging.getLogger(__name__)
_stream_handler = logging.StreamHandler()
LOGGER.addHandler(_stream_handler)
LOGGER.setLevel(logging.DEBUG)

# Account name and token are read from the environment; both must be non-empty.
GIT_USER = os.getenv("GIT_USER")
GIT_TOKEN = os.getenv("GIT_TOKEN")
if not (GIT_USER and GIT_TOKEN):
    raise UserWarning("'GIT_USER' and 'GIT_TOKEN' are mandatory")

# Headers sent with every request issued by the functions in this module.
headers = {
    "Accept": "application/vnd.github+json",
    "Authorization": f"Bearer {GIT_TOKEN}",
    "X-GitHub-Api-Version": "2022-11-28",
}
def get_workflows(repo: str, **kwargs):
    """Yield the workflow runs of a repository that match at least one filter.

    Args:
        repo: Name of the repository owned by ``GIT_USER``.
        **kwargs: Field/value pairs used as filters. A run is yielded when the
            value of any named field equals the given value.

    Yields:
        dict: Workflow run objects taken from the ``workflow_runs`` list of the
        API response. Each matching run is yielded exactly once, even when it
        satisfies several filters. Nothing is yielded when no filter is given.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        KeyError: If a filter names a field that a run does not contain.
    """
    url = f'https://api.github.com/repos/{GIT_USER}/{repo}/actions/runs'
    per_page = 100  # largest page size accepted by the GitHub REST API
    page = 1
    runs = []
    # Collect every page up front: callers delete runs while iterating, and
    # fetching pages lazily would shift page boundaries and skip entries.
    while True:
        response = requests.get(url=url, headers=headers,
                                params={'per_page': per_page, 'page': page})
        response.raise_for_status()
        res = response.json()
        if page == 1:
            LOGGER.info("Total workflow runs: %s", res['total_count'])
        batch = res['workflow_runs']
        runs.extend(batch)
        if len(batch) < per_page:
            break
        page += 1
    for run in runs:
        # any() keeps the "match at least one filter" semantics while making
        # sure a run is never yielded more than once.
        if any(run[key] == value for key, value in kwargs.items()):
            yield run
def _delete_action(repo: str, run_id: int):
    """Delete a single workflow run through the GitHub REST API.

    Args:
        repo: Name of the repository owned by ``GIT_USER``.
        run_id: Identifier of the workflow run to delete.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    endpoint = f'https://api.github.com/repos/{GIT_USER}/{repo}/actions/runs/{run_id}'
    result = requests.delete(url=endpoint, headers=headers)
    # Raises only for error responses; successful responses fall through.
    result.raise_for_status()
    LOGGER.info("Deleted run id: %s", run_id)
def delete_workflows(repo: str, titles: Union[List, Tuple] = None, dry_run: bool = True, **kwargs):
    """Delete workflow runs whose display title is listed in ``titles``.

    Args:
        repo: Name of the repository owned by ``GIT_USER``.
        titles: Display titles of the runs to delete. Ignored in dry-run mode.
            When omitted or empty outside dry-run mode nothing is deleted.
        dry_run: When true (the default) nothing is deleted; the display titles
            of the matching runs are logged and printed instead.
        **kwargs: Filters forwarded to ``get_workflows``.

    Note:
        Each deletion runs in its own thread. This function waits for all of
        them to finish, but an exception raised inside a worker thread is not
        re-raised here.
    """
    # A missing ``titles`` used to fail with ``TypeError`` on the first run
    # found; treat it as "nothing selected" instead.
    titles = titles or ()
    if not dry_run and not titles:
        LOGGER.warning("No titles provided, nothing to delete")
        return
    as_list = []
    workers = []
    for workflow in get_workflows(repo=repo, **kwargs):
        title = workflow['display_title']
        if dry_run:
            LOGGER.info(title)
            as_list.append(title)
        elif title in titles:
            LOGGER.info("Deleting workflow with title: %s", title)
            worker = Thread(target=_delete_action, kwargs={'repo': repo, 'run_id': workflow['id']})
            worker.start()
            workers.append(worker)
    # Wait for the deletions so that the call returns only once they are done.
    for worker in workers:
        worker.join()
    if as_list:
        print(as_list)
def get_releases_and_tags(repo: str):
    """Yield name, id and tag of every release of a repository.

    Args:
        repo: Name of the repository owned by ``GIT_USER``.

    Yields:
        dict: A mapping with the keys ``release_name``, ``release_id`` and
        ``tag``, taken from the ``name``, ``id`` and ``tag_name`` fields of
        each release. The release name is not checked against the tag name.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    url = f'https://api.github.com/repos/{GIT_USER}/{repo}/releases'
    per_page = 100  # largest page size accepted by the GitHub REST API
    page = 1
    releases = []
    # Collect every page up front: callers delete releases while iterating, and
    # fetching pages lazily would shift page boundaries and skip entries.
    while True:
        response = requests.get(url=url, headers=headers,
                                params={'per_page': per_page, 'page': page})
        response.raise_for_status()
        batch = response.json()
        releases.extend(batch)
        if len(batch) < per_page:
            break
        page += 1
    for release in releases:
        yield {
            'release_name': release['name'],
            'release_id': release['id'],
            'tag': release['tag_name']
        }
def _delete_release_and_tag(repo: str, release_id: str, tag: str):
    """Delete a release and its tag reference through the GitHub REST API.

    Both delete requests are sent before either response is inspected, so the
    tag deletion is attempted even when the release deletion fails.

    Args:
        repo: Name of the repository owned by ``GIT_USER``.
        release_id: Identifier of the release to delete.
        tag: Name of the tag to delete.

    Raises:
        requests.HTTPError: If either request is answered with an error status.
            The release response is checked first.
    """
    base_url = f'https://api.github.com/repos/{GIT_USER}/{repo}'
    release_result = requests.delete(url=f'{base_url}/releases/{release_id}', headers=headers)
    tag_result = requests.delete(url=f'{base_url}/git/refs/tags/{tag}', headers=headers)

    # Raises only for error responses; successful responses fall through.
    release_result.raise_for_status()
    LOGGER.info("Deleted release: %s", release_id)

    tag_result.raise_for_status()
    LOGGER.info("Deleted tag: %s", tag)
def delete_releases_and_tags(repo: str, names: Union[List, Tuple] = None, dry_run: bool = True):
    """Delete the releases listed in ``names`` together with their tags.

    Args:
        repo: Name of the repository owned by ``GIT_USER``.
        names: Release names to delete. Ignored in dry-run mode. When omitted
            or empty outside dry-run mode nothing is deleted.
        dry_run: When true (the default) nothing is deleted; every release name
            and tag is logged and the release names are printed instead.

    Note:
        Each deletion runs in its own thread. This function waits for all of
        them to finish, but an exception raised inside a worker thread is not
        re-raised here.
    """
    # A missing ``names`` used to fail with ``TypeError`` on the first release
    # found; treat it as "nothing selected" instead.
    names = names or ()
    if not dry_run and not names:
        LOGGER.warning("No release names provided, nothing to delete")
        return
    as_list = []
    workers = []
    for release_tag in get_releases_and_tags(repo=repo):
        release_name = release_tag['release_name']
        tag = release_tag['tag']
        if dry_run:
            LOGGER.info("Release: %s", release_name)
            LOGGER.info("Tag: %s", tag)
            as_list.append(release_name)
        elif release_name in names:
            LOGGER.info("Deleting release with name: %s and tag with name: %s",
                        release_name, tag)
            worker = Thread(target=_delete_release_and_tag,
                            kwargs={'repo': repo, 'release_id': release_tag['release_id'], 'tag': tag})
            worker.start()
            workers.append(worker)
    # Wait for the deletions so that the call returns only once they are done.
    for worker in workers:
        worker.join()
    if as_list:
        print(as_list)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment