@saptarshiguha
Created July 20, 2015 19:46
{"nbformat_minor": 0, "cells": [{"source": "##Hello's Hello World\nA brief attempt to get data for LOOP_SHARING_STATE_CHANGE and LOOP_TWO_WAY_MEDIA_CONN_LENGTH. See this bug] for more details. You need to go to http://telemetry-dash.mozilla.org/, sign in with Persona and create a Spark Cluster. It will take time to start, but the landing page will have details on how to get to this notebook.", "cell_type": "markdown", "metadata": {}}, {"execution_count": 1, "cell_type": "code", "source": "import ujson as json\nimport matplotlib.pyplot as plt\nimport pandas as pd\nimport numpy as np\nimport plotly.plotly as py\nfrom moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client, get_clients_history\n%pylab inline", "outputs": [{"output_type": "stream", "name": "stdout", "text": "Populating the interactive namespace from numpy and matplotlib\n"}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "## Histograms: LOOP_TWO_WAY_MEDIA_CONN_LENGTH_1 and LOOP_SHARING_STATE_CHANGE_1\nLet's fetch some pings first for LOOP_TWO_WAY_MEDIA_CONN_LENGTH_1 and LOOP_SHARING_STATE_CHANGE_1 (ideally I would have a build-id range, but not sure who to ask right now). The API returns two histograms for each submission and each of the above:\nthe aggregate of the parent and child histograms", "cell_type": "markdown", "metadata": {}}, {"execution_count": 14, "cell_type": "code", "source": "pings = get_pings(sc, app=\"Firefox\", channel=\"release\", version=\"39.0\",submission_date=(\"20150630\", \"20150720\"), fraction=0.1, schema=\"v2\")\nhistograms = get_pings_properties(pings,[\"payload/histograms/LOOP_SHARING_STATE_CHANGE_1\", \"payload/histograms/LOOP_TWO_WAY_MEDIA_CONN_LENGTH_1\"], with_processes=False)", "outputs": [], "metadata": {"collapsed": false, "trusted": true}}, {"execution_count": 5, "cell_type": "code", "source": "help(get_pings)\n", "outputs": [{"output_type": "stream", "name": "stdout", "text": "Help on function get_pings in module moztelemetry.spark:\n\nget_pings(sc, **kwargs)\n Returns a RDD of Telemetry submissions for a given filtering criteria.\n \n Depending on the value of the 'schema' argument, different filtering criteria\n are available. 
Let's peek at what came back:

```python
histograms.take(5)
```

```
[{'payload/histograms/LOOP_SHARING_STATE_CHANGE_1': None,
  'payload/histograms/LOOP_TWO_WAY_MEDIA_CONN_LENGTH_1': None},
 {'payload/histograms/LOOP_SHARING_STATE_CHANGE_1': None,
  'payload/histograms/LOOP_TWO_WAY_MEDIA_CONN_LENGTH_1': None},
 {'payload/histograms/LOOP_SHARING_STATE_CHANGE_1': None,
  'payload/histograms/LOOP_TWO_WAY_MEDIA_CONN_LENGTH_1': None},
 {'payload/histograms/LOOP_SHARING_STATE_CHANGE_1': None,
  'payload/histograms/LOOP_TWO_WAY_MEDIA_CONN_LENGTH_1': None},
 {'payload/histograms/LOOP_SHARING_STATE_CHANGE_1': None,
  'payload/histograms/LOOP_TWO_WAY_MEDIA_CONN_LENGTH_1': None}]
```

A lot of Nones! Let's keep only the entries that are not None. Are there many non-None pings?
Not really:

```python
nonone1 = histograms.filter(lambda p: p["payload/histograms/LOOP_TWO_WAY_MEDIA_CONN_LENGTH_1"] is not None)
nonone2 = histograms.filter(lambda p: p["payload/histograms/LOOP_SHARING_STATE_CHANGE_1"] is not None)
```

```python
print(nonone1.count())
print(nonone2.count())
```

The count fails with a Py4JJavaError: some submissions have no "payload" key at all, and moztelemetry's property extraction raises on them. The failure surfaces only now, at count(), because Spark evaluates the pipeline lazily. Abridged traceback (the same Java stack is repeated in the full output):

```
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 53 in stage 15.0 failed 4 times,
most recent failure: Lost task 53.3 in stage 15.0 (TID 685932, ip-10-42-131-175.us-west-2.compute.internal):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  ...
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 932, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/home/hadoop/anaconda/lib/python2.7/site-packages/moztelemetry/spark.py", line 123, in <lambda>
    return pings.map(lambda p: _get_ping_properties(p, paths, only_median, with_processes)).filter(lambda p: p)
  File "/home/hadoop/anaconda/lib/python2.7/site-packages/moztelemetry/spark.py", line 303, in _get_ping_properties
    cursor = ping["payload"]
KeyError: 'payload'
```

There are only 512 submissions with a non-missing LOOP_TWO_WAY_MEDIA_CONN_LENGTH_1!
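One way the crash could be worked around, assuming (as the KeyError suggests) that the offending pings simply lack a top-level "payload" key: filter the raw pings before handing them to get_pings_properties. A minimal sketch; the variable names here are mine, not from the notebook:

```python
# Hypothetical guard: drop pings without a top-level "payload" key so
# _get_ping_properties never hits the KeyError seen above.
safe_pings = pings.filter(lambda p: isinstance(p, dict) and "payload" in p)
safe_histograms = get_pings_properties(
    safe_pings,
    ["payload/histograms/LOOP_SHARING_STATE_CHANGE_1",
     "payload/histograms/LOOP_TWO_WAY_MEDIA_CONN_LENGTH_1"],
    with_processes=False)
print(safe_histograms.filter(
    lambda p: p["payload/histograms/LOOP_TWO_WAY_MEDIA_CONN_LENGTH_1"] is not None).count())
```

Pre-filtering keeps malformed submissions from ever reaching the property-extraction step, so the lazy count() should then run to completion.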