@bsmedberg
Created March 8, 2017 19:28
Build Reversion
{
"cells": [
{
"cell_type": "raw",
"metadata": {},
"source": [
"---\n",
"title: \"Longitudinal Dataset Tutorial\"\n",
"authors:\n",
"- vitillo\n",
"tags:\n",
"- tutorial\n",
"- examples\n",
"- dataset\n",
"- longitudinal\n",
"created_at: 2016-03-10\n",
"updated_at: 2016-06-24\n",
"tldr: Tutorial of how to use the Longitudinal Dataset\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Longitudinal Dataset Tutorial"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The longitudinal dataset is logically organized as a table where rows represent profiles and columns represent the various metrics (e.g. startup time). Each field of the table contains a list of values, one per Telemetry submission received for that profile.\n",
"\n",
"The dataset is regenerated from scratch every week, which lets us make non-backward-compatible schema changes without worrying about merging procedures.\n",
"\n",
"The current version of the longitudinal dataset has been built from all main pings received from 1% of profiles across all channels after mid-November, shortly after Unified Telemetry landed. Future versions will store up to 180 days of data."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/mnt/anaconda2/lib/python2.7/site-packages/matplotlib/font_manager.py:273: UserWarning: Matplotlib is building the font cache using fc-list. This may take a moment.\n",
" warnings.warn('Matplotlib is building the font cache using fc-list. This may take a moment.')\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Populating the interactive namespace from numpy and matplotlib\n"
]
}
],
"source": [
"import matplotlib.pyplot as plt\n",
"import pandas as pd\n",
"import numpy as np\n",
"import plotly.plotly as py\n",
"\n",
"%pylab inline"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"32"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sc.defaultParallelism"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The longitudinal dataset can be accessed as a Spark [DataFrame](https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame), which is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"frame = sqlContext.sql(\"SELECT client_id, profile_subsession_counter, build, settings FROM longitudinal\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Number of profiles:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"6796454"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"frame.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The dataset contains all histograms, but it doesn't yet include all metrics stored in the various sections of the pings. See the [code](https://github.com/vitillo/telemetry-batch-view/blob/longitudinal/src/main/scala/streams/Longitudinal.scala#L68) that generates the dataset for a complete list of supported metrics. More metrics will be included in future versions of the dataset; inclusion of specific metrics can be prioritized by filing a bug."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Scalar metrics"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A Spark bug slows down the *first* and *take* methods on a DataFrame. A workaround for now is to convert the DataFrame to an RDD first and then invoke *first* or *take*, e.g.:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"first = frame.rdd.first()\n",
"\n",
"#filter(\"normalized_channel = 'release'\")\\\n",
"# .select(\"build\",\n",
"# \"system\", \n",
"# \"gc_ms\",\n",
"# \"fxa_configured\",\n",
"# \"browser_set_default_always_check\",\n",
"# \"browser_set_default_dialog_prompt_rawcount\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As mentioned earlier, each field of the DataFrame is an array containing one value per submission for that client. The submissions are sorted by time, newest first."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[Row(application_id=u'{ec8030f7-c20a-464f-9b0e-13a3a9e97384}', application_name=u'Firefox', architecture=u'x86', architectures_in_binary=None, build_id=u'20170125094131', version=u'51.0.1', vendor=u'Mozilla', platform_version=u'51.0.1', xpcom_abi=u'x86-msvc', hotfix_version=None),\n",
" Row(application_id=u'{ec8030f7-c20a-464f-9b0e-13a3a9e97384}', application_name=u'Firefox', architecture=u'x86', architectures_in_binary=None, build_id=u'20170125094131', version=u'51.0.1', vendor=u'Mozilla', platform_version=u'51.0.1', xpcom_abi=u'x86-msvc', hotfix_version=None),\n",
" Row(application_id=u'{ec8030f7-c20a-464f-9b0e-13a3a9e97384}', application_name=u'Firefox', architecture=u'x86', architectures_in_binary=None, build_id=u'20170125094131', version=u'51.0.1', vendor=u'Mozilla', platform_version=u'51.0.1', xpcom_abi=u'x86-msvc', hotfix_version=None),\n",
" Row(application_id=u'{ec8030f7-c20a-464f-9b0e-13a3a9e97384}', application_name=u'Firefox', architecture=u'x86', architectures_in_binary=None, build_id=u'20170125094131', version=u'51.0.1', vendor=u'Mozilla', platform_version=u'51.0.1', xpcom_abi=u'x86-msvc', hotfix_version=None)]"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"first.build"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def get_version(v):\n",
"    # Return the major version as an int, or None if it cannot be parsed.\n",
"    try:\n",
"        return int(v.split(\".\")[0])\n",
"    except ValueError:\n",
"        return None"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"None\n",
"12\n",
"12\n",
"None\n"
]
}
],
"source": [
"print get_version(\"\")\n",
"print get_version(\"12.3\")\n",
"print get_version(\"12\")\n",
"print get_version(\"abcd\")"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Traceback (most recent call last):\n",
" File \"/mnt/anaconda2/lib/python2.7/SocketServer.py\", line 290, in _handle_request_noblock\n",
"ERROR:root:Exception while sending command.\n",
"Traceback (most recent call last):\n",
" File \"/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\", line 883, in send_command\n",
" response = connection.send_command(command)\n",
" File \"/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\", line 1040, in send_command\n",
" \"Error while receiving\", e, proto.ERROR_ON_RECEIVE)\n",
"Py4JNetworkError: Error while receiving\n",
" self.process_request(request, client_address)\n",
" File \"/mnt/anaconda2/lib/python2.7/SocketServer.py\", line 318, in process_request\n",
" self.finish_request(request, client_address)\n",
" File \"/mnt/anaconda2/lib/python2.7/SocketServer.py\", line 331, in finish_request\n",
" self.RequestHandlerClass(request, client_address, self)\n",
" File \"/mnt/anaconda2/lib/python2.7/SocketServer.py\", line 652, in __init__\n",
" self.handle()\n",
" File \"/usr/lib/spark/python/pyspark/accumulators.py\", line 235, in handle\n",
" num_updates = read_int(self.rfile)\n",
" File \"/usr/lib/spark/python/pyspark/serializers.py\", line 545, in read_int\n",
" raise EOFError\n",
"EOFError\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:34979)\n",
"Traceback (most recent call last):\n",
" File \"/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\", line 963, in start\n",
" self.socket.connect((self.address, self.port))\n",
" File \"/mnt/anaconda2/lib/python2.7/socket.py\", line 228, in meth\n",
" return getattr(self._sock,name)(*args)\n",
"error: [Errno 111] Connection refused\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"----------------------------------------\n",
"Exception happened during processing of request from ('127.0.0.1', 58866)\n",
"----------------------------------------\n"
]
},
{
"ename": "Py4JNetworkError",
"evalue": "An error occurred while trying to connect to the Java server (127.0.0.1:34979)",
"output_type": "error",
"traceback": [
"\u001b[0;31m\u001b[0m",
"\u001b[0;31mPy4JNetworkError\u001b[0mTraceback (most recent call last)",
"\u001b[0;32m<ipython-input-26-5014964361d2>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[1;32m 36\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mchannel_switch\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 37\u001b[0m \u001b[0mcount_channelswitch\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 38\u001b[0;31m \u001b[0mframe\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mforeach\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmapper\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 39\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 40\u001b[0m \u001b[0mtotal\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfloat\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcount_total\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mvalue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mforeach\u001b[0;34m(self, f)\u001b[0m\n\u001b[1;32m 748\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 749\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0miter\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 750\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mprocessPartition\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;31m# Force evaluation\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 751\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 752\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mforeachPartition\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mcount\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 1006\u001b[0m \u001b[0;36m3\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1007\u001b[0m \"\"\"\n\u001b[0;32m-> 1008\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mi\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m1\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0m_\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mi\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1009\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1010\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mstats\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36msum\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 997\u001b[0m \u001b[0;36m6.0\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 998\u001b[0m \"\"\"\n\u001b[0;32m--> 999\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mx\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfold\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0moperator\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1000\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1001\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mfold\u001b[0;34m(self, zeroValue, op)\u001b[0m\n\u001b[1;32m 871\u001b[0m \u001b[0;31m# zeroValue provided to each partition is unique from the one provided\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 872\u001b[0m \u001b[0;31m# to the final reduce call\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 873\u001b[0;31m \u001b[0mvals\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfunc\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollect\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 874\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mreduce\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mvals\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mzeroValue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 875\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mcollect\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 774\u001b[0m \"\"\"\n\u001b[1;32m 775\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0mSCCallSiteSync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcontext\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mcss\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 776\u001b[0;31m \u001b[0mport\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mctx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollectAndServe\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 777\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_load_from_socket\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mport\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd_deserializer\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 778\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/traceback_utils.py\u001b[0m in \u001b[0;36m__exit__\u001b[0;34m(self, type, value, tb)\u001b[0m\n\u001b[1;32m 76\u001b[0m \u001b[0mSCCallSiteSync\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_spark_stack_depth\u001b[0m \u001b[0;34m-=\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 77\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mSCCallSiteSync\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_spark_stack_depth\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;36m0\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 78\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_context\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jsc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msetCallSite\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1129\u001b[0m \u001b[0mproto\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mEND_COMMAND_PART\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1130\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1131\u001b[0;31m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1132\u001b[0m return_value = get_return_value(\n\u001b[1;32m 1133\u001b[0m answer, self.gateway_client, self.target_id, self.name)\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36msend_command\u001b[0;34m(self, command, retry, binary)\u001b[0m\n\u001b[1;32m 879\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0;34m`\u001b[0m\u001b[0mbinary\u001b[0m\u001b[0;34m`\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;34m`\u001b[0m\u001b[0mTrue\u001b[0m\u001b[0;34m`\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 880\u001b[0m \"\"\"\n\u001b[0;32m--> 881\u001b[0;31m \u001b[0mconnection\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_get_connection\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 882\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 883\u001b[0m \u001b[0mresponse\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mconnection\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m_get_connection\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 827\u001b[0m \u001b[0mconnection\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdeque\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 828\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mIndexError\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 829\u001b[0;31m \u001b[0mconnection\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_create_connection\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 830\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mconnection\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 831\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m_create_connection\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 833\u001b[0m connection = GatewayConnection(\n\u001b[1;32m 834\u001b[0m self.gateway_parameters, self.gateway_property)\n\u001b[0;32m--> 835\u001b[0;31m \u001b[0mconnection\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstart\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 836\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mconnection\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 837\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36mstart\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 968\u001b[0m \u001b[0;34m\"server ({0}:{1})\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mformat\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0maddress\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mport\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 969\u001b[0m \u001b[0mlogger\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mexception\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmsg\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 970\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mPy4JNetworkError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmsg\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 971\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 972\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mclose\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mreset\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mFalse\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mPy4JNetworkError\u001b[0m: An error occurred while trying to connect to the Java server (127.0.0.1:34979)"
]
}
],
"source": [
"count_total = sc.accumulator(0)\n",
"count_backwardsversion = sc.accumulator(0)\n",
"count_channelswitch = sc.accumulator(0)\n",
"count_backwardsversion_releaseonly = sc.accumulator(0)\n",
"\n",
"def mapper(row):\n",
"    channel_switch = False\n",
"    release_only = True\n",
"    backwardsversion = False\n",
"\n",
"    # sessions are sorted by subsessionStartDate and then profileSubsessionCounter, newest-first.\n",
"    last_version = 99\n",
"    last_channel = None\n",
"\n",
"    for settings in row.settings:\n",
"        channel = settings.update.channel\n",
"        if channel != \"release\":\n",
"            release_only = False\n",
"        if last_channel is None:\n",
"            last_channel = channel\n",
"        elif last_channel != channel:\n",
"            channel_switch = True\n",
"\n",
"    for build in row.build:\n",
"        version = get_version(build.version)\n",
"        if version is not None:\n",
"            if version > last_version:\n",
"                backwardsversion = True\n",
"            last_version = version\n",
"\n",
"    count_total.add(1)\n",
"    if backwardsversion:\n",
"        count_backwardsversion.add(1)\n",
"        if release_only:\n",
"            count_backwardsversion_releaseonly.add(1)\n",
"    if channel_switch:\n",
"        count_channelswitch.add(1)\n",
"frame.rdd.foreach(mapper)\n",
"\n",
"total = float(count_total.value)\n",
"print \"users that switched channels at all: {:.2f}%\".format(count_channelswitch.value / total * 100)\n",
"print \"users that reverted to an older version: {:.2f}%\".format(count_backwardsversion.value / total * 100)\n",
"print \"users that reverted to an older version, staying on the release channel: {:.2f}%\".format(count_backwardsversion_releaseonly.value / total * 100)\n"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [default]",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
# coding: utf-8
---
title: "Longitudinal Dataset Tutorial"
authors:
- vitillo
tags:
- tutorial
- examples
- dataset
- longitudinal
created_at: 2016-03-10
updated_at: 2016-06-24
tldr: Tutorial of how to use the Longitudinal Dataset
---
# ### Longitudinal Dataset Tutorial
# The longitudinal dataset is logically organized as a table where rows represent profiles and columns represent the various metrics (e.g. startup time). Each field of the table contains a list of values, one per Telemetry submission received for that profile.
#
# The dataset is regenerated from scratch every week, which lets us make non-backward-compatible schema changes without worrying about merging procedures.
#
# The current version of the longitudinal dataset has been built from all main pings received from 1% of profiles across all channels after mid-November, shortly after Unified Telemetry landed. Future versions will store up to 180 days of data.
# In[1]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
get_ipython().magic(u'pylab inline')
# In[2]:
sc.defaultParallelism
# The longitudinal dataset can be accessed as a Spark [DataFrame](https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame), which is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python.
# In[4]:
frame = sqlContext.sql("SELECT client_id, profile_subsession_counter, build, settings FROM longitudinal")
# Number of profiles:
# In[5]:
frame.count()
# The dataset contains all histograms, but it doesn't yet include all metrics stored in the various sections of the pings. See the [code](https://github.com/vitillo/telemetry-batch-view/blob/longitudinal/src/main/scala/streams/Longitudinal.scala#L68) that generates the dataset for a complete list of supported metrics. More metrics will be included in future versions of the dataset; inclusion of specific metrics can be prioritized by filing a bug.
# ### Scalar metrics
# A Spark bug slows down the *first* and *take* methods on a DataFrame. A workaround for now is to convert the DataFrame to an RDD first and then invoke *first* or *take*, e.g.:
# In[6]:
first = frame.rdd.first()
#filter("normalized_channel = 'release'")\
# .select("build",
# "system",
# "gc_ms",
# "fxa_configured",
# "browser_set_default_always_check",
# "browser_set_default_dialog_prompt_rawcount")
# As mentioned earlier, each field of the DataFrame is an array containing one value per submission for that client. The submissions are sorted by time, newest first.
# In[9]:
first.build
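# A minimal sketch of the layout described above, using plain Python stand-ins
# instead of Spark rows. The Build, Update, Settings and Profile tuples, the
# sample values and the submissions() helper are all hypothetical. The sketch
# also assumes that the build and settings arrays are index-aligned (entry i of
# each array belongs to the same submission), which the text implies but does
# not state outright.
# In[ ]:
from collections import namedtuple

Build = namedtuple("Build", ["version"])
Update = namedtuple("Update", ["channel"])
Settings = namedtuple("Settings", ["update"])
Profile = namedtuple("Profile", ["client_id", "build", "settings"])

# One hypothetical profile with three submissions, newest first.
sample_profile = Profile(
    client_id="hypothetical-client",
    build=[Build("51.0.1"), Build("51.0.1"), Build("50.1.0")],
    settings=[Settings(Update("release"))] * 3,
)

def submissions(profile):
    """Pair up the per-submission entries of the build and settings columns."""
    return [(b.version, s.update.channel)
            for b, s in zip(profile.build, profile.settings)]

sample_submissions = submissions(sample_profile)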
# In[12]:
def get_version(v):
    try:
        return int(v.split(".")[0])
    except ValueError:
        return None
# In[13]:
print get_version("")
print get_version("12.3")
print get_version("12")
print get_version("abcd")
# In[26]:
count_total = sc.accumulator(0)
count_backwardsversion = sc.accumulator(0)
count_channelswitch = sc.accumulator(0)
count_backwardsversion_releaseonly = sc.accumulator(0)
def mapper(row):
    channel_switch = False
    release_only = True
    backwardsversion = False
    # sessions are sorted by subsessionStartDate and then profileSubsessionCounter, newest-first.
    last_version = 99
    last_channel = None
    for settings in row.settings:
        channel = settings.update.channel
        if channel != "release":
            release_only = False
        if last_channel is None:
            last_channel = channel
        elif last_channel != channel:
            channel_switch = True
    for build in row.build:
        version = get_version(build.version)
        if version is not None:
            if version > last_version:
                backwardsversion = True
            last_version = version
    count_total.add(1)
    if backwardsversion:
        count_backwardsversion.add(1)
        if release_only:
            count_backwardsversion_releaseonly.add(1)
    if channel_switch:
        count_channelswitch.add(1)
frame.rdd.foreach(mapper)
total = float(count_total.value)
print "users that switched channels at all: {:.2f}%".format(count_channelswitch.value / total * 100)
print "users that reverted to an older version: {:.2f}%".format(count_backwardsversion.value / total * 100)
print "users that reverted to an older version, staying on the release channel: {:.2f}%".format(count_backwardsversion_releaseonly.value / total * 100)
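# The per-profile version check can be tried without a cluster. This is a
# minimal sketch, not the notebook's own code: major_version() and
# has_downgrade() are hypothetical helpers, the input is assumed to be a list
# of version strings ordered newest first, and None replaces the 99 sentinel
# as the "no newer version seen yet" marker. Unlike the original parser, this
# variant also tolerates a missing (None) version string.
# In[ ]:
def major_version(v):
    """Return the major version as an int, or None if it cannot be parsed."""
    try:
        return int(v.split(".")[0])
    except (AttributeError, ValueError):
        return None

def has_downgrade(versions_newest_first):
    """True if an older submission reports a higher major version than a newer one."""
    newer = None
    for v in versions_newest_first:
        major = major_version(v)
        if major is None:
            continue  # skip unparseable entries, as the mapper does
        if newer is not None and major > newer:
            return True
        newer = major
    return False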