
@chutten
Last active Feb 21, 2017
Telemetry Hello World
{
"cells": [
{
"cell_type": "raw",
"metadata": {},
"source": [
"---\n",
"title: \"Telemetry Hello World\"\n",
"authors:\n",
"- vitillo\n",
"tags:\n",
"- tutorial\n",
"- examples\n",
"- telemetry\n",
"- spark\n",
"created_at: 2016-03-10\n",
"updated_at: 2016-11-19\n",
"tldr: Brief introduction to Spark and Telemetry in Python\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Telemetry Hello World"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is a brief introduction to Spark and Telemetry in Python. Have a look at the [tutorial](https://gist.github.com/vitillo/25a20b7c8685c0c82422) in Scala and the associated [talk](http://www.slideshare.net/RobertoAgostinoVitil/spark-meets-telemetry) if you are interested in learning more about Spark."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [],
"source": [
"import ujson as json\n",
"import matplotlib.pyplot as plt\n",
"import pandas as pd\n",
"import numpy as np\n",
"import plotly.plotly as py\n",
"\n",
"from plotly.graph_objs import *\n",
"from moztelemetry import get_pings_properties, get_one_ping_per_client\n",
"from moztelemetry.dataset import Dataset\n",
"\n",
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Basics"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The goal of this example is to plot the startup distribution for each OS. Let's see how many parallel workers we have at our disposal:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"sc.defaultParallelism"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's fetch 10% of Telemetry submissions for a given submission date..."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"Dataset.from_source(\"telemetry\").schema"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"pings = Dataset.from_source(\"telemetry\") \\\n",
" .where(docType='main') \\\n",
" .where(submissionDate=\"20161101\") \\\n",
" .where(appUpdateChannel=\"nightly\") \\\n",
" .records(sc, sample=0.1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"... and extract only the attributes we need from the Telemetry submissions:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"subset = get_pings_properties(pings, [\"clientId\",\n",
" \"environment/system/os/name\",\n",
" \"payload/simpleMeasurements/firstPaint\"])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's filter out submissions with an invalid startup time:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"subset = subset.filter(lambda p: p.get(\"payload/simpleMeasurements/firstPaint\", -1) >= 0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To prevent pseudoreplication, let's consider only a single submission per client. As this step requires a distributed shuffle, it should be run only after extracting the attributes of interest with *get_pings_properties*."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"subset = get_one_ping_per_client(subset)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Caching is fundamental as it allows for an iterative, real-time development workflow:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"cached = subset.cache()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"How many pings are we looking at?"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"cached.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's group the startup timings by OS:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"grouped = cached.map(lambda p: (p[\"environment/system/os/name\"], p[\"payload/simpleMeasurements/firstPaint\"])).groupByKey().collectAsMap()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And finally plot the data:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"frame = pd.DataFrame({x: np.log10(pd.Series(list(y))) for x, y in grouped.items()})\n",
"plt.figure(figsize=(17, 7))\n",
"frame.boxplot(return_type=\"axes\")\n",
"plt.ylabel(\"log10(firstPaint)\")\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can also create interactive plots with [plotly](https://plot.ly/):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"fig = plt.figure(figsize=(18, 7))\n",
"frame[\"Windows_NT\"].plot(kind=\"hist\", bins=50)\n",
"plt.title(\"startup distribution for Windows\")\n",
"plt.ylabel(\"count\")\n",
"plt.xlabel(\"log10(firstPaint)\")\n",
"py.iplot_mpl(fig, strip_style=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Histograms"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's extract a histogram from the submissions:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"histograms = get_pings_properties(pings, \"payload/histograms/GC_MARK_MS\", with_processes=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The API returns three distinct histograms for each submission:\n",
"- a histogram for the parent process (*GC_MARK_MS_parent*)\n",
"- an aggregated histogram for the child processes (*GC_MARK_MS_children*)\n",
"- the aggregate of the parent and child histograms (*GC_MARK_MS*)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's aggregate the histogram over all submissions and plot it:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def aggregate_arrays(xs, ys):\n",
" if xs is None:\n",
" return ys\n",
" \n",
" if ys is None:\n",
" return xs\n",
" \n",
" return xs + ys\n",
" \n",
"aggregate = histograms.map(lambda p: p[\"payload/histograms/GC_MARK_MS\"]).reduce(aggregate_arrays)\n",
"aggregate.plot(kind=\"bar\", figsize=(15, 7))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Keyed histograms follow a similar pattern. To extract a keyed histogram for which we know the key/label we are interested in:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"histograms = get_pings_properties(pings, \"payload/keyedHistograms/SUBPROCESS_ABNORMAL_ABORT/plugin\", with_processes=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"List all keys/labels for a keyed histogram:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"keys = pings.flatMap(lambda p: p[\"payload\"].get(\"keyedHistograms\", {}).get(\"MISBEHAVING_ADDONS_JANK_LEVEL\", {}).keys())\n",
"keys = keys.distinct().collect()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"keys[:5]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Retrieve the histograms for a set of labels:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"properties = map(lambda k: \"payload/keyedHistograms/{}/{}\".format(\"MISBEHAVING_ADDONS_JANK_LEVEL\", k), keys[:5])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"histograms = get_pings_properties(pings, properties, with_processes=True)"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [default]",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
# coding: utf-8
---
title: "Telemetry Hello World"
authors:
- vitillo
tags:
- tutorial
- examples
- telemetry
- spark
created_at: 2016-03-10
updated_at: 2016-11-19
tldr: Brief introduction to Spark and Telemetry in Python
---
# ### Telemetry Hello World
# This is a brief introduction to Spark and Telemetry in Python. Have a look at the [tutorial](https://gist.github.com/vitillo/25a20b7c8685c0c82422) in Scala and the associated [talk](http://www.slideshare.net/RobertoAgostinoVitil/spark-meets-telemetry) if you are interested in learning more about Spark.
# In[ ]:
import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
from plotly.graph_objs import *
from moztelemetry import get_pings_properties, get_one_ping_per_client
from moztelemetry.dataset import Dataset
get_ipython().magic(u'matplotlib inline')
# ### Basics
# The goal of this example is to plot the startup distribution for each OS. Let's see how many parallel workers we have at our disposal:
# In[ ]:
sc.defaultParallelism
# Let's fetch 10% of Telemetry submissions for a given submission date...
# In[ ]:
Dataset.from_source("telemetry").schema
# In[ ]:
pings = (Dataset.from_source("telemetry")
         .where(docType='main')
         .where(submissionDate="20161101")
         .where(appUpdateChannel="nightly")
         .records(sc, sample=0.1))
# ... and extract only the attributes we need from the Telemetry submissions:
# In[ ]:
subset = get_pings_properties(pings, ["clientId",
"environment/system/os/name",
"payload/simpleMeasurements/firstPaint"])
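# *get_pings_properties* walks each "/"-separated path through the nested
# submission and returns a flat dict keyed by those paths. A minimal
# pure-Python sketch of that behavior (`extract` is a hypothetical helper,
# not part of moztelemetry):
# In[ ]:
def extract(ping, path):
    node = ping
    for part in path.split("/"):
        if not isinstance(node, dict) or part not in node:
            return None  # missing attributes come back as None
        node = node[part]
    return node

ping_example = {"clientId": "abc",
                "environment": {"system": {"os": {"name": "Linux"}}},
                "payload": {"simpleMeasurements": {"firstPaint": 1200}}}
flat = {p: extract(ping_example, p) for p in ["clientId",
                                              "environment/system/os/name",
                                              "payload/simpleMeasurements/firstPaint"]}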
# Let's filter out submissions with an invalid startup time:
# In[ ]:
subset = subset.filter(lambda p: p.get("payload/simpleMeasurements/firstPaint", -1) >= 0)
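# The predicate above treats a missing *firstPaint* as -1, so both absent
# and negative timings are dropped. The same check on plain dicts:
# In[ ]:
sample = [{"payload/simpleMeasurements/firstPaint": 1200},
          {"payload/simpleMeasurements/firstPaint": -1},
          {}]  # no firstPaint recorded at all
valid = [p for p in sample
         if p.get("payload/simpleMeasurements/firstPaint", -1) >= 0]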
# To prevent pseudoreplication, let's consider only a single submission per client. As this step requires a distributed shuffle, it should be run only after extracting the attributes of interest with *get_pings_properties*.
# In[ ]:
subset = get_one_ping_per_client(subset)
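# Conceptually, *get_one_ping_per_client* reduces the set down to one
# submission per clientId. A local sketch (the real implementation runs as
# a distributed reduction, and which ping survives per client may differ;
# this version keeps the first seen):
# In[ ]:
def one_per_client(pings):
    seen = {}
    for p in pings:
        seen.setdefault(p["clientId"], p)  # keep the first ping per client
    return list(seen.values())

deduped = one_per_client([{"clientId": "a", "firstPaint": 100},
                          {"clientId": "a", "firstPaint": 900},
                          {"clientId": "b", "firstPaint": 250}])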
# Caching is fundamental as it allows for an iterative, real-time development workflow:
# In[ ]:
cached = subset.cache()
# How many pings are we looking at?
# In[ ]:
cached.count()
# Let's group the startup timings by OS:
# In[ ]:
grouped = cached.map(lambda p: (p["environment/system/os/name"], p["payload/simpleMeasurements/firstPaint"])).groupByKey().collectAsMap()
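# groupByKey().collectAsMap() hands back a plain dict mapping each OS name
# to the collection of firstPaint timings. In pure Python the equivalent
# grouping (with hypothetical timings) is:
# In[ ]:
from collections import defaultdict

records = [("Windows_NT", 1200), ("Linux", 800),
           ("Windows_NT", 1500), ("Darwin", 950)]
grouped_local = defaultdict(list)
for os_name, first_paint in records:
    grouped_local[os_name].append(first_paint)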
# And finally plot the data:
# In[ ]:
frame = pd.DataFrame({x: np.log10(pd.Series(list(y))) for x, y in grouped.items()})
plt.figure(figsize=(17, 7))
frame.boxplot(return_type="axes")
plt.ylabel("log10(firstPaint)")
plt.show()
# You can also create interactive plots with [plotly](https://plot.ly/):
# In[ ]:
fig = plt.figure(figsize=(18, 7))
frame["Windows_NT"].plot(kind="hist", bins=50)
plt.title("startup distribution for Windows")
plt.ylabel("count")
plt.xlabel("log10(firstPaint)")
py.iplot_mpl(fig, strip_style=True)
# ### Histograms
# Let's extract a histogram from the submissions:
# In[ ]:
histograms = get_pings_properties(pings, "payload/histograms/GC_MARK_MS", with_processes=True)
# The API returns three distinct histograms for each submission:
# - a histogram for the parent process (*GC_MARK_MS_parent*)
# - an aggregated histogram for the child processes (*GC_MARK_MS_children*)
# - the aggregate of the parent and child histograms (*GC_MARK_MS*)
# Let's aggregate the histogram over all submissions and plot it:
# In[ ]:
def aggregate_arrays(xs, ys):
if xs is None:
return ys
if ys is None:
return xs
return xs + ys
aggregate = histograms.map(lambda p: p["payload/histograms/GC_MARK_MS"]).reduce(aggregate_arrays)
aggregate.plot(kind="bar", figsize=(15, 7))
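# In the notebook the histogram values are pandas Series, so *xs + ys* in
# *aggregate_arrays* adds bucket counts elementwise, and None marks a
# submission without that histogram. A stand-in with plain lists and an
# explicit elementwise add shows how the reduction behaves:
# In[ ]:
from functools import reduce

def aggregate_lists(xs, ys):
    if xs is None:
        return ys
    if ys is None:
        return xs
    return [x + y for x, y in zip(xs, ys)]  # elementwise, like Series addition

buckets = [None, [1, 0, 2], [0, 3, 1], None, [2, 2, 2]]
total = reduce(aggregate_lists, buckets)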
# Keyed histograms follow a similar pattern. To extract a keyed histogram for which we know the key/label we are interested in:
# In[ ]:
histograms = get_pings_properties(pings, "payload/keyedHistograms/SUBPROCESS_ABNORMAL_ABORT/plugin", with_processes=True)
# List all keys/labels for a keyed histogram:
# In[ ]:
keys = pings.flatMap(lambda p: p["payload"].get("keyedHistograms", {}).get("MISBEHAVING_ADDONS_JANK_LEVEL", {}).keys())
keys = keys.distinct().collect()
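# The flatMap/distinct pair above, sketched locally: gather every label of
# the keyed histogram across submissions, then deduplicate (the add-on
# names here are hypothetical):
# In[ ]:
sample_pings = [
    {"payload": {"keyedHistograms": {"MISBEHAVING_ADDONS_JANK_LEVEL": {"addon-a": 1}}}},
    {"payload": {"keyedHistograms": {"MISBEHAVING_ADDONS_JANK_LEVEL": {"addon-a": 3,
                                                                       "addon-b": 2}}}},
    {"payload": {}},  # submissions without the histogram contribute no keys
]
keys_local = set()
for p in sample_pings:
    keys_local.update(p["payload"].get("keyedHistograms", {})
                                  .get("MISBEHAVING_ADDONS_JANK_LEVEL", {}).keys())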
# In[ ]:
keys[:5]
# Retrieve the histograms for a set of labels:
# In[ ]:
properties = map(lambda k: "payload/keyedHistograms/{}/{}".format("MISBEHAVING_ADDONS_JANK_LEVEL", k), keys[:5])
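# The map above just builds one property path per label; with hypothetical
# labels the result looks like this:
# In[ ]:
sample_labels = ["addon-a", "addon-b"]
sample_properties = ["payload/keyedHistograms/{}/{}".format(
                         "MISBEHAVING_ADDONS_JANK_LEVEL", k)
                     for k in sample_labels]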
# In[ ]:
histograms = get_pings_properties(pings, properties, with_processes=True)