Skip to content

Instantly share code, notes, and snippets.

@bsmedberg
Created June 9, 2017 19:26
Show Gist options
  • Save bsmedberg/d9c2e50af7d710fb14b488794a349000 to your computer and use it in GitHub Desktop.
daily-latency-metrics
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Unable to parse whitelist: /mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/histogram-whitelists.json.\n",
"Assuming all histograms are acceptable.\n"
]
}
],
"source": [
"import ujson as json\n",
"import pandas as pd\n",
"import numpy as np\n",
"\n",
"from moztelemetry import get_pings_properties\n",
"from moztelemetry.dataset import Dataset\n",
"from moztelemetry.histogram import Histogram\n",
"from operator import add\n",
"\n",
"from datetime import date, timedelta\n",
"\n",
"%matplotlib inline"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from collections import namedtuple"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import itertools"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import pyspark\n",
"import pyspark.sql.types as st\n",
"import pyspark.sql.functions as sf"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import boto3"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def ping_filter(p):\n",
" if p.get(\"environment/system/os/name\", None) != \"Windows_NT\":\n",
" return False\n",
" if p.get(\"payload/info/subsessionLength\", 0) <= 0:\n",
" return False\n",
" if p.get(\"environment/settings/e10sEnabled\", False) != True:\n",
" return False\n",
" return True"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"Ping = namedtuple(\n",
" \"Ping\",\n",
" (\n",
" \"client_id\",\n",
" \"build_id\",\n",
" \"quantum_ready\",\n",
" \"chrome_input_latency_gt_250\",\n",
" \"chrome_input_latency_gt_2500\",\n",
" \"content_input_latency_gt_250\",\n",
" \"content_input_latency_gt_2500\",\n",
" \"chrome_gc_gt_150\",\n",
" \"chrome_cc_gt_150\",\n",
" \"content_gc_gt_2500\",\n",
" \"content_cc_gt_2500\",\n",
" \"ghost_windows\",\n",
" \"subsession_length\",\n",
" )\n",
")\n",
"\n",
"ping_properties = [\n",
" \"clientId\",\n",
" \"environment/build/buildId\",\n",
" \"environment/system/os/name\",\n",
" \"environment/settings/e10sEnabled\",\n",
" \"environment/addons/theme/id\",\n",
" \"environment/addons/activeAddons\",\n",
" \"payload/histograms/INPUT_EVENT_RESPONSE_COALESCED_MS\",\n",
" \"payload/histograms/CYCLE_COLLECTOR_MAX_PAUSE\",\n",
" \"payload/histograms/GC_MAX_PAUSE_MS_2\",\n",
" \"payload/histograms/GHOST_WINDOWS\",\n",
" \"payload/info/subsessionLength\",\n",
"]\n",
"\n",
"default_themes = (\n",
" '{972ce4c6-7e08-4474-a285-3208198ce6fd}',\n",
" 'firefox-compact-light@mozilla.org',\n",
" 'firefox-compact-dark@mozilla.org',\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def th(d, name, cutoff):\n",
" h = d[name]\n",
" if h is None:\n",
" return 0\n",
" return int(h.truncate(before=cutoff).sum())\n",
"\n",
"def ping_mapper(d):\n",
" if not d[\"environment/settings/e10sEnabled\"]:\n",
" quantum_ready = \"no_e10s\"\n",
" elif any(not(e.get(\"isSystem\", False) or e.get(\"isWebExtension\", False))\n",
" for e in (d[\"environment/addons/activeAddons\"] or {}).itervalues()):\n",
" quantum_ready = \"no_addons\"\n",
" elif d[\"environment/addons/theme/id\"] not in default_themes:\n",
" quantum_ready = \"no_other_theme\"\n",
" else:\n",
" quantum_ready = \"yes\"\n",
" \n",
" return Ping(\n",
" d[\"clientId\"],\n",
" d[\"environment/build/buildId\"],\n",
" quantum_ready,\n",
" th(d, \"payload/histograms/INPUT_EVENT_RESPONSE_COALESCED_MS_parent\", 250),\n",
" th(d, \"payload/histograms/INPUT_EVENT_RESPONSE_COALESCED_MS_parent\", 2500),\n",
" th(d, \"payload/histograms/INPUT_EVENT_RESPONSE_COALESCED_MS_children\", 250),\n",
" th(d, \"payload/histograms/INPUT_EVENT_RESPONSE_COALESCED_MS_children\", 2500),\n",
" th(d, \"payload/histograms/GC_MAX_PAUSE_MS_2_parent\", 150),\n",
" th(d, \"payload/histograms/CYCLE_COLLECTOR_MAX_PAUSE_parent\", 150),\n",
" th(d, \"payload/histograms/GC_MAX_PAUSE_MS_2_children\", 2500),\n",
" th(d, \"payload/histograms/CYCLE_COLLECTOR_MAX_PAUSE_children\", 2500),\n",
" th(d, \"payload/histograms/GHOST_WINDOWS\", 1),\n",
" d[\"payload/info/subsessionLength\"]\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"ping_schema = st.StructType([\n",
" st.StructField(\"client_id\", st.StringType()),\n",
" st.StructField(\"build_id\", st.StringType()),\n",
" st.StructField(\"quantum_ready\", st.StringType()),\n",
" st.StructField(\"chrome_input_latency_gt_250\", st.IntegerType()),\n",
" st.StructField(\"chrome_input_latency_gt_2500\", st.IntegerType()),\n",
" st.StructField(\"content_input_latency_gt_250\", st.IntegerType()),\n",
" st.StructField(\"content_input_latency_gt_2500\", st.IntegerType()),\n",
" st.StructField(\"chrome_gc_gt_150\", st.IntegerType()),\n",
" st.StructField(\"chrome_cc_gt_150\", st.IntegerType()),\n",
" st.StructField(\"content_gc_gt_2500\", st.IntegerType()),\n",
" st.StructField(\"content_cc_gt_2500\", st.IntegerType()),\n",
" st.StructField(\"ghost_windows\", st.IntegerType()),\n",
" st.StructField(\"subsession_length\", st.LongType()),\n",
"])"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def save_submission_date(day):\n",
" ds = Dataset.from_source(\"telemetry\") \\\n",
" .where(docType='main') \\\n",
" .where(submissionDate=lambda d: d >= day.strftime('%Y%m%d')) \\\n",
" .where(appUpdateChannel=\"nightly\")\n",
"\n",
" pings = ds.records(sc)\n",
" \n",
" data = get_pings_properties(pings, ping_properties, with_processes=True) \\\n",
" .filter(ping_filter).map(ping_mapper)\n",
" ds = spark.createDataFrame(data, ping_schema)\n",
" ds.write.mode(\"overwrite\").parquet(\"s3a://net-mozaws-prod-us-west-2-pipeline-analysis/bsmedberg/quantum-dataset/submission_date={}\".format(day.strftime(\"%Y%m%d\")))"
]
},
{
"cell_type": "code",
"execution_count": 52,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def daily_stats(dataset):\n",
" summations = (\n",
" \"subsession_length\",\n",
" \"chrome_input_latency_gt_250\",\n",
" \"chrome_input_latency_gt_2500\",\n",
" \"content_input_latency_gt_250\",\n",
" \"content_input_latency_gt_2500\",\n",
" )\n",
" props = [sf.sum(dataset[n]).alias(n) for n in summations]\n",
" props.append(sf.count(\"*\").alias(\"total_subsessions\"))\n",
" props.append(sf.sum(sf.when(dataset.ghost_windows > 0, 1).otherwise(0)).alias(\"subsessions_with_ghost_windows\"))\n",
" data = dataset.agg(*props).first()\n",
" hours = data.subsession_length / 60.0 / 60.0\n",
" \n",
" return {\n",
" \"total_subsessions\": data.total_subsessions,\n",
" \"ghost_windows_rate\": 1.0 * data.subsessions_with_ghost_windows / data.total_subsessions,\n",
" \"mtbf_chrome_input_latency_gt_250\": hours / data.chrome_input_latency_gt_250,\n",
" \"mtbf_chrome_input_latency_gt_2500\": hours / data.chrome_input_latency_gt_2500,\n",
" \"mtbf_content_input_latency_gt_250\": hours / data.content_input_latency_gt_250,\n",
" \"mtbf_content_input_latency_gt_2500\": hours / data.content_input_latency_gt_2500,\n",
" }\n",
"\n",
"def weekly_stats(dataset):\n",
" data = dataset.agg(\n",
" sf.count(\"*\").alias(\"client_count\"),\n",
" sf.sum(sf.when(dataset.chrome_gc_gt_150, 1).otherwise(0)).alias(\"chrome_gc_gt_150\"),\n",
" sf.sum(sf.when(dataset.chrome_cc_gt_150, 1).otherwise(0)).alias(\"chrome_cc_gt_150\"),\n",
" sf.sum(sf.when(dataset.chrome_gc_gt_150 | dataset.chrome_cc_gt_150, 1).otherwise(0)).alias(\"chrome_gccc_gt_150\"),\n",
" sf.sum(sf.when(dataset.content_gc_gt_2500, 1).otherwise(0)).alias(\"content_gc_gt_2500\"),\n",
" sf.sum(sf.when(dataset.content_cc_gt_2500, 1).otherwise(0)).alias(\"content_cc_gt_2500\"),\n",
" sf.sum(sf.when(dataset.content_gc_gt_2500 | dataset.content_cc_gt_2500, 1).otherwise(0)).alias(\"content_gccc_gt_2500\")\n",
" ).first()\n",
"\n",
" total = float(data.client_count)\n",
" \n",
" return {\n",
" \"total_users\": data.client_count,\n",
" \"chrome_gc_gt_150\": data.chrome_gc_gt_150 / total,\n",
" \"chrome_cc_gt_150\": data.chrome_cc_gt_150 / total,\n",
" \"chrome_gccc_gt_150\": data.chrome_gccc_gt_150 / total,\n",
" \"content_gc_gt_2500\": data.content_gc_gt_2500 / total,\n",
" \"content_cc_gt_2500\": data.content_cc_gt_2500 / total,\n",
" \"content_gccc_gt_2500\": data.content_gccc_gt_2500 / total,\n",
" }\n",
"\n",
"def analyze_for_date(day):\n",
" weekago = day - timedelta(days=7)\n",
" dataset = spark.read.parquet(\"s3n://net-mozaws-prod-us-west-2-pipeline-analysis/bsmedberg/quantum-dataset\")\n",
"\n",
" daily_data = dataset.where(dataset.submission_date == day.strftime(\"%Y%m%d\")).cache()\n",
"\n",
" weekly_data = dataset.where((dataset.submission_date > weekago.strftime(\"%Y%m%d\")) & (dataset.submission_date <= day.strftime(\"%Y%m%d\")))\n",
" grouped_by_client = weekly_data.groupBy('client_id').agg(\n",
" (sf.sum(sf.when(dataset.quantum_ready == \"yes\", 0).otherwise(1)) == 0).alias(\"quantum_ready\"),\n",
" (sf.sum(dataset.chrome_gc_gt_150) > 0).alias(\"chrome_gc_gt_150\"),\n",
" (sf.sum(dataset.chrome_cc_gt_150) > 0).alias(\"chrome_cc_gt_150\"),\n",
" (sf.sum(dataset.content_gc_gt_2500) > 0).alias(\"content_gc_gt_2500\"),\n",
" (sf.sum(dataset.content_cc_gt_2500) > 0).alias(\"content_cc_gt_2500\")\n",
" ).cache()\n",
"\n",
" output_data = {\n",
" 'nightly_all': daily_stats(daily_data),\n",
" 'nightly_quantumready': daily_stats(daily_data.where(daily_data.quantum_ready == \"yes\")),\n",
" 'quantum_readiness': dict(dataset.groupBy(\"quantum_ready\").count().collect()),\n",
" 'weekly_all': weekly_stats(grouped_by_client),\n",
" 'weekly_quantumready': weekly_stats(grouped_by_client.where(grouped_by_client.quantum_ready)),\n",
" }\n",
"\n",
" bucket = boto3.resource('s3').Bucket('telemetry-public-analysis-2')\n",
" bucket.put_object(Body=json.dumps(output_data),\n",
" Key='bsmedberg/daily-latency-metrics/{}.json'.format(day.strftime(\"%Y%m%d\"))) \n",
" return output_data"
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"fetching 66484.24354MB in 182876 files...\n"
]
},
{
"ename": "KeyboardInterrupt",
"evalue": "",
"output_type": "error",
"traceback": [
"\u001b[0;31m\u001b[0m",
"\u001b[0;31mKeyboardInterrupt\u001b[0mTraceback (most recent call last)",
"\u001b[0;32m<ipython-input-44-85a506f1aaf5>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0msave_submission_date\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdate\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m2017\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m5\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m28\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m<ipython-input-34-13f252129436>\u001b[0m in \u001b[0;36msave_submission_date\u001b[0;34m(day)\u001b[0m\n\u001b[1;32m 6\u001b[0m \u001b[0mdata\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mget_pings_properties\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpings\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mping_properties\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mwith_processes\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mTrue\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m.\u001b[0m\u001b[0mfilter\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mping_filter\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmap\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mping_mapper\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 7\u001b[0m \u001b[0mds\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcreateDataFrame\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdata\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mping_schema\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 8\u001b[0;31m \u001b[0mds\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwrite\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmode\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"overwrite\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mparquet\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"s3a://net-mozaws-prod-us-west-2-pipeline-analysis/bsmedberg/quantum-dataset/submission_date={}\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mformat\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mday\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstrftime\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"%Y%m%d\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m/usr/lib/spark/python/pyspark/sql/readwriter.py\u001b[0m in \u001b[0;36mparquet\u001b[0;34m(self, path, mode, partitionBy, compression)\u001b[0m\n\u001b[1;32m 639\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpartitionBy\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpartitionBy\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 640\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_set_opts\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcompression\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mcompression\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 641\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jwrite\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mparquet\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 642\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 643\u001b[0m \u001b[0;34m@\u001b[0m\u001b[0msince\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m1.6\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1129\u001b[0m \u001b[0mproto\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mEND_COMMAND_PART\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1130\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1131\u001b[0;31m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1132\u001b[0m return_value = get_return_value(\n\u001b[1;32m 1133\u001b[0m answer, self.gateway_client, self.target_id, self.name)\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36msend_command\u001b[0;34m(self, command, retry, binary)\u001b[0m\n\u001b[1;32m 881\u001b[0m \u001b[0mconnection\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_get_connection\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 882\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 883\u001b[0;31m \u001b[0mresponse\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mconnection\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 884\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mbinary\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 885\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mresponse\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_create_connection_guard\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mconnection\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36msend_command\u001b[0;34m(self, command)\u001b[0m\n\u001b[1;32m 1026\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1027\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1028\u001b[0;31m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0msmart_decode\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstream\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mreadline\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m-\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1029\u001b[0m \u001b[0mlogger\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdebug\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"Answer received: {0}\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mformat\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0manswer\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1030\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0manswer\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mproto\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mRETURN_MESSAGE\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/mnt/anaconda2/lib/python2.7/socket.pyc\u001b[0m in \u001b[0;36mreadline\u001b[0;34m(self, size)\u001b[0m\n\u001b[1;32m 449\u001b[0m \u001b[0;32mwhile\u001b[0m \u001b[0mTrue\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 450\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 451\u001b[0;31m \u001b[0mdata\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_sock\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrecv\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_rbufsize\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 452\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0merror\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 453\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mEINTR\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mKeyboardInterrupt\u001b[0m: "
]
}
],
"source": [
"save_submission_date(date(2017, 5, 28))"
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[Row(submission_date=u'20170528'),\n",
" Row(submission_date=u'20170529'),\n",
" Row(submission_date=u'20170530'),\n",
" Row(submission_date=u'20170531'),\n",
" Row(submission_date=u'20170601'),\n",
" Row(submission_date=u'20170602'),\n",
" Row(submission_date=u'20170603'),\n",
" Row(submission_date=u'20170604'),\n",
" Row(submission_date=u'20170605'),\n",
" Row(submission_date=u'20170606'),\n",
" Row(submission_date=u'20170607'),\n",
" Row(submission_date=u'20170608')]"
]
},
"execution_count": 45,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ds = spark.read.parquet(\"s3a://net-mozaws-prod-us-west-2-pipeline-analysis/bsmedberg/quantum-dataset\")\n",
"ds.select(\"submission_date\").distinct().sort('submission_date').collect()"
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"{'nightly_all': {'ghost_windows_rate': 0.10860097114195245,\n",
" 'mtbf_chrome_input_latency_gt_250': 0.18816844876151392,\n",
" 'mtbf_chrome_input_latency_gt_2500': 8.292176231602825,\n",
" 'mtbf_content_input_latency_gt_250': 0.05705873974478752,\n",
" 'mtbf_content_input_latency_gt_2500': 2.1841823372745055,\n",
" 'total_subsessions': 499618},\n",
" 'nightly_quantumready': {'ghost_windows_rate': 0.09972158668026573,\n",
" 'mtbf_chrome_input_latency_gt_250': 0.18384453102367898,\n",
" 'mtbf_chrome_input_latency_gt_2500': 9.74757674923691,\n",
" 'mtbf_content_input_latency_gt_250': 0.05709728198916825,\n",
" 'mtbf_content_input_latency_gt_2500': 2.122584840905992,\n",
" 'total_subsessions': 181385},\n",
" 'quantum_readiness': {u'no_addons': 5295117,\n",
" u'no_other_theme': 280650,\n",
" u'yes': 3047229},\n",
" 'weekly_all': {'chrome_cc_gt_150': 0.4070703868103995,\n",
" 'chrome_gc_gt_150': 0.45179137603043756,\n",
" 'chrome_gccc_gt_150': 0.5604787571337984,\n",
" 'content_cc_gt_2500': 0.16374445149017122,\n",
" 'content_gc_gt_2500': 0.08928344958782498,\n",
" 'content_gccc_gt_2500': 0.18937856689917565,\n",
" 'total_users': 63080},\n",
" 'weekly_quantumready': {'chrome_cc_gt_150': 0.2852739601842115,\n",
" 'chrome_gc_gt_150': 0.3540994306849911,\n",
" 'chrome_gccc_gt_150': 0.4414185734488886,\n",
" 'content_cc_gt_2500': 0.1540776734235051,\n",
" 'content_gc_gt_2500': 0.08786307430104798,\n",
" 'content_gccc_gt_2500': 0.17869964100518548,\n",
" 'total_users': 27577}}"
]
},
"execution_count": 55,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"q = analyze_for_date(date(2017, 6, 5))\n",
"q"
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"s3.Object(bucket_name='telemetry-public-analysis-2', key='bsmedberg/daily-latency-metrics/20170608.json')"
]
},
"execution_count": 51,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"day=date(2017, 6, 8)\n",
"bucket = boto3.resource('s3').Bucket('telemetry-public-analysis-2')\n",
"bucket.put_object(Body=json.dumps(q),\n",
" Key='bsmedberg/daily-latency-metrics/{}.json'.format(day.strftime(\"%Y%m%d\")))"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"s3 = boto3.resource('s3')\n",
"my_bucket = s3.Bucket('net-mozaws-prod-us-west-2-pipeline-analysis')"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"files = list(my_bucket.objects.filter(Prefix=\"bsmedberg/quantum-dataset/submission_date=\"))"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def too_old(files):\n",
" for f in files:\n",
" if not f.key.startswith(\"bsmedberg/quantum-dataset/submission_date=\"):\n",
" continue\n",
" d = f.key[42:]\n",
" if d < \"20170605\":\n",
" yield f.key\n",
"\n",
"def delete_list(l):\n",
" my_bucket.delete_objects(Delete={'Objects': [{'Key': key} for key in l]})\n",
" \n",
"dlist = []\n",
"for key in too_old(files):\n",
" dlist.append(key)\n",
" if len(dlist) == 1000:\n",
" delete_list(dlist)\n",
" dlist = []\n",
"delete_list(dlist)"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [default]",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
# coding: utf-8
# In[1]:
import ujson as json
import pandas as pd
import numpy as np
from moztelemetry import get_pings_properties
from moztelemetry.dataset import Dataset
from moztelemetry.histogram import Histogram
from operator import add
from datetime import date, timedelta
get_ipython().magic(u'matplotlib inline')
# In[2]:
from collections import namedtuple
# In[3]:
import itertools
# In[4]:
import pyspark
import pyspark.sql.types as st
import pyspark.sql.functions as sf
# In[12]:
import boto3
# In[5]:
def ping_filter(p):
    """Keep only e10s-enabled Windows_NT pings with a positive subsession length."""
    is_windows = p.get("environment/system/os/name", None) == "Windows_NT"
    has_length = p.get("payload/info/subsessionLength", 0) > 0
    # The original compares against True explicitly, so preserve that exact check.
    e10s_on = p.get("environment/settings/e10sEnabled", False) == True
    return is_windows and has_length and e10s_on
# In[6]:
# Field order must match both ping_mapper's positional construction and
# ping_schema's column order.
_PING_FIELDS = (
    "client_id",
    "build_id",
    "quantum_ready",
    "chrome_input_latency_gt_250",
    "chrome_input_latency_gt_2500",
    "content_input_latency_gt_250",
    "content_input_latency_gt_2500",
    "chrome_gc_gt_150",
    "chrome_cc_gt_150",
    "content_gc_gt_2500",
    "content_cc_gt_2500",
    "ghost_windows",
    "subsession_length",
)

# Flat per-ping record produced by ping_mapper.
Ping = namedtuple("Ping", _PING_FIELDS)
# Ping properties handed to get_pings_properties(); with_processes=True later
# splits the histogram entries into *_parent / *_children keys.
ping_properties = [
    "clientId",
    "environment/build/buildId",
    "environment/system/os/name",
    "environment/settings/e10sEnabled",
    "environment/addons/theme/id",
    "environment/addons/activeAddons",
    "payload/histograms/INPUT_EVENT_RESPONSE_COALESCED_MS",
    "payload/histograms/CYCLE_COLLECTOR_MAX_PAUSE",
    "payload/histograms/GC_MAX_PAUSE_MS_2",
    "payload/histograms/GHOST_WINDOWS",
    "payload/info/subsessionLength",
]

# Theme ids counted as "default"; any other theme marks the session
# "no_other_theme" in ping_mapper.
default_themes = (
    '{972ce4c6-7e08-4474-a285-3208198ce6fd}',
    'firefox-compact-light@mozilla.org',
    'firefox-compact-dark@mozilla.org',
)
# In[7]:
def th(d, name, cutoff):
    """Sum the histogram counts at or above *cutoff*; 0 when the histogram is absent."""
    hist = d[name]
    return 0 if hist is None else int(hist.truncate(before=cutoff).sum())
def ping_mapper(d):
    """Map an extracted ping property dict to a Ping record.

    quantum_ready classifies the session:
      - "no_e10s":        e10s disabled
      - "no_addons":      any active addon that is neither system nor WebExtension
      - "no_other_theme": theme id outside default_themes
      - "yes":            none of the above
    """
    # activeAddons may be None; normalize once.
    addons = d["environment/addons/activeAddons"] or {}
    if not d["environment/settings/e10sEnabled"]:
        quantum_ready = "no_e10s"
    # .values() instead of the py2-only .itervalues(): identical iteration
    # semantics here, and keeps the mapper py2/py3 compatible.
    elif any(not (e.get("isSystem", False) or e.get("isWebExtension", False))
             for e in addons.values()):
        quantum_ready = "no_addons"
    elif d["environment/addons/theme/id"] not in default_themes:
        quantum_ready = "no_other_theme"
    else:
        quantum_ready = "yes"

    # Positional order must match Ping._fields / ping_schema.
    return Ping(
        d["clientId"],
        d["environment/build/buildId"],
        quantum_ready,
        th(d, "payload/histograms/INPUT_EVENT_RESPONSE_COALESCED_MS_parent", 250),
        th(d, "payload/histograms/INPUT_EVENT_RESPONSE_COALESCED_MS_parent", 2500),
        th(d, "payload/histograms/INPUT_EVENT_RESPONSE_COALESCED_MS_children", 250),
        th(d, "payload/histograms/INPUT_EVENT_RESPONSE_COALESCED_MS_children", 2500),
        th(d, "payload/histograms/GC_MAX_PAUSE_MS_2_parent", 150),
        th(d, "payload/histograms/CYCLE_COLLECTOR_MAX_PAUSE_parent", 150),
        th(d, "payload/histograms/GC_MAX_PAUSE_MS_2_children", 2500),
        th(d, "payload/histograms/CYCLE_COLLECTOR_MAX_PAUSE_children", 2500),
        th(d, "payload/histograms/GHOST_WINDOWS", 1),
        d["payload/info/subsessionLength"]
    )
# In[8]:
# (column name, Spark SQL type) pairs in the same order as Ping._fields.
_PING_SCHEMA_COLUMNS = [
    ("client_id", st.StringType()),
    ("build_id", st.StringType()),
    ("quantum_ready", st.StringType()),
    ("chrome_input_latency_gt_250", st.IntegerType()),
    ("chrome_input_latency_gt_2500", st.IntegerType()),
    ("content_input_latency_gt_250", st.IntegerType()),
    ("content_input_latency_gt_2500", st.IntegerType()),
    ("chrome_gc_gt_150", st.IntegerType()),
    ("chrome_cc_gt_150", st.IntegerType()),
    ("content_gc_gt_2500", st.IntegerType()),
    ("content_cc_gt_2500", st.IntegerType()),
    ("ghost_windows", st.IntegerType()),
    ("subsession_length", st.LongType()),
]

# Schema used when turning the mapped Ping RDD into a DataFrame.
ping_schema = st.StructType(
    [st.StructField(name, dtype) for name, dtype in _PING_SCHEMA_COLUMNS])
# In[34]:
def save_submission_date(day):
    """ETL nightly main pings for *day* onward into the parquet dataset.

    Relies on the notebook-global `sc` (SparkContext) and `spark`
    (SparkSession). Writes one submission_date partition to S3.
    """
    source = (Dataset.from_source("telemetry")
              .where(docType='main')
              .where(submissionDate=lambda d: d >= day.strftime('%Y%m%d'))
              .where(appUpdateChannel="nightly"))

    pings = source.records(sc)

    rows = (get_pings_properties(pings, ping_properties, with_processes=True)
            .filter(ping_filter)
            .map(ping_mapper))
    frame = spark.createDataFrame(rows, ping_schema)
    frame.write.mode("overwrite").parquet(
        "s3a://net-mozaws-prod-us-west-2-pipeline-analysis/bsmedberg/quantum-dataset/submission_date={}".format(
            day.strftime("%Y%m%d")))
# In[52]:
def daily_stats(dataset):
    """Aggregate one day's rows into totals and MTBF latency metrics.

    Returns a dict with the subsession count, the fraction of subsessions
    reporting any ghost window, and the mean time between failures (hours of
    usage per event) for chrome/content input latency over 250ms and 2500ms.
    """
    summations = (
        "subsession_length",
        "chrome_input_latency_gt_250",
        "chrome_input_latency_gt_2500",
        "content_input_latency_gt_250",
        "content_input_latency_gt_2500",
    )
    props = [sf.sum(dataset[n]).alias(n) for n in summations]
    props.append(sf.count("*").alias("total_subsessions"))
    props.append(sf.sum(sf.when(dataset.ghost_windows > 0, 1).otherwise(0)).alias("subsessions_with_ghost_windows"))
    data = dataset.agg(*props).first()
    hours = data.subsession_length / 60.0 / 60.0

    def _mtbf(event_count):
        # Zero events on a quiet/empty day previously raised
        # ZeroDivisionError; report infinite MTBF instead.
        return hours / event_count if event_count else float("inf")

    total = data.total_subsessions
    # Guard the empty-dataset case (count(*) == 0) for the ghost-window rate.
    ghost_rate = (1.0 * data.subsessions_with_ghost_windows / total) if total else 0.0

    return {
        "total_subsessions": total,
        "ghost_windows_rate": ghost_rate,
        "mtbf_chrome_input_latency_gt_250": _mtbf(data.chrome_input_latency_gt_250),
        "mtbf_chrome_input_latency_gt_2500": _mtbf(data.chrome_input_latency_gt_2500),
        "mtbf_content_input_latency_gt_250": _mtbf(data.content_input_latency_gt_250),
        "mtbf_content_input_latency_gt_2500": _mtbf(data.content_input_latency_gt_2500),
    }
def weekly_stats(dataset):
    """Aggregate per-client boolean rows into incidence rates.

    *dataset* is expected to be one row per client with boolean GC/CC pause
    columns (as built by analyze_for_date's groupBy). Returns the client
    count and, for each pause class, the fraction of clients that hit it.
    """
    data = dataset.agg(
        sf.count("*").alias("client_count"),
        sf.sum(sf.when(dataset.chrome_gc_gt_150, 1).otherwise(0)).alias("chrome_gc_gt_150"),
        sf.sum(sf.when(dataset.chrome_cc_gt_150, 1).otherwise(0)).alias("chrome_cc_gt_150"),
        sf.sum(sf.when(dataset.chrome_gc_gt_150 | dataset.chrome_cc_gt_150, 1).otherwise(0)).alias("chrome_gccc_gt_150"),
        sf.sum(sf.when(dataset.content_gc_gt_2500, 1).otherwise(0)).alias("content_gc_gt_2500"),
        sf.sum(sf.when(dataset.content_cc_gt_2500, 1).otherwise(0)).alias("content_cc_gt_2500"),
        sf.sum(sf.when(dataset.content_gc_gt_2500 | dataset.content_cc_gt_2500, 1).otherwise(0)).alias("content_gccc_gt_2500")
    ).first()

    total = float(data.client_count)
    if total == 0:
        # Empty dataset previously raised ZeroDivisionError; report zero
        # rates (total_users below still truthfully reports 0).
        total = 1.0

    return {
        "total_users": data.client_count,
        "chrome_gc_gt_150": data.chrome_gc_gt_150 / total,
        "chrome_cc_gt_150": data.chrome_cc_gt_150 / total,
        "chrome_gccc_gt_150": data.chrome_gccc_gt_150 / total,
        "content_gc_gt_2500": data.content_gc_gt_2500 / total,
        "content_cc_gt_2500": data.content_cc_gt_2500 / total,
        "content_gccc_gt_2500": data.content_gccc_gt_2500 / total,
    }
def analyze_for_date(day):
    """Compute daily and trailing-7-day metrics for *day*, publish the JSON
    blob to the public-analysis S3 bucket, and return the dict."""
    weekago = day - timedelta(days=7)
    # NOTE(review): reads via s3n while save_submission_date writes via s3a —
    # presumably both resolve to the same bucket; confirm on the cluster.
    dataset = spark.read.parquet("s3n://net-mozaws-prod-us-west-2-pipeline-analysis/bsmedberg/quantum-dataset")

    day_str = day.strftime("%Y%m%d")
    daily_data = dataset.where(dataset.submission_date == day_str).cache()

    window = ((dataset.submission_date > weekago.strftime("%Y%m%d"))
              & (dataset.submission_date <= day_str))
    weekly_data = dataset.where(window)
    # A client is quantum_ready only if every one of its subsessions in the
    # window was classified "yes"; the pause flags are "any subsession hit it".
    grouped_by_client = weekly_data.groupBy('client_id').agg(
        (sf.sum(sf.when(dataset.quantum_ready == "yes", 0).otherwise(1)) == 0).alias("quantum_ready"),
        (sf.sum(dataset.chrome_gc_gt_150) > 0).alias("chrome_gc_gt_150"),
        (sf.sum(dataset.chrome_cc_gt_150) > 0).alias("chrome_cc_gt_150"),
        (sf.sum(dataset.content_gc_gt_2500) > 0).alias("content_gc_gt_2500"),
        (sf.sum(dataset.content_cc_gt_2500) > 0).alias("content_cc_gt_2500")
    ).cache()

    output_data = {
        'nightly_all': daily_stats(daily_data),
        'nightly_quantumready': daily_stats(daily_data.where(daily_data.quantum_ready == "yes")),
        # NOTE(review): this readiness breakdown spans the ENTIRE dataset,
        # not just *day* — verify that is intentional.
        'quantum_readiness': dict(dataset.groupBy("quantum_ready").count().collect()),
        'weekly_all': weekly_stats(grouped_by_client),
        'weekly_quantumready': weekly_stats(grouped_by_client.where(grouped_by_client.quantum_ready)),
    }

    bucket = boto3.resource('s3').Bucket('telemetry-public-analysis-2')
    bucket.put_object(Body=json.dumps(output_data),
                      Key='bsmedberg/daily-latency-metrics/{}.json'.format(day_str))
    return output_data
# In[44]: backfill one day's derived dataset (long-running Spark job; the
# original run was interrupted partway through).
save_submission_date(date(2017, 5, 28))

# In[45]: sanity-check which submission_date partitions now exist.
ds = spark.read.parquet("s3a://net-mozaws-prod-us-west-2-pipeline-analysis/bsmedberg/quantum-dataset")
ds.select("submission_date").distinct().sort('submission_date').collect()

# In[55]: compute and publish the metrics for 2017-06-05.
q = analyze_for_date(date(2017, 6, 5))
q

# In[51]: manually re-upload the computed metrics under the 2017-06-08 key.
day = date(2017, 6, 8)
bucket = boto3.resource('s3').Bucket('telemetry-public-analysis-2')
bucket.put_object(Body=json.dumps(q),
                  Key='bsmedberg/daily-latency-metrics/{}.json'.format(day.strftime("%Y%m%d")))

# In[15]/In[30]: bucket handle and partition listing used by the cleanup below.
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('net-mozaws-prod-us-west-2-pipeline-analysis')
files = list(my_bucket.objects.filter(Prefix="bsmedberg/quantum-dataset/submission_date="))
# In[29]:
def too_old(files):
    """Yield keys of dataset partition objects dated before 2017-06-05.

    *files* is an iterable of objects with a ``.key`` attribute (e.g. S3
    ObjectSummary instances); keys outside the partition prefix are skipped.
    """
    prefix = "bsmedberg/quantum-dataset/submission_date="
    for f in files:
        if not f.key.startswith(prefix):
            continue
        # Slice off the prefix by its length instead of the previous magic
        # constant 42, so the prefix and the slice can never drift apart.
        # (The remainder starts with YYYYMMDD, so the string compare below
        # orders dates correctly.)
        if f.key[len(prefix):] < "20170605":
            yield f.key
def delete_list(l):
    """Batch-delete the given S3 keys from my_bucket.

    Callers must pass at most 1000 keys (the delete_objects API maximum).
    """
    if not l:
        # delete_objects rejects an empty Objects list (MalformedXML), so
        # make the empty call a no-op instead of an API error.
        return
    my_bucket.delete_objects(Delete={'Objects': [{'Key': key} for key in l]})
# Delete stale partition files in batches of 1000, the delete_objects API
# maximum per request.
dlist = []
for key in too_old(files):
    dlist.append(key)
    if len(dlist) == 1000:
        delete_list(dlist)
        dlist = []
# Flush the final partial batch only when non-empty: the unconditional call
# here previously issued an invalid empty delete_objects request whenever the
# key count was an exact multiple of 1000 (or zero).
if dlist:
    delete_list(dlist)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment