Skip to content

Instantly share code, notes, and snippets.

@Ray-Eldath
Created December 29, 2023 07:15
Show Gist options
  • Save Ray-Eldath/b2979a62c6d612a85d22c1da11865dd6 to your computer and use it in GitHub Desktop.
Save Ray-Eldath/b2979a62c6d612a85d22c1da11865dd6 to your computer and use it in GitHub Desktop.
Filter out GitHub runs that contain a certain log line
import asyncio
import json
import os
import aiohttp
from gidgethub import aiohttp as gh_aiohttp
# GitHub API token handed to the client as its OAuth token; None when the
# environment variable is not set.
Token = os.environ.get("PINPOINTER_GITHUB_TOKEN")

# Repository, in "owner/name" form, whose workflow runs are screened.
Repo = "cloudberrydb/cloudberrydb"

# Cap on how many runs get scheduled; chosen large enough to act as "no limit".
MaxRuns = 100_000  # unlimited

# Output path; one JSON object is appended per screened job.
ResultFile = "failures.ndjson"
async def process_log(gh, run_id, j, f):
    """Screen one job's log for the upgrade-failure marker and record the result.

    Looks up the first line of the job's commit message, downloads the job
    log, and appends exactly one JSON record for the job to ``f``. The
    record's ``found`` flag tells whether the marker line was present; when
    it is, the matching log line is stored under ``line``.

    Args:
        gh: API client exposing an awaitable ``getitem(url)``.
        run_id: ID of the workflow run the job belongs to (only recorded).
        j: Job mapping; the keys ``head_sha``, ``id``, ``completed_at`` and
            ``name`` are read.
        f: Writable text file object that receives the record.

    Raises:
        Exception: If downloading the job log fails; the original error is
            chained as the cause.
    """
    head_sha = j['head_sha']
    job_id = j['id']

    commit = await gh.getitem(f"/repos/{Repo}/commits/{head_sha}")
    # Only the subject (first line) of the commit message is kept.
    subject = commit['commit']['message'].splitlines()[0]

    record = {
        'found': False,
        'completed_at': j['completed_at'],
        'workflow_job_name': j['name'],
        'desc': subject,
        'head_sha': head_sha,
        'run_id': run_id,
        'job_id': job_id,
    }

    try:
        log = await gh.getitem(f"/repos/{Repo}/actions/jobs/{job_id}/logs")
    except Exception as e:
        raise Exception(f"screening job {job_id} failed for run {run_id}") from e

    marker = "ERROR: Failure encountered in upgrading"
    # First log line containing the marker, or None when there is none.
    hit = next((text for text in log.splitlines() if marker in text), None)
    if hit is not None:
        print("FOUND", end=' ')
        record['found'] = True
        record['line'] = hit

    # A record is written whether or not the marker was found.
    ndjson_append(record, f)
async def process_run(gh, run_id, f):
    """Screen every failed, non-``build`` job of one workflow run.

    The job listing is paginated by the GitHub API, so it is read with
    ``gh.getiter`` (which follows pagination) rather than a single
    ``getitem`` call that would only see the first page and silently skip
    failed jobs on later pages.

    Args:
        gh: API client exposing ``getiter`` and ``getitem``.
        run_id: ID of the workflow run to inspect.
        f: Writable text file object passed through to ``process_log``.

    Returns:
        The ``run_id`` that was processed.

    Raises:
        Exception: If the job listing cannot be retrieved; the original
            error is chained as the cause. Errors from ``process_log``
            propagate unchanged.
    """
    url = f"/repos/{Repo}/actions/runs/{run_id}/jobs?per_page=100"
    try:
        # Collect the full listing first so that only listing errors are
        # reported as "retrieving jobs ... failed".
        jobs = [job async for job in gh.getiter(url, iterable_key="jobs")]
    except Exception as e:
        msg = f"retrieving jobs for run {run_id} failed"
        raise Exception(msg) from e

    for j in jobs:
        if j['name'] != 'build' and j['conclusion'] == 'failure':
            await process_log(gh, run_id, j, f)

    # Progress indicator: one run ID per completed run.
    print(run_id, end=" ")
    return run_id
async def main():
    """List failed workflow runs and screen them concurrently.

    Opens ``ResultFile`` for writing, schedules one ``process_run`` task per
    failed run (at most ``MaxRuns``), and waits for all of them. On the
    first error, whether from listing the runs or from a task, every
    scheduled task is cancelled and awaited before the error is re-raised,
    so no task is left running against a closed HTTP session or a closed
    output file.

    Raises:
        Exception: Whatever error interrupted the listing or one of the
            tasks, re-raised unchanged after cleanup.
    """
    with open(ResultFile, "w") as f:
        async with aiohttp.ClientSession() as client:
            gh = gh_aiohttp.GitHubAPI(client, requester="Ray-Eldath", oauth_token=Token)
            print("Processing failed run... ", end="")
            runs = gh.getiter(
                f'/repos/{Repo}/actions/runs?status=failure&per_page=100',
                iterable_key='workflow_runs',
            )
            tasks = []
            try:
                # Tasks start running as soon as they are scheduled, so the
                # listing loop is guarded by the same cleanup as the gather.
                async for r in runs:
                    if len(tasks) >= MaxRuns:
                        break
                    tasks.append(asyncio.ensure_future(process_run(gh, r['id'], f)))
                print(f"len(tasks) = {len(tasks)}")
                await asyncio.gather(*tasks)
            except Exception:
                for t in tasks:
                    t.cancel()
                print("Exception thrown. Cancelling remaining tasks to preserve API rate limit quota")
                # Wait for the cancellations to take effect while the session
                # and the output file are still open.
                await asyncio.gather(*tasks, return_exceptions=True)
                raise
def ndjson_append(obj, f):
    """Append ``obj`` to ``f`` as one line of newline-delimited JSON.

    Args:
        obj: JSON-serializable value to write.
        f: Writable text file object.

    The file is flushed after every record so that results already written
    reach the file even if the program stops later.
    """
    record = json.dumps(obj)
    f.write(record + "\n")
    f.flush()
if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. Fetching a loop with get_event_loop() when
    # none is running is deprecated, and the loop was never closed.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment