# coding: utf-8
---
title: "Nightly Hang Reporter Stacks"
authors:
- mconley
- dthayer
tags:
- spark
- hangs
created_at: 2017-03-02
updated_at: 2017-03-13
tldr: Analysis for hang reporting dashboard.
---
# As part of the Quantum Flow project, which seeks to make Firefox _feel_ faster, we'd like a dashboard with various statistics on browser hangs. While we already collect the data in `threadHangStats`, we don't currently have any working visual representation of it. This notebook processes ping data into a format that can be easily consumed by a JS-powered dashboard on TMO.
# In[4]:
import os
import ujson as json
import pandas as pd
from datetime import datetime, timedelta
from moztelemetry import get_pings_properties
from moztelemetry.dataset import Dataset
# Some configuration options:
# In[5]:
use_s3 = False
days_to_aggregate = 3
sample_size = 0.01
# We'd like to have a roughly 1-month chart of the severity of various hangs over time.
# In[6]:
start_date = (datetime.today() - timedelta(days=days_to_aggregate))
start_date_str = start_date.strftime("%Y%m%d")
end_date = (datetime.today() - timedelta(days=0))
end_date_str = end_date.strftime("%Y%m%d")
# In[7]:
pings = Dataset.from_source("telemetry") \
    .where(docType='main') \
    .where(appBuildId=lambda b: (b.startswith(start_date_str) or b > start_date_str)
                                and (b.startswith(end_date_str) or b < end_date_str)) \
    .where(appUpdateChannel="nightly") \
    .records(sc, sample=sample_size)
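# Nightly build IDs start with the build date (e.g. something like `20170313030202` -- the exact format is an assumption here), so the lexicographic comparisons above keep only builds whose dates fall inside the window. A quick sanity check of that predicate with made-up build IDs:
# In[ ]:
# Small check of the appBuildId predicate above (sample build IDs are made up).
def in_window(b):
    return (b.startswith(start_date_str) or b > start_date_str) \
           and (b.startswith(end_date_str) or b < end_date_str)

print in_window(start_date_str + "030202")  # True: built on the first day of the window
print in_window("20000101000000")           # False: far too old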
# In[8]:
properties = ["environment/system/os/name",
              "application/buildId",
              "payload/info/subsessionLength",
              "payload/childPayloads",
              "payload/threadHangStats"]

ping_props = get_pings_properties(pings, properties, with_processes=True)
# We're currently only interested in Windows pings.
# In[9]:
def windows_only(p):
    return p["environment/system/os/name"] == "Windows_NT"

windows_pings_only = ping_props.filter(windows_only)
# Split content hangs out from parent hangs. The content case needs an extra loop because a ping can carry payloads from multiple content processes.
# In[ ]:
def only_hangs_of_type(ping, process_type, native_only=False):
    # native_only defaults to False so the call sites below that omit it still work.
    build_date = ping["application/buildId"][:8]  # "YYYYMMDD" : 8 characters
    usage_seconds = float(ping['payload/info/subsessionLength'])

    if process_type == 'content':
        result = []
        if ping['payload/childPayloads'] is None:
            return result
        for payload in ping['payload/childPayloads']:
            if 'threadHangStats' not in payload:
                continue
            for thread_hang in payload['threadHangStats']:
                if 'name' not in thread_hang:
                    continue
                if len(thread_hang['hangs']) > 0:
                    result = result + [{
                        'build_date': build_date,
                        'thread_name': thread_hang['name'],
                        'usage_seconds': usage_seconds,
                        'hang': x
                    } for x in thread_hang['hangs'] if 'nativeStack' in x or not native_only]
        return result
    else:
        result = []
        if ping['payload/threadHangStats'] is None:
            return result
        for thread_hang in ping['payload/threadHangStats']:
            if 'name' not in thread_hang:
                continue
            if len(thread_hang['hangs']) > 0:
                result = result + [{
                    'build_date': build_date,
                    'thread_name': thread_hang['name'],
                    'usage_seconds': usage_seconds,
                    'hang': x
                } for x in thread_hang['hangs'] if 'nativeStack' in x or not native_only]
        return result

def filter_for_hangs_of_type(pings, process_type, native_only=False):
    return pings.flatMap(lambda p: only_hangs_of_type(p, process_type, native_only))

content_hangs = filter_for_hangs_of_type(windows_pings_only, 'content')
parent_hangs = filter_for_hangs_of_type(windows_pings_only, 'parent')
# Aggregate `hang_sum` (approximate total milliseconds for hangs greater than 100ms), and `hang_count` (approximate number of hangs greater than 100ms). This should give us some sense of both the frequency and the severity of various hangs.
# In[ ]:
def map_to_hang_data(hang, native_only=False):
    hist_data = hang['hang']['histogram']['values']
    key_ints = map(int, hist_data.keys())
    hist = pd.Series(hist_data.values(), index=key_ints)
    weights = pd.Series(key_ints, index=key_ints)
    hang_sum = (hist * weights)[hist.index >= 100].sum()
    hang_count = hist[hist.index >= 100].sum()
    # Our key will be the stack, the thread name, and the build ID. Once we've reduced on this
    # we'll collect as a map, since there should only be ~ 10^1 days, 10^1 threads, 10^3 stacks : 100,000 records
    if native_only:
        return (tuple(hang['hang']['nativeStack']['stacks'][0] + [hang['thread_name'], hang['build_date']]), {
            'hang_sum': hang_sum,
            'hang_count': hang_count,
            'usage_seconds': hang['usage_seconds']
        })
    else:
        return (tuple(hang['hang']['stack'] + [hang['thread_name'], hang['build_date']]), {
            'hang_sum': hang_sum,
            'hang_count': hang_count,
            'usage_seconds': hang['usage_seconds']
        })

def merge_hang_data(a, b):
    return {
        'hang_sum': a['hang_sum'] + b['hang_sum'],
        'hang_count': a['hang_count'] + b['hang_count'],
        'usage_seconds': a['usage_seconds'] + b['usage_seconds'],
    }

def get_grouped_sums_and_counts(hangs, native_only=False):
    return hangs.map(lambda x: map_to_hang_data(x, native_only)) \
                .reduceByKey(merge_hang_data) \
                .collectAsMap()

content_grouped_hangs = get_grouped_sums_and_counts(content_hangs)
parent_grouped_hangs = get_grouped_sums_and_counts(parent_hangs)
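# As a quick sanity check of the weighting above, here is a small worked example with made-up histogram values (assuming, as the code above does, that each BHR bucket key is a lower-bound duration in milliseconds):
# In[ ]:
# Hypothetical histogram: 3 hangs in the 64ms bucket, 2 in the 128ms bucket, 1 in the 256ms bucket.
example_hist = {'64': 3, '128': 2, '256': 1}
example_keys = map(int, example_hist.keys())
example_counts = pd.Series(example_hist.values(), index=example_keys)
example_weights = pd.Series(example_keys, index=example_keys)
# Only buckets >= 100ms count: hang_sum ~ 2*128 + 1*256 = 512ms, hang_count = 2 + 1 = 3.
print (example_counts * example_weights)[example_counts.index >= 100].sum()
print example_counts[example_counts.index >= 100].sum()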
# First group by date:
# In[ ]:
def group_by_date(stacks):
    dates = {}
    for stack, stats in stacks.iteritems():
        hang_sum = stats['hang_sum']
        hang_count = stats['hang_count']
        usage_seconds = stats['usage_seconds']
        if len(stack) == 0:
            continue
        stack_date = stack[-1]
        stack = stack[:-1]
        if stack_date not in dates:
            dates[stack_date] = {
                "date": stack_date,
                "threads": [],
                "usage_seconds": 0
            }
        date = dates[stack_date]
        date["threads"].append((stack, {'hang_sum': hang_sum, 'hang_count': hang_count}))
        date["usage_seconds"] += usage_seconds
    return dates

content_by_date = group_by_date(content_grouped_hangs)
parent_by_date = group_by_date(parent_grouped_hangs)
# Then by thread name:
# In[ ]:
def group_by_thread_name(stacks):
    thread_names = {}
    for stack, stats in stacks:
        hang_sum = stats['hang_sum']
        hang_count = stats['hang_count']
        if len(stack) == 0:
            continue
        stack_thread_name = stack[-1]
        stack = stack[:-1]
        if stack_thread_name not in thread_names:
            thread_names[stack_thread_name] = {
                "thread": stack_thread_name,
                "hangs": [],
            }
        thread_name = thread_names[stack_thread_name]
        thread_name["hangs"].append((stack, {'hang_sum': hang_sum, 'hang_count': hang_count}))
    return thread_names
# Then by the top frame, which will serve as the hang's signature for the time being.
# In[ ]:
def group_by_top_frame(stacks):
    top_frames = {}
    for stack, stats in stacks:
        hang_sum = stats['hang_sum']
        hang_count = stats['hang_count']
        if len(stack) == 0:
            continue
        stack_top_frame = stack[-1]
        if stack_top_frame not in top_frames:
            top_frames[stack_top_frame] = {
                "frame": stack_top_frame,
                "stacks": [],
                "hang_sum": 0,
                "hang_count": 0
            }
        top_frame = top_frames[stack_top_frame]
        top_frame["stacks"].append((stack, {'hang_sum': hang_sum, 'hang_count': hang_count}))
        top_frame["hang_sum"] += hang_sum
        top_frame["hang_count"] += hang_count
    return top_frames
# Normalize all our fields by total usage hours for that date.
# In[ ]:
def score(grouping, usage_seconds):
    total_hours = usage_seconds / 3600.0  # seconds -> hours, so the fields below really are per-hour
    grouping['hang_ms_per_hour'] = grouping['hang_sum'] / total_hours
    grouping['hang_count_per_hour'] = grouping['hang_count'] / total_hours

    scored_stacks = []
    for stack_tuple in grouping['stacks']:
        stack_hang_sum = stack_tuple[1]['hang_sum'] / total_hours
        stack_hang_count = stack_tuple[1]['hang_count'] / total_hours
        scored_stacks.append((stack_tuple[0], {
            'hang_ms_per_hour': stack_hang_sum,
            'hang_count_per_hour': stack_hang_count
        }))
    grouping['stacks'] = scored_stacks
    return grouping

def score_all(grouped_by_top_frame, usage_seconds):
    return {k: score(g, usage_seconds) for k, g in grouped_by_top_frame.iteritems()}
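# A quick worked example of the normalization (hypothetical numbers): 7200 usage seconds is 2 hours, so a signature with 1000ms of hang time and 4 hangs scores 500 ms/hour and 2 hangs/hour.
# In[ ]:
# Hypothetical grouping, shaped like the output of group_by_top_frame above.
example_grouping = {
    'frame': 'example_frame',
    'hang_sum': 1000.0,
    'hang_count': 4.0,
    'stacks': [(('example_frame',), {'hang_sum': 1000.0, 'hang_count': 4.0})]
}
example_scored = score(example_grouping, 7200.0)
print example_scored['hang_ms_per_hour']     # 500.0
print example_scored['hang_count_per_hour']  # 2.0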
# Put the last three sections together:
# In[ ]:
def get_by_top_frame_by_thread(by_thread, usage_seconds):
    return {k: score_all(group_by_top_frame(g["hangs"]), usage_seconds) for k, g in by_thread.iteritems()}

def get_by_thread_by_date(by_date):
    return {
        k: {
            'threads': get_by_top_frame_by_thread(group_by_thread_name(g["threads"]), g["usage_seconds"]),
            'total_hours': g["usage_seconds"] / 3600.0
        } for k, g in by_date.iteritems()
    }

content_scored = get_by_thread_by_date(content_by_date)
parent_scored = get_by_thread_by_date(parent_by_date)
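# The result is nested by build date, then thread name, then top-frame signature, which is the shape the dashboard is expected to consume. An optional spot check of that structure:
# In[ ]:
# Print the first date's total hours, plus a few thread names and how many top-frame signatures each has.
for date_key in sorted(parent_scored.keys())[:1]:
    print date_key, parent_scored[date_key]['total_hours']
    for thread_key in sorted(parent_scored[date_key]['threads'].keys())[:3]:
        print '  ', thread_key, len(parent_scored[date_key]['threads'][thread_key])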
# In[ ]:
import ujson as json
import boto3
from boto3.s3.transfer import S3Transfer

def write_file(name, stuff):
    filename = "./output/%s-%s.json" % (name, end_date_str)
    jsonblob = json.dumps(stuff, ensure_ascii=False)

    # Always write the JSON locally first, so there is a file on disk to upload.
    if not os.path.exists('./output'):
        os.makedirs('./output')
    with open(filename, 'w') as f:
        f.write(jsonblob)

    if use_s3:
        # TODO: This was adapted from another report. I'm not actually sure what the process is for
        # dumping stuff to s3, and would appreciate feedback!
        bucket = "telemetry-public-analysis-2"
        timestamped_s3_key = "bhr/data/hang_aggregates/" + name + ".json"
        client = boto3.client('s3', 'us-west-2')
        transfer = S3Transfer(client)
        transfer.upload_file(filename, bucket, timestamped_s3_key,
                             extra_args={'ContentType': 'application/json'})
# In[1]:
write_file('content_by_day', content_scored)
write_file('parent_by_day', parent_scored)
# In[18]:
def extract_native_stacks(ping):
    result = []
    if ping['payload/threadHangStats'] is None:
        return result
    for thread_hang in ping['payload/threadHangStats']:
        if 'name' not in thread_hang:
            continue
        if len(thread_hang['hangs']) > 0:
            result = result + [{
                'thread_name': thread_hang['name'],
                'hang': x
            } for x in thread_hang['hangs'] if 'nativeStack' in x and len(x['nativeStack']['stacks']) > 1]
    return result

def filter_for_native_stacks(pings):
    return pings.flatMap(lambda p: extract_native_stacks(p))

native_stacks = filter_for_native_stacks(windows_pings_only)
# In[25]:
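# Sanity check that reduceByKey behaves as expected with tuple keys (including nested tuples), since the hang aggregation above keys on stack tuples.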
sc.parallelize([(tuple([1, (1, 2)]), 5),
                (tuple([1, (1, 2)]), 6),
                (tuple([1, 3]), 7)]) \
  .reduceByKey(lambda a, b: a + b) \
  .collectAsMap()