Skip to content

Instantly share code, notes, and snippets.

@jtg567
Last active August 24, 2017 20:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jtg567/9c048993a2710b5e6df73b2244de066e to your computer and use it in GitHub Desktop.
Save jtg567/9c048993a2710b5e6df73b2244de066e to your computer and use it in GitHub Desktop.
Race Cache 2 Ping Pull
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Populating the interactive namespace from numpy and matplotlib\n",
"\n",
"320\n",
"\n",
"date_range start: 20170726, date_range end: 20170802\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"/mnt/anaconda2/lib/python2.7/site-packages/IPython/core/magics/pylab.py:161: UserWarning:\n",
"\n",
"pylab import has clobbered these variables: ['Annotation', 'Figure']\n",
"`%matplotlib` prevents importing * from pylab and numpy\n",
"\n"
]
}
],
"source": [
"import ujson as json\n",
"import matplotlib.pyplot as plt\n",
"import pandas as pd\n",
"import numpy as np\n",
"import plotly.plotly as py\n",
"import datetime as DT\n",
"from plotly.graph_objs import *\n",
"from moztelemetry import Dataset, get_pings_properties\n",
"from operator import itemgetter\n",
"from pprint import pprint as pp\n",
"\n",
"%pylab inline\n",
"pyplot.switch_backend('agg')\n",
"\n",
"# how many cores active on this cluster?\n",
"print \"\\n\"+str(sc.defaultParallelism)+\"\\n\"\n",
"\n",
"# specify exact date or range\n",
"date_range = ('20170726', '20170802') # exact strings\n",
"print \"date_range start: \"+date_range[0]+\", date_range end: \"+date_range[1]+\"\\n\"\n",
"\n",
"# sample rate \n",
"sr = 1"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"fetching 71019.90455MB in 16517 files...\n"
]
}
],
"source": [
"cohorts = Dataset.from_source(\"telemetry-cohorts\")\n",
"cohorts.schema\n",
"\n",
"experiment = \"pref-flip-rcwn2-1381816\"\n",
"RDD = cohorts.where(\n",
" submissionDate= lambda sD: sD >= date_range[0] and sD <= date_range[1],\n",
" experimentId=experiment).records(sc, sample= sr)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"filtered = RDD.filter(lambda p: p['meta'].get('docType', \"None\") == 'main')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"props = dict(\n",
" clientId = \"clientId\",\n",
" channel= \"application/channel\",\n",
" FxVersion= \"application/version\",\n",
" osName= \"environment/system/os/name\",\n",
" osVersion= \"environment/system/os/version\",\n",
" branch= \"environment/experiments/pref-flip-rcwn2-1381816/branch\",\n",
" nrc_wn_oosd= \"payload/histograms/NETWORK_RACE_CACHE_WITH_NETWORK_OCEC_ON_START_DIFF\",\n",
" nrc_wn_st= \"payload/histograms/NETWORK_RACE_CACHE_WITH_NETWORK_SAVED_TIME\",\n",
" nrc_wn_u2= \"payload/histograms/NETWORK_RACE_CACHE_WITH_NETWORK_USAGE_2\",\n",
" nrc_bw_rnw= \"payload/histograms/NETWORK_RACE_CACHE_BANDWIDTH_RACE_NETWORK_WIN\",\n",
" nrc_bw_rcw= \"payload/histograms/NETWORK_RACE_CACHE_BANDWIDTH_RACE_CACHE_WIN\",\n",
" nrc_bw_nr= \"payload/histograms/NETWORK_RACE_CACHE_BANDWIDTH_NOT_RACE\",\n",
" nrc_val= \"payload/histograms/NETWORK_RACE_CACHE_VALIDATION\",\n",
" hpc_l1= \"payload/histograms/HTTP_PAGE_COMPLETE_LOAD\",\n",
" hpc_l2= \"payload/histograms/HTTP_PAGE_COMPLETE_LOAD_V2\",\n",
" hpc_ln= \"payload/histograms/HTTP_PAGE_COMPLETE_LOAD_NET_V2\",\n",
" hpc_lc= \"payload/histograms/HTTP_PAGE_COMPLETE_LOAD_CACHED_V2\",\n",
" hsc_l1= \"payload/histograms/HTTP_SUB_COMPLETE_LOAD\",\n",
" hsc_l2= \"payload/histograms/HTTP_SUB_COMPLETE_LOAD_V2\",\n",
" hsc_lc= \"payload/histograms/HTTP_SUB_COMPLETE_LOAD_CACHED_V2\",\n",
" hsc_ln= \"payload/histograms/HTTP_SUB_COMPLETE_LOAD_NET_V2\",\n",
" tcplt= \"payload/histograms/TOTAL_CONTENT_PAGE_LOAD_TIME\",\n",
")\n",
"RDD_prop = get_pings_properties(filtered, props)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"ename": "Py4JJavaError",
"evalue": "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 324, ip-172-31-29-77.us-west-2.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/serializers.py\", line 263, in dump_stream\n vs = list(itertools.islice(iterator, batch))\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1306, in takeUpToNumLeft\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 150, in <lambda>\n additional_histograms)) \\\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 232, in _get_ping_properties\n additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 291, in _get_merged_histograms\n additional_histograms)]\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 264, in _get_ping_property\n additional_histograms=additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/histogram.py\", line 135, in __init__\n name, histograms_definition[re.sub(\"^STARTUP_\", \"\", proper_name)])\nKeyError: 'HTTP_SUB_COMPLETE_LOAD'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat 
org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)\n\tat org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)\n\tat org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/serializers.py\", line 263, in dump_stream\n vs = 
list(itertools.islice(iterator, batch))\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1306, in takeUpToNumLeft\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 150, in <lambda>\n additional_histograms)) \\\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 232, in _get_ping_properties\n additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 291, in _get_merged_histograms\n additional_histograms)]\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 264, in _get_ping_property\n additional_histograms=additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/histogram.py\", line 135, in __init__\n name, histograms_definition[re.sub(\"^STARTUP_\", \"\", proper_name)])\nKeyError: 'HTTP_SUB_COMPLETE_LOAD'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n",
"output_type": "error",
"traceback": [
"\u001b[0;31m\u001b[0m",
"\u001b[0;31mPy4JJavaError\u001b[0mTraceback (most recent call last)",
"\u001b[0;32m<ipython-input-5-675113c54db5>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mRDD_prop\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtake\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mtake\u001b[0;34m(self, num)\u001b[0m\n\u001b[1;32m 1308\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1309\u001b[0m \u001b[0mp\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mrange\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpartsScanned\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmin\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpartsScanned\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0mnumPartsToTry\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtotalParts\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1310\u001b[0;31m \u001b[0mres\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcontext\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrunJob\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtakeUpToNumLeft\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mp\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1311\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1312\u001b[0m \u001b[0mitems\u001b[0m \u001b[0;34m+=\u001b[0m \u001b[0mres\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/context.py\u001b[0m in \u001b[0;36mrunJob\u001b[0;34m(self, rdd, partitionFunc, partitions, allowLocal)\u001b[0m\n\u001b[1;32m 931\u001b[0m \u001b[0;31m# SparkContext#runJob.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 932\u001b[0m \u001b[0mmappedRDD\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mrdd\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpartitionFunc\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 933\u001b[0;31m \u001b[0mport\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrunJob\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jsc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmappedRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mpartitions\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 934\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_load_from_socket\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mport\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmappedRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd_deserializer\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 935\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1131\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1132\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1133\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1134\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1135\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 61\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 65\u001b[0m \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 317\u001b[0m raise Py4JJavaError(\n\u001b[1;32m 318\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 319\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 320\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 321\u001b[0m raise Py4JError(\n",
"\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 324, ip-172-31-29-77.us-west-2.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/serializers.py\", line 263, in dump_stream\n vs = list(itertools.islice(iterator, batch))\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1306, in takeUpToNumLeft\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 150, in <lambda>\n additional_histograms)) \\\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 232, in _get_ping_properties\n additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 291, in _get_merged_histograms\n additional_histograms)]\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 264, in _get_ping_property\n additional_histograms=additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/histogram.py\", line 135, in __init__\n name, histograms_definition[re.sub(\"^STARTUP_\", \"\", proper_name)])\nKeyError: 'HTTP_SUB_COMPLETE_LOAD'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat 
org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)\n\tat org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)\n\tat org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/serializers.py\", line 263, in dump_stream\n vs = 
list(itertools.islice(iterator, batch))\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1306, in takeUpToNumLeft\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 150, in <lambda>\n additional_histograms)) \\\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 232, in _get_ping_properties\n additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 291, in _get_merged_histograms\n additional_histograms)]\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 264, in _get_ping_property\n additional_histograms=additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/histogram.py\", line 135, in __init__\n name, histograms_definition[re.sub(\"^STARTUP_\", \"\", proper_name)])\nKeyError: 'HTTP_SUB_COMPLETE_LOAD'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n"
]
}
],
"source": [
"RDD_prop.take(1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"byClient = RDD_prop.map(lambda p: (p['clientId'], [p])).reduceByKey(lambda x,y: x+y)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def gather(pinglist, prop):\n",
" return [x[prop] for x in pinglist]\n",
"\n",
"#def hist\n",
"\n",
"def doIT(p):\n",
" clientId, pinglist = p \n",
" return dict(\n",
" clientId = clientId,\n",
" branch= np.unique(gather(pinglist, 'branch')),\n",
" pings= len(pinglist),\n",
" channel= np.unique(gather(pinglist, 'channel')),\n",
" os= np.unique(gather(pinglist, 'osName')) + np.unique(gather(pinglist, 'osVersion')),\n",
" )\n",
"fin = byClient.map(doIT) "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"pd.DataFrame(fin.collect()).to_csv(experiment +\".csv\", index=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"RDD_prop.map(lambda x: ','.join(x)).repartition(sc.defaultParallelism).saveAsTextFile(\"s3://telemetry-private-analysis-2/jgaunt/\"+experiment+\".csv\")\n",
"print \"file saved at: s3://telemetry-private-analysis-2/jgaunt/\"+experiment+\".csv\""
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [conda root]",
"language": "python",
"name": "conda-root-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
# coding: utf-8
# In[1]:
import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
import datetime as DT
from plotly.graph_objs import *
from moztelemetry import Dataset, get_pings_properties
from operator import itemgetter
from pprint import pprint as pp
# Notebook setup: plotting backend, cluster report and query parameters.
# `get_ipython` and `sc` are not defined in this file; they are expected to be
# supplied by the IPython / Spark session that executes it.
get_ipython().magic(u'pylab inline')
# Go through the explicitly imported `plt` alias instead of the `pyplot` name
# that only exists because %pylab star-imports it into the namespace.
plt.switch_backend('agg')

# how many cores active on this cluster? (reported via sc.defaultParallelism)
print("\n" + str(sc.defaultParallelism) + "\n")

# specify exact date or range: inclusive bounds, compared as 'YYYYMMDD' strings
date_range = ('20170726', '20170802')
print("date_range start: " + date_range[0] + ", date_range end: " + date_range[1] + "\n")

# sample rate, passed as the `sample` argument when the records are fetched
sr = 1
# In[2]:
# Select this experiment's pings from the "telemetry-cohorts" dataset.
cohorts = Dataset.from_source("telemetry-cohorts")
# Bare attribute access kept from the notebook; its value is not used here.
cohorts.schema

experiment = "pref-flip-rcwn2-1381816"


def _submitted_in_range(sD):
    """Return True when submission date ``sD`` is inside ``date_range`` (inclusive)."""
    return date_range[0] <= sD <= date_range[1]


RDD = (
    cohorts
    .where(submissionDate=_submitted_in_range, experimentId=experiment)
    .records(sc, sample=sr)
)
# In[3]:
def _is_main_ping(ping):
    """Return True when the ping's meta 'docType' equals 'main'."""
    # A missing 'docType' yields None, which never compares equal to 'main'.
    return ping['meta'].get('docType') == 'main'


# Keep only the pings whose docType is 'main'.
filtered = RDD.filter(_is_main_ping)
# In[4]:
# Output field name -> slash-separated path inside a ping. This mapping is
# what gets handed to get_pings_properties() to project each ping down to the
# fields of interest.
props = dict(
    # client / environment descriptors
    clientId="clientId",
    channel="application/channel",
    FxVersion="application/version",
    osName="environment/system/os/name",
    osVersion="environment/system/os/version",
    branch="environment/experiments/pref-flip-rcwn2-1381816/branch",
    # NETWORK_RACE_CACHE_* histograms
    nrc_wn_oosd="payload/histograms/NETWORK_RACE_CACHE_WITH_NETWORK_OCEC_ON_START_DIFF",
    nrc_wn_st="payload/histograms/NETWORK_RACE_CACHE_WITH_NETWORK_SAVED_TIME",
    nrc_wn_u2="payload/histograms/NETWORK_RACE_CACHE_WITH_NETWORK_USAGE_2",
    nrc_bw_rnw="payload/histograms/NETWORK_RACE_CACHE_BANDWIDTH_RACE_NETWORK_WIN",
    nrc_bw_rcw="payload/histograms/NETWORK_RACE_CACHE_BANDWIDTH_RACE_CACHE_WIN",
    nrc_bw_nr="payload/histograms/NETWORK_RACE_CACHE_BANDWIDTH_NOT_RACE",
    nrc_val="payload/histograms/NETWORK_RACE_CACHE_VALIDATION",
    # HTTP_PAGE_COMPLETE_LOAD* histograms
    # NOTE(review): hpc_l1 is the unversioned counterpart of the histogram
    # dropped below and may need the same treatment -- confirm.
    hpc_l1="payload/histograms/HTTP_PAGE_COMPLETE_LOAD",
    hpc_l2="payload/histograms/HTTP_PAGE_COMPLETE_LOAD_V2",
    hpc_ln="payload/histograms/HTTP_PAGE_COMPLETE_LOAD_NET_V2",
    hpc_lc="payload/histograms/HTTP_PAGE_COMPLETE_LOAD_CACHED_V2",
    # HTTP_SUB_COMPLETE_LOAD* histograms
    # hsc_l1 ("payload/histograms/HTTP_SUB_COMPLETE_LOAD") is deliberately not
    # requested: the recorded notebook run aborted the Spark job with
    # KeyError: 'HTTP_SUB_COMPLETE_LOAD' while building that histogram.
    hsc_l2="payload/histograms/HTTP_SUB_COMPLETE_LOAD_V2",
    hsc_lc="payload/histograms/HTTP_SUB_COMPLETE_LOAD_CACHED_V2",
    hsc_ln="payload/histograms/HTTP_SUB_COMPLETE_LOAD_NET_V2",
    # content page load time histogram
    tcplt="payload/histograms/TOTAL_CONTENT_PAGE_LOAD_TIME",
)
# Project every filtered ping down to the fields requested in `props`. The
# elements of the result are indexed by the `props` keys further down
# (e.g. p['clientId']).
RDD_prop = get_pings_properties(filtered, props)
# In[5]:
# Fetch a single record as a smoke test; the return value is not stored.
# In the notebook run recorded with this gist, this is the call that surfaced
# KeyError: 'HTTP_SUB_COMPLETE_LOAD' from the property extraction.
RDD_prop.take(1)
# In[ ]:
# Group the extracted pings per client: each element is
# (clientId, [ping, ping, ...]).
# groupByKey() gathers the values for a key directly, instead of building the
# per-client list by repeated list concatenation inside reduceByKey(), which
# copies the accumulated list on every merge. mapValues(list) turns the
# grouped iterable back into a plain list for downstream code.
byClient = (
    RDD_prop
    .map(lambda p: (p['clientId'], p))
    .groupByKey()
    .mapValues(list)
)
# In[ ]:
def gather(pinglist, prop):
    """Return the value stored under ``prop`` for every ping in ``pinglist``.

    The result keeps the order of ``pinglist`` and has one entry per ping.
    Raises ``KeyError`` if a ping has no ``prop`` entry.
    """
    return [x[prop] for x in pinglist]


# TODO: per-client histogram aggregation (left as the stub `#def hist`).


def _os_label(ping):
    """Return the ping's OS name and version joined as "<osName> <osVersion>"."""
    return "%s %s" % (ping['osName'], ping['osVersion'])


def doIT(p):
    """Summarise all pings of one client into a single flat record.

    Parameters
    ----------
    p : tuple
        ``(clientId, pinglist)`` where ``pinglist`` is a list of ping
        records exposing the keys 'branch', 'channel', 'osName' and
        'osVersion'.

    Returns
    -------
    dict
        ``clientId``  the client id, passed through unchanged
        ``branch``    sorted distinct branch values (numpy array)
        ``pings``     number of pings seen for the client
        ``channel``   sorted distinct channel values (numpy array)
        ``os``        sorted distinct "<osName> <osVersion>" labels
                      (numpy array)
    """
    clientId, pinglist = p
    # Pair name and version per ping *before* de-duplicating. Adding the two
    # independently de-duplicated arrays would combine unrelated names and
    # versions, and breaks when the two arrays differ in length.
    os_labels = [_os_label(ping) for ping in pinglist]
    return dict(
        clientId=clientId,
        branch=np.unique(gather(pinglist, 'branch')),
        pings=len(pinglist),
        channel=np.unique(gather(pinglist, 'channel')),
        os=np.unique(os_labels),
    )
# Build one summary record per client.
fin = byClient.map(doIT)
# In[ ]:
# Bring the per-client records to the driver and write them to a local CSV
# file named after the experiment, without the DataFrame index column.
client_rows = fin.collect()
pd.DataFrame(client_rows).to_csv(experiment + ".csv", index=False)
# In[ ]:
# Persist the extracted ping properties to S3 as comma-separated text.
# Iterating a record yields its keys, so ','.join(record) would write the
# property names on every line; the values are looked up explicitly instead.
# Columns follow the sorted `props` keys so every line has the same layout.
# NOTE(review): values are written unquoted via '%s' formatting; histogram
# fields may not render as a single CSV-safe token -- confirm the format the
# downstream consumer expects.
columns = sorted(props)
output_path = "s3://telemetry-private-analysis-2/jgaunt/" + experiment + ".csv"
(RDD_prop
    .map(lambda x: ','.join('%s' % (x[c],) for c in columns))
    .repartition(sc.defaultParallelism)
    .saveAsTextFile(output_path))
print("file saved at: " + output_path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment