Skip to content

Instantly share code, notes, and snippets.

@hzbd
Forked from ferstar/delete_action_runs.py
Created January 7, 2022 02:09
Show Gist options
  • Save hzbd/d88392e1bb3a959f0a1420cb20989323 to your computer and use it in GitHub Desktop.
Save hzbd/d88392e1bb3a959f0a1420cb20989323 to your computer and use it in GitHub Desktop.
Delete all workflow runs&artifacts of a github repo
import asyncio
import sys
from aiohttp import ClientSession, ClientTimeout
def seq_iter(seq, step=10):
if not seq:
return seq
from_idx, to_idx, seq_len = 0, step, len(seq)
while True:
yield seq[from_idx:to_idx]
from_idx, to_idx = to_idx, to_idx + step
if to_idx > seq_len:
yield seq[from_idx:]
break
async def delete(session, url):
async with session.delete(url) as del_rsp:
print(f'deleting {url} ... {"ok" if del_rsp.status == 204 else "error"}')
async def delete_artifacts(session, owner, repo, page=1):
path = f'/repos/{owner}/{repo}/actions/artifacts?per_page={per_page}&page={page}'
async with session.get(github_url + path) as rsp:
assert rsp.status == 200, await rsp.json()
ret_data = await rsp.json()
for items in seq_iter(ret_data['artifacts']):
await asyncio.gather(*(delete(session, item['url']) for item in items))
if per_page * page < ret_data['total_count']:
return await delete_artifacts(session, owner, repo, page + 1)
print('delete artifacts done')
async def delete_workflow_runs(session, owner, repo, page=1):
path = f'/repos/{owner}/{repo}/actions/runs?per_page={per_page}&page={page}'
async with session.get(github_url + path) as rsp:
assert rsp.status == 200, await rsp.json()
ret_data = await rsp.json()
for items in seq_iter(ret_data['workflow_runs']):
await asyncio.gather(*(delete(session, item['url']) for item in items))
if per_page * page < ret_data['total_count']:
return await delete_workflow_runs(session, owner, repo, page + 1)
print('delete workflow runs done')
async def find_repos(session, owner, repos=None, page=1):
repos = [] if repos is None else repos
if len(repos) == 1:
return repos
async with session.get(f'{github_url}/users/{owner}/repos?per_page={per_page}&page={page}') as rsp:
assert rsp.status == 200, await rsp.json()
ret = await rsp.json()
if not ret:
return repos
repos.extend([r['name'] for r in await rsp.json()])
return await find_repos(session, owner, repos, page + 1)
async def main(owner, repo=None):
async with ClientSession(headers=headers, timeout=ClientTimeout()) as session:
for repo in await find_repos(session, owner, [repo] if repo else None):
print(f'deleting {repo} for {owner}')
await delete_artifacts(session, owner, repo)
await delete_workflow_runs(session, owner, repo)
if __name__ == '__main__':
args_len = len(sys.argv)
if args_len not in (3, 4):
sys.exit(
f'Usage: python3 {sys.argv[0]} <github_token> <owner> [repo]\n'
'Note: need python3.6 or above and aiohttp\n'
'Generate personal github token: https://github.com/settings/tokens'
)
github_url = 'https://api.github.com'
headers = {'Authorization': f'token {sys.argv[1]}', 'Accept': 'application/vnd.github.v3+json'}
per_page = 30
your_repo = None if args_len == 3 else sys.argv[3]
asyncio.get_event_loop().run_until_complete(main(sys.argv[2], your_repo))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment