|
import logging |
|
import os |
|
from threading import Thread |
|
from typing import List, Tuple, Union |
|
|
|
import requests |
|
|
|
# Module logger: passes everything from DEBUG upward to a default stream handler.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
LOGGER.addHandler(logging.StreamHandler())

# Account name and access token are read from the environment.
GIT_USER = os.getenv("GIT_USER")
GIT_TOKEN = os.getenv("GIT_TOKEN")

# Loading the module fails unless both values are present and non-empty.
if not (GIT_USER and GIT_TOKEN):
    raise UserWarning("'GIT_USER' and 'GIT_TOKEN' are mandatory")

# Headers passed with the GitHub API requests issued by the functions in this file.
headers = {
    'Accept': 'application/vnd.github+json',
    'Authorization': f'Bearer {GIT_TOKEN}',
    'X-GitHub-Api-Version': '2022-11-28',
}
|
|
|
|
|
def get_workflows(repo: str, **kwargs):
    """Yield the workflow runs of a repository that match any given filter.

    Only the single response returned by this one request is examined; no
    pagination parameters are sent.

    Args:
        repo: Repository name, placed after ``GIT_USER`` in the request URL.
        **kwargs: Field/value pairs compared against each run, for example
            ``status="completed"``. A run is yielded when at least one pair
            matches. With no filters, nothing is yielded.

    Yields:
        Entries of the response's ``workflow_runs`` list, each at most once.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        KeyError: If an evaluated filter names a field missing from a run.
    """
    response = requests.get(
        url=f'https://api.github.com/repos/{GIT_USER}/{repo}/actions/runs',
        headers=headers,
    )
    # Raises on an error status; does nothing on success.
    response.raise_for_status()
    payload = response.json()
    LOGGER.info("Total workflow runs: %s", payload['total_count'])
    for run in payload['workflow_runs']:
        # any() stops at the first hit, so a run that satisfies several
        # filters is yielded once rather than once per matching filter
        # (duplicates would otherwise trigger repeated delete requests).
        if any(run[key] == value for key, value in kwargs.items()):
            yield run
|
|
|
|
|
def _delete_action(repo: str, run_id: int):
    """Send a DELETE request for one workflow run and log the run id.

    Args:
        repo: Repository name, placed after ``GIT_USER`` in the request URL.
        run_id: Identifier of the workflow run to delete.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    outcome = requests.delete(
        url=f'https://api.github.com/repos/{GIT_USER}/{repo}/actions/runs/{run_id}',
        headers=headers,
    )
    # Guard clause: an error status raises here, so the log line below is
    # reached only after a successful response.
    outcome.raise_for_status()
    LOGGER.info("Deleted run id: %s", run_id)
|
|
|
|
|
def delete_workflows(repo: str, titles: Union[List, Tuple, None] = None, dry_run: bool = True, **kwargs):
    """List, or delete by title, the workflow runs selected by ``kwargs``.

    In dry-run mode every run produced by ``get_workflows`` has its display
    title logged, and the collected titles are printed at the end. Otherwise,
    each run whose display title is in ``titles`` is deleted on its own
    thread; the threads are started but not joined, so this function returns
    without waiting for the deletions to finish.

    Args:
        repo: Repository name forwarded to ``get_workflows``.
        titles: Display titles of the runs to delete. Only consulted when
            ``dry_run`` is false; ``None`` is treated as "no titles".
        dry_run: When true (the default), nothing is deleted.
        **kwargs: Filters forwarded to ``get_workflows``.
    """
    # ``titles`` defaults to None; substituting an empty tuple keeps the
    # membership test below from raising TypeError in delete mode.
    selected = titles if titles is not None else ()
    if not dry_run and not selected:
        LOGGER.warning("No titles were given, no workflow run will be deleted")

    listed = []
    for workflow in get_workflows(repo=repo, **kwargs):
        title = workflow['display_title']
        if dry_run:
            LOGGER.info(title)
            listed.append(title)
        elif title in selected:
            LOGGER.info("Deleting workflow with title: %s", title)
            Thread(target=_delete_action, kwargs={'repo': repo, 'run_id': workflow['id']}).start()
    if listed:
        print(listed)
|
|
|
|
|
def get_releases_and_tags(repo: str):
    """Yield a small summary dict for each release in the API response.

    Args:
        repo: Repository name, placed after ``GIT_USER`` in the request URL.

    Yields:
        Dicts with the keys ``release_name``, ``release_id`` and ``tag``,
        taken from each release's ``name``, ``id`` and ``tag_name`` fields.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    response = requests.get(
        url=f'https://api.github.com/repos/{GIT_USER}/{repo}/releases',
        headers=headers,
    )
    # An error status raises here; on success this is a no-op.
    response.raise_for_status()
    for release in response.json():
        # Name and tag are reported separately; no check is made that they match.
        summary = dict(
            release_name=release['name'],
            release_id=release['id'],
            tag=release['tag_name'],
        )
        yield summary
|
|
|
|
|
def _delete_release_and_tag(repo: str, release_id: str, tag: str):
    """Send DELETE requests for a release and for its tag reference.

    Both requests are issued before either response is inspected. The release
    response is checked first, so an error there raises before the tag
    response is looked at.

    Args:
        repo: Repository name, placed after ``GIT_USER`` in the request URLs.
        release_id: Identifier of the release to delete.
        tag: Tag name, appended to the ``git/refs/tags/`` path.

    Raises:
        requests.HTTPError: If either request was answered with an error status.
    """
    rel_response = requests.delete(
        url=f'https://api.github.com/repos/{GIT_USER}/{repo}/releases/{release_id}',
        headers=headers,
    )
    tag_response = requests.delete(
        url=f'https://api.github.com/repos/{GIT_USER}/{repo}/git/refs/tags/{tag}',
        headers=headers,
    )
    # Check the responses in request order; each success is logged right
    # after its own status check passes.
    outcomes = (
        (rel_response, "Deleted release: %s", release_id),
        (tag_response, "Deleted tag: %s", tag),
    )
    for response, message, subject in outcomes:
        response.raise_for_status()
        LOGGER.info(message, subject)
|
|
|
|
|
def delete_releases_and_tags(repo: str, names: Union[List, Tuple, None] = None, dry_run: bool = True):
    """List, or delete by release name, the releases and tags of a repository.

    In dry-run mode every release produced by ``get_releases_and_tags`` has
    its name and tag logged, and the collected release names are printed at
    the end. Otherwise, each release whose name is in ``names`` is deleted
    together with its tag on its own thread; the threads are started but not
    joined, so this function returns without waiting for the deletions.

    Args:
        repo: Repository name forwarded to ``get_releases_and_tags``.
        names: Release names to delete. Only consulted when ``dry_run`` is
            false; ``None`` is treated as "no names".
        dry_run: When true (the default), nothing is deleted.
    """
    # ``names`` defaults to None; substituting an empty tuple keeps the
    # membership test below from raising TypeError in delete mode.
    selected = names if names is not None else ()
    if not dry_run and not selected:
        LOGGER.warning("No release names were given, no release or tag will be deleted")

    listed = []
    for release_tag in get_releases_and_tags(repo=repo):
        release_name = release_tag['release_name']
        if dry_run:
            LOGGER.info("Release: %s", release_name)
            LOGGER.info("Tag: %s", release_tag['tag'])
            listed.append(release_name)
        elif release_name in selected:
            LOGGER.info("Deleting release with name: %s and tag with name: %s",
                        release_name, release_tag['tag'])
            Thread(
                target=_delete_release_and_tag,
                kwargs={'repo': repo, 'release_id': release_tag['release_id'], 'tag': release_tag['tag']},
            ).start()
    if listed:
        print(listed)