-
-
Save hzbd/d88392e1bb3a959f0a1420cb20989323 to your computer and use it in GitHub Desktop.
Delete all workflow runs & artifacts of a GitHub repo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import sys | |
from aiohttp import ClientSession, ClientTimeout | |
def seq_iter(seq, step=10):
    """Yield consecutive slices of ``seq`` holding at most ``step`` items each.

    Args:
        seq: A sliceable sequence (for example a list). A falsy value yields
            nothing.
        step: Maximum chunk length; must be a positive integer.

    Yields:
        Non-empty slices of ``seq`` in order; only the last one may be shorter
        than ``step``.

    Raises:
        ValueError: If ``step`` is smaller than 1 (raised when iteration
            starts, because this is a generator).
    """
    if not seq:
        return
    if step < 1:
        # A non-positive step can never advance through the sequence.
        raise ValueError(f'step must be a positive integer, got {step!r}')
    # range() ends exactly at the sequence length, so no empty trailing chunk
    # is produced when len(seq) is a multiple of (or smaller than) step.
    for start in range(0, len(seq), step):
        yield seq[start:start + step]
async def delete(session, url):
    """Issue a DELETE request for ``url`` and print the outcome.

    A response status of 204 is printed as "ok"; any other status is printed
    as "error".
    """
    async with session.delete(url) as response:
        outcome = 'ok' if response.status == 204 else 'error'
        print(f'deleting {url} ... {outcome}')
async def delete_artifacts(session, owner, repo, page=1):
    """Delete the artifacts of ``owner/repo``, listing from page ``page`` on.

    All artifact URLs are collected from the listing pages *before* anything is
    deleted. Deleting while paging forward would (assuming deletions show up in
    later listings) shift the remaining artifacts onto earlier pages, so the
    next page request would skip them.

    Args:
        session: Session object whose ``get``/``delete`` calls are used as
            async context managers (``main`` passes an aiohttp ClientSession).
        owner: Repository owner used in the API path.
        repo: Repository name used in the API path.
        page: Listing page number to start from.

    Raises:
        AssertionError: If a listing request does not answer with status 200;
            the decoded response body is the exception argument.
    """
    urls = []
    while True:
        path = f'/repos/{owner}/{repo}/actions/artifacts?per_page={per_page}&page={page}'
        async with session.get(github_url + path) as rsp:
            if rsp.status != 200:
                # Raised explicitly so the check is not stripped by ``python -O``.
                raise AssertionError(await rsp.json())
            ret_data = await rsp.json()
        listed = ret_data['artifacts']
        urls.extend(item['url'] for item in listed)
        # Stop after the last page; the empty-page test guards against a
        # total_count that never gets reached.
        if not listed or per_page * page >= ret_data['total_count']:
            break
        page += 1
    # Delete in batches so only a limited number of requests run concurrently.
    for batch in seq_iter(urls):
        await asyncio.gather(*(delete(session, url) for url in batch))
    print('delete artifacts done')
async def delete_workflow_runs(session, owner, repo, page=1):
    """Delete the workflow runs of ``owner/repo``, listing from page ``page`` on.

    All run URLs are collected from the listing pages *before* anything is
    deleted. Deleting while paging forward would (assuming deletions show up in
    later listings) shift the remaining runs onto earlier pages, so the next
    page request would skip them.

    Args:
        session: Session object whose ``get``/``delete`` calls are used as
            async context managers (``main`` passes an aiohttp ClientSession).
        owner: Repository owner used in the API path.
        repo: Repository name used in the API path.
        page: Listing page number to start from.

    Raises:
        AssertionError: If a listing request does not answer with status 200;
            the decoded response body is the exception argument.
    """
    urls = []
    while True:
        path = f'/repos/{owner}/{repo}/actions/runs?per_page={per_page}&page={page}'
        async with session.get(github_url + path) as rsp:
            if rsp.status != 200:
                # Raised explicitly so the check is not stripped by ``python -O``.
                raise AssertionError(await rsp.json())
            ret_data = await rsp.json()
        listed = ret_data['workflow_runs']
        urls.extend(item['url'] for item in listed)
        # Stop after the last page; the empty-page test guards against a
        # total_count that never gets reached.
        if not listed or per_page * page >= ret_data['total_count']:
            break
        page += 1
    # Delete in batches so only a limited number of requests run concurrently.
    for batch in seq_iter(urls):
        await asyncio.gather(*(delete(session, url) for url in batch))
    print('delete workflow runs done')
async def find_repos(session, owner, repos=None, page=1):
    """Return the repository names to process for ``owner``.

    When ``repos`` already holds exactly one name it is returned as is and no
    request is made (``main`` uses this to pass an explicitly chosen
    repository). Otherwise names from ``/users/{owner}/repos`` are appended
    page by page, starting at ``page``, until an empty page comes back.

    Args:
        session: Session object whose ``get`` call is used as an async context
            manager (``main`` passes an aiohttp ClientSession).
        owner: Account name used in the API path.
        repos: Optional list to append to; it is extended in place.
        page: Listing page number to start from.

    Returns:
        The list of repository names (the same object as ``repos`` when one
        was supplied).

    Raises:
        AssertionError: If a listing request does not answer with status 200;
            the decoded response body is the exception argument.
    """
    repos = [] if repos is None else repos
    # Re-checked before every request: a list holding exactly one name is
    # treated as final.
    while len(repos) != 1:
        url = f'{github_url}/users/{owner}/repos?per_page={per_page}&page={page}'
        async with session.get(url) as rsp:
            if rsp.status != 200:
                # Raised explicitly so the check is not stripped by ``python -O``.
                raise AssertionError(await rsp.json())
            # Decode the body once and reuse it for both the emptiness test
            # and the name extraction.
            listing = await rsp.json()
        if not listing:
            break
        repos.extend(entry['name'] for entry in listing)
        page += 1
    return repos
async def main(owner, repo=None):
    """Delete artifacts and workflow runs for ``owner``'s repositories.

    If ``repo`` is truthy only that repository is handed to ``find_repos``;
    otherwise ``find_repos`` is asked to look the repositories up. For each
    resulting name the artifacts are deleted first, then the workflow runs.
    """
    preselected = [repo] if repo else None
    async with ClientSession(headers=headers, timeout=ClientTimeout()) as session:
        repo_names = await find_repos(session, owner, preselected)
        for repo_name in repo_names:
            print(f'deleting {repo_name} for {owner}')
            await delete_artifacts(session, owner, repo_name)
            await delete_workflow_runs(session, owner, repo_name)
if __name__ == '__main__':
    # Command line: <github_token> <owner> [repo]
    args_len = len(sys.argv)
    if args_len not in (3, 4):
        sys.exit(
            f'Usage: python3 {sys.argv[0]} <github_token> <owner> [repo]\n'
            'Note: need python3.6 or above and aiohttp\n'
            'Generate personal github token: https://github.com/settings/tokens'
        )
    # Module-level settings read by the coroutines defined in this file.
    github_url = 'https://api.github.com'
    headers = {'Authorization': f'token {sys.argv[1]}', 'Accept': 'application/vnd.github.v3+json'}
    per_page = 30
    # Without a third argument every repository found for the owner is processed.
    your_repo = None if args_len == 3 else sys.argv[3]
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated in newer Python versions, while new_event_loop() also
    # works on the Python 3.6 mentioned in the usage text. Close it when done.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main(sys.argv[2], your_repo))
    finally:
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment