@therealmitchconnors
Last active June 18, 2019 01:17
Parse Prow data for a given PR to detect likely flakes.
import json
import multiprocessing
import re
import sys

from google.cloud import storage  # installed with `pip3 install google-cloud-storage`

# Before running this file, be sure to have your gcloud credentials
# configured, i.e. `gcloud auth application-default login`.
# See https://cloud.google.com/compute/docs/tutorials/python-guide

prefix = 'pr-logs/pull/istio_istio/'
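# Object layout this script assumes (see get_test_results and isEnvFail below):
#   pr-logs/pull/istio_istio/<pr>/<job>/<run>/finished.json
#   pr-logs/pull/istio_istio/<pr>/<job>/<run>/clone-records.json
#   pr-logs/pull/istio_istio/<pr>/<job>/<run>/build-log.txt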
def ls_d(bucket, prefix):
    # List the immediate "subdirectories" under prefix, using '/' as the
    # delimiter so GCS returns common prefixes rather than every object.
    result = []
    pages = bucket.list_blobs(prefix=prefix, delimiter='/').pages
    for p in pages:
        result.extend(p.prefixes)
    return result
def get_blob_string(bucket, key):
    # Fetch a blob's contents, or '' if the object does not exist.
    blob = bucket.get_blob(key)
    if blob is None:
        return ''
    return blob.download_as_string()
def leaf_name(fullname, delimiter='/'):
    if fullname.endswith(delimiter):
        fullname = fullname[:-1 * len(delimiter)]
    return fullname.split(delimiter)[-1]
def get_bucket():
    client = storage.Client()
    return client.get_bucket('istio-prow')
def get_test_results(pr_path, b):
    prnum = leaf_name(pr_path)
    jobs = ls_d(b, pr_path)
    jobmap = {}
    for job in jobs:
        jobname = leaf_name(job)
        # if jobname != 'istio_auth_sds_e2e-master':
        #     continue
        runmap = {}
        runs = ls_d(b, job)
        for run in runs:
            runname = leaf_name(run)
            print("Checking test " + jobname + ", run " + runname)
            finishedstring = get_blob_string(b, run + 'finished.json')
            if len(finishedstring) == 0:
                runmap[runname] = {'finished': False}
            else:
                finished = json.loads(finishedstring)
                clonestring = get_blob_string(b, run + 'clone-records.json')
                clone = json.loads(clonestring)
                runmap[runname] = {'finished': True,
                                   'passed': finished['passed'],
                                   'sha': clone[0]['refs']['pulls'][0]['sha'],
                                   'base': clone[0]['refs']['base_sha'],
                                   'clone-failure': clone[0]['failed']}
        jobmap[jobname] = runmap
    return prnum, jobmap
def eval_results(jobmap, prnum, b):
    probable_flakes = []
    for job, runmap in jobmap.items():
        shamap = {}
        for runname, run in runmap.items():
            if run['finished']:
                if run['clone-failure']:
                    continue
                sha = run['sha']
                run['run'] = runname
                if sha in shamap:
                    prev = shamap[sha][0]
                    if prev['passed'] != run['passed']:
                        # this is a probable flake, let's check for environmental failures
                        if isEnvFail(prev, prnum, b, job) or isEnvFail(run, prnum, b, job):
                            print('ENVFAIL: Test ' + job + ' runs ' + prev['run'] + ' and ' + runname)
                            continue
                        msg = ('Test ' + job + ' runs ' + prev['run'] + ' and ' + runname +
                               ' flaked for commit ' + sha + ' with bases ' +
                               prev['base'] + ' and ' + run['base'])
                        print(msg)
                        probable_flakes.append(msg)
                    else:
                        shamap[sha].append(run)
                else:
                    shamap[sha] = [run]
    return probable_flakes
# Returns True if we are certain this is an env failure, False if we aren't sure.
def isEnvFail(run, prnum, b, job):
    blob = b.get_blob(prefix + prnum + "/" + job + "/" + run["run"] + "/build-log.txt")
    if blob is None:
        # no build log for this run, so we can't confirm an env failure
        return False
    blob.download_to_filename("/tmp/foo")
    h = re.compile("(error parsing HTTP 408 response body|failed to get a Boskos resource|recipe for target '.*docker.*' failed|Entrypoint received interrupt: terminated)")
    with open("/tmp/foo") as infile:
        for line in infile:
            if h.search(line) is not None:
                return True
    return False
# Calling this outside a Pool initializer is dangerous. Use get_bucket instead.
def init_global_bucket(bucket_name):
    client = storage.Client()
    global bucket
    bucket = client.get_bucket(bucket_name)

# This function calls get_test_results using the global bucket var for multiprocessing.
def get_test_results_global(pr_path):
    return get_test_results(pr_path, bucket)
def main():
    prstart = sys.argv[1]
    prend = sys.argv[2] if len(sys.argv) > 2 else prstart
    results = []
    try:
        with multiprocessing.Pool(processes=32, initializer=init_global_bucket, initargs=['istio-prow']) as pool:
            # range end is inclusive so that a single PR argument works
            results = pool.map(get_test_results_global,
                               [prefix + str(prnum) + '/' for prnum in range(int(prstart), int(prend) + 1)])
    except KeyboardInterrupt:
        print("terminating gcs queries, skipping to flake analysis")
    b = get_bucket()
    for prnum, result in results:
        if result is None:
            continue
        print("***Flakes for PR " + prnum + "***")
        eval_results(result, prnum, b)

if __name__ == '__main__':
    main()
@therealmitchconnors

Run with python3 frosted-flakes.py 14627, where 14627 is the PR number.
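
For a one-off PR it also works to call the functions directly from a Python shell, skipping the pool (a sketch, assuming the definitions from the file above are in scope; 14627 is just the example PR number again):

b = get_bucket()
prnum, jobmap = get_test_results(prefix + '14627/', b)
flakes = eval_results(jobmap, prnum, b)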

@therealmitchconnors

With the new multiprocessing changes, we can process a single PR in about 7 seconds. This is still pretty slow, and doesn't include env detection, but it's much better than the original rate of one PR every three minutes. Also, this speed increase requires using a service account...
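
One way to wire in a service account (a sketch; the key path is hypothetical) is to construct the client from a key file wherever storage.Client() is called:

client = storage.Client.from_service_account_json('/path/to/key.json')  # hypothetical key path

Alternatively, set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the key file's path and the default storage.Client() will pick it up with no code changes.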
