Race Cache 2 Ping Pull
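A Jupyter notebook (with its exported Python script below) that pulls main pings for the pref-flip-rcwn2-1381816 experiment from the telemetry-cohorts Dataset over 20170726–20170802, extracts the RCWN (race cache with network) and HTTP page-load histograms per client, and writes the results out as CSV.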
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Populating the interactive namespace from numpy and matplotlib\n",
"\n",
"320\n",
"\n",
"date_range start: 20170726, date_range end: 20170802\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"/mnt/anaconda2/lib/python2.7/site-packages/IPython/core/magics/pylab.py:161: UserWarning:\n",
"\n",
"pylab import has clobbered these variables: ['Annotation', 'Figure']\n",
"`%matplotlib` prevents importing * from pylab and numpy\n",
"\n"
]
}
],
"source": [
"import ujson as json\n",
"import matplotlib.pyplot as plt\n",
"import pandas as pd\n",
"import numpy as np\n",
"import plotly.plotly as py\n",
"import datetime as DT\n",
"from plotly.graph_objs import *\n",
"from moztelemetry import Dataset, get_pings_properties\n",
"from operator import itemgetter\n",
"from pprint import pprint as pp\n",
"\n",
"%pylab inline\n",
"pyplot.switch_backend('agg')\n",
"\n",
"# how many cores are active on this cluster?\n",
"print \"\\n\"+str(sc.defaultParallelism)+\"\\n\"\n",
"\n",
"# specify an exact date or range\n",
"date_range = ('20170726', '20170802') # exact strings\n",
"print \"date_range start: \"+date_range[0]+\", date_range end: \"+date_range[1]+\"\\n\"\n",
"\n",
"# sample rate\n",
"sr = 1"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"fetching 71019.90455MB in 16517 files...\n"
]
}
],
"source": [
"cohorts = Dataset.from_source(\"telemetry-cohorts\")\n",
"cohorts.schema\n",
"\n",
"experiment = \"pref-flip-rcwn2-1381816\"\n",
"RDD = cohorts.where(\n",
"    submissionDate=lambda sD: sD >= date_range[0] and sD <= date_range[1],\n",
"    experimentId=experiment).records(sc, sample=sr)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"filtered = RDD.filter(lambda p: p['meta'].get('docType', \"None\") == 'main')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"props = dict(\n",
"    clientId=\"clientId\",\n",
"    channel=\"application/channel\",\n",
"    FxVersion=\"application/version\",\n",
"    osName=\"environment/system/os/name\",\n",
"    osVersion=\"environment/system/os/version\",\n",
"    branch=\"environment/experiments/pref-flip-rcwn2-1381816/branch\",\n",
"    nrc_wn_oosd=\"payload/histograms/NETWORK_RACE_CACHE_WITH_NETWORK_OCEC_ON_START_DIFF\",\n",
"    nrc_wn_st=\"payload/histograms/NETWORK_RACE_CACHE_WITH_NETWORK_SAVED_TIME\",\n",
"    nrc_wn_u2=\"payload/histograms/NETWORK_RACE_CACHE_WITH_NETWORK_USAGE_2\",\n",
"    nrc_bw_rnw=\"payload/histograms/NETWORK_RACE_CACHE_BANDWIDTH_RACE_NETWORK_WIN\",\n",
"    nrc_bw_rcw=\"payload/histograms/NETWORK_RACE_CACHE_BANDWIDTH_RACE_CACHE_WIN\",\n",
"    nrc_bw_nr=\"payload/histograms/NETWORK_RACE_CACHE_BANDWIDTH_NOT_RACE\",\n",
"    nrc_val=\"payload/histograms/NETWORK_RACE_CACHE_VALIDATION\",\n",
"    hpc_l1=\"payload/histograms/HTTP_PAGE_COMPLETE_LOAD\",\n",
"    hpc_l2=\"payload/histograms/HTTP_PAGE_COMPLETE_LOAD_V2\",\n",
"    hpc_ln=\"payload/histograms/HTTP_PAGE_COMPLETE_LOAD_NET_V2\",\n",
"    hpc_lc=\"payload/histograms/HTTP_PAGE_COMPLETE_LOAD_CACHED_V2\",\n",
"    # NOTE: HTTP_SUB_COMPLETE_LOAD is missing from the histogram definitions\n",
"    # moztelemetry uses on this cluster; requesting it raises the KeyError\n",
"    # seen in the next cell's output.\n",
"    hsc_l1=\"payload/histograms/HTTP_SUB_COMPLETE_LOAD\",\n",
"    hsc_l2=\"payload/histograms/HTTP_SUB_COMPLETE_LOAD_V2\",\n",
"    hsc_lc=\"payload/histograms/HTTP_SUB_COMPLETE_LOAD_CACHED_V2\",\n",
"    hsc_ln=\"payload/histograms/HTTP_SUB_COMPLETE_LOAD_NET_V2\",\n",
"    tcplt=\"payload/histograms/TOTAL_CONTENT_PAGE_LOAD_TIME\",\n",
")\n",
"RDD_prop = get_pings_properties(filtered, props)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"ename": "Py4JJavaError", | |
"evalue": "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 324, ip-172-31-29-77.us-west-2.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/serializers.py\", line 263, in dump_stream\n vs = list(itertools.islice(iterator, batch))\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1306, in takeUpToNumLeft\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 150, in <lambda>\n additional_histograms)) \\\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 232, in _get_ping_properties\n additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 291, in _get_merged_histograms\n additional_histograms)]\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 264, in _get_ping_property\n additional_histograms=additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/histogram.py\", line 135, in __init__\n name, histograms_definition[re.sub(\"^STARTUP_\", \"\", proper_name)])\nKeyError: 'HTTP_SUB_COMPLETE_LOAD'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat 
scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)\n\tat org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)\n\tat org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/serializers.py\", line 263, in dump_stream\n vs = list(itertools.islice(iterator, batch))\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1306, in takeUpToNumLeft\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 150, in <lambda>\n additional_histograms)) \\\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 232, in _get_ping_properties\n additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 291, in _get_merged_histograms\n additional_histograms)]\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 264, in _get_ping_property\n additional_histograms=additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/histogram.py\", line 135, in __init__\n name, histograms_definition[re.sub(\"^STARTUP_\", \"\", proper_name)])\nKeyError: 'HTTP_SUB_COMPLETE_LOAD'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n", | |
"output_type": "error", | |
"traceback": [ | |
"\u001b[0;31m\u001b[0m", | |
"\u001b[0;31mPy4JJavaError\u001b[0mTraceback (most recent call last)", | |
"\u001b[0;32m<ipython-input-5-675113c54db5>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mRDD_prop\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtake\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", | |
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mtake\u001b[0;34m(self, num)\u001b[0m\n\u001b[1;32m 1308\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1309\u001b[0m \u001b[0mp\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mrange\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpartsScanned\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmin\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpartsScanned\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0mnumPartsToTry\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtotalParts\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1310\u001b[0;31m \u001b[0mres\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcontext\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrunJob\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtakeUpToNumLeft\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mp\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1311\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1312\u001b[0m \u001b[0mitems\u001b[0m \u001b[0;34m+=\u001b[0m \u001b[0mres\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
"\u001b[0;32m/usr/lib/spark/python/pyspark/context.py\u001b[0m in \u001b[0;36mrunJob\u001b[0;34m(self, rdd, partitionFunc, partitions, allowLocal)\u001b[0m\n\u001b[1;32m 931\u001b[0m \u001b[0;31m# SparkContext#runJob.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 932\u001b[0m \u001b[0mmappedRDD\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mrdd\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpartitionFunc\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 933\u001b[0;31m \u001b[0mport\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrunJob\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jsc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmappedRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mpartitions\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 934\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_load_from_socket\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mport\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmappedRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd_deserializer\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 935\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", | |
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1131\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1132\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1133\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1134\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1135\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
"\u001b[0;32m/usr/lib/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 61\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 65\u001b[0m \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 317\u001b[0m raise Py4JJavaError(\n\u001b[1;32m 318\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 319\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 320\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 321\u001b[0m raise Py4JError(\n", | |
"\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 324, ip-172-31-29-77.us-west-2.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/serializers.py\", line 263, in dump_stream\n vs = list(itertools.islice(iterator, batch))\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1306, in takeUpToNumLeft\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 150, in <lambda>\n additional_histograms)) \\\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 232, in _get_ping_properties\n additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 291, in _get_merged_histograms\n additional_histograms)]\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 264, in _get_ping_property\n additional_histograms=additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/histogram.py\", line 135, in __init__\n name, histograms_definition[re.sub(\"^STARTUP_\", \"\", proper_name)])\nKeyError: 'HTTP_SUB_COMPLETE_LOAD'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)\n\tat org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)\n\tat org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1503591685257_0006/container_1503591685257_0006_01_000012/pyspark.zip/pyspark/serializers.py\", line 263, in dump_stream\n vs = list(itertools.islice(iterator, batch))\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1306, in takeUpToNumLeft\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 150, in <lambda>\n additional_histograms)) \\\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 232, in _get_ping_properties\n additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 291, in _get_merged_histograms\n additional_histograms)]\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/spark.py\", line 264, in _get_ping_property\n additional_histograms=additional_histograms)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/histogram.py\", line 135, in __init__\n name, histograms_definition[re.sub(\"^STARTUP_\", \"\", proper_name)])\nKeyError: 'HTTP_SUB_COMPLETE_LOAD'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat 
org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n" | |
] | |
} | |
],
"source": [
"RDD_prop.take(1)"
]
},
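{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Hedged workaround sketch (not part of the original run): the KeyError above\n",
"# comes from requesting a histogram name that is absent from the histogram\n",
"# definitions moztelemetry uses on this cluster. Dropping the offending path\n",
"# and re-extracting lets the rest of the pipeline proceed. The helper names\n",
"# (bad, safe_props) are editorial, not the author's.\n",
"bad = [k for k, v in props.items() if v.endswith('/HTTP_SUB_COMPLETE_LOAD')]\n",
"safe_props = {k: v for k, v in props.items() if k not in bad}\n",
"RDD_prop = get_pings_properties(filtered, safe_props)"
]
},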
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"byClient = RDD_prop.map(lambda p: (p['clientId'], [p])).reduceByKey(lambda x, y: x + y)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def gather(pinglist, prop):\n",
"    return [x[prop] for x in pinglist]\n",
"\n",
"#def hist\n",
"\n",
"def doIT(p):\n",
"    clientId, pinglist = p\n",
"    return dict(\n",
"        clientId=clientId,\n",
"        branch=np.unique(gather(pinglist, 'branch')),\n",
"        pings=len(pinglist),\n",
"        channel=np.unique(gather(pinglist, 'channel')),\n",
"        # concatenate as Python lists; '+' on two numpy string arrays raises TypeError here\n",
"        os=np.unique(gather(pinglist, 'osName')).tolist() + np.unique(gather(pinglist, 'osVersion')).tolist(),\n",
"    )\n",
"fin = byClient.map(doIT)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# collect() materializes every per-client summary row on the driver\n",
"pd.DataFrame(fin.collect()).to_csv(experiment + \".csv\", index=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# serialize values in a fixed column order; ','.join over a dict yields only\n",
"# its keys, and histogram columns are pandas objects that may need their own\n",
"# flattening before this makes a well-formed CSV\n",
"cols = sorted(props.keys())\n",
"# saveAsTextFile writes a directory of part files at this path\n",
"RDD_prop.map(lambda x: ','.join(str(x[c]) for c in cols)).repartition(sc.defaultParallelism).saveAsTextFile(\"s3://telemetry-private-analysis-2/jgaunt/\"+experiment+\".csv\")\n",
"print \"file saved at: s3://telemetry-private-analysis-2/jgaunt/\"+experiment+\".csv\""
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [conda root]",
"language": "python",
"name": "conda-root-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
# coding: utf-8

# In[1]:

import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
import datetime as DT
from plotly.graph_objs import *
from moztelemetry import Dataset, get_pings_properties
from operator import itemgetter
from pprint import pprint as pp

get_ipython().magic(u'pylab inline')
pyplot.switch_backend('agg')

# how many cores are active on this cluster?
print "\n"+str(sc.defaultParallelism)+"\n"

# specify an exact date or range
date_range = ('20170726', '20170802') # exact strings
print "date_range start: "+date_range[0]+", date_range end: "+date_range[1]+"\n"

# sample rate
sr = 1


# In[2]:

cohorts = Dataset.from_source("telemetry-cohorts")
cohorts.schema

experiment = "pref-flip-rcwn2-1381816"
RDD = cohorts.where(
    submissionDate=lambda sD: sD >= date_range[0] and sD <= date_range[1],
    experimentId=experiment).records(sc, sample=sr)


# In[3]:

filtered = RDD.filter(lambda p: p['meta'].get('docType', "None") == 'main')
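
# In[ ]:

# Sanity-check sketch (an editorial addition, not in the original script):
# tally docTypes before filtering to confirm how many 'main' pings the pull
# actually contains. countByValue() returns a dict of docType -> count.
pp(RDD.map(lambda p: p['meta'].get('docType', 'None')).countByValue())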

# In[4]:

props = dict(
    clientId="clientId",
    channel="application/channel",
    FxVersion="application/version",
    osName="environment/system/os/name",
    osVersion="environment/system/os/version",
    branch="environment/experiments/pref-flip-rcwn2-1381816/branch",
    nrc_wn_oosd="payload/histograms/NETWORK_RACE_CACHE_WITH_NETWORK_OCEC_ON_START_DIFF",
    nrc_wn_st="payload/histograms/NETWORK_RACE_CACHE_WITH_NETWORK_SAVED_TIME",
    nrc_wn_u2="payload/histograms/NETWORK_RACE_CACHE_WITH_NETWORK_USAGE_2",
    nrc_bw_rnw="payload/histograms/NETWORK_RACE_CACHE_BANDWIDTH_RACE_NETWORK_WIN",
    nrc_bw_rcw="payload/histograms/NETWORK_RACE_CACHE_BANDWIDTH_RACE_CACHE_WIN",
    nrc_bw_nr="payload/histograms/NETWORK_RACE_CACHE_BANDWIDTH_NOT_RACE",
    nrc_val="payload/histograms/NETWORK_RACE_CACHE_VALIDATION",
    hpc_l1="payload/histograms/HTTP_PAGE_COMPLETE_LOAD",
    hpc_l2="payload/histograms/HTTP_PAGE_COMPLETE_LOAD_V2",
    hpc_ln="payload/histograms/HTTP_PAGE_COMPLETE_LOAD_NET_V2",
    hpc_lc="payload/histograms/HTTP_PAGE_COMPLETE_LOAD_CACHED_V2",
    # NOTE: HTTP_SUB_COMPLETE_LOAD is missing from the histogram definitions
    # moztelemetry uses on this cluster; requesting it raises the KeyError
    # seen when the next cell runs.
    hsc_l1="payload/histograms/HTTP_SUB_COMPLETE_LOAD",
    hsc_l2="payload/histograms/HTTP_SUB_COMPLETE_LOAD_V2",
    hsc_lc="payload/histograms/HTTP_SUB_COMPLETE_LOAD_CACHED_V2",
    hsc_ln="payload/histograms/HTTP_SUB_COMPLETE_LOAD_NET_V2",
    tcplt="payload/histograms/TOTAL_CONTENT_PAGE_LOAD_TIME",
)
RDD_prop = get_pings_properties(filtered, props)

# In[5]:

RDD_prop.take(1)
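
# In[ ]:

# Hedged workaround sketch (not part of the original script): on the cluster
# this ran on, RDD_prop.take(1) failed with KeyError: 'HTTP_SUB_COMPLETE_LOAD'
# because that histogram is absent from moztelemetry's definitions. Dropping
# the offending path and re-extracting lets the pipeline proceed; the helper
# names below (bad, safe_props) are editorial, not the author's.
bad = [k for k, v in props.items() if v.endswith('/HTTP_SUB_COMPLETE_LOAD')]
safe_props = {k: v for k, v in props.items() if k not in bad}
RDD_prop = get_pings_properties(filtered, safe_props)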

# In[ ]:

byClient = RDD_prop.map(lambda p: (p['clientId'], [p])).reduceByKey(lambda x, y: x + y)
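
# In[ ]:

# Equivalent-grouping sketch (an editorial addition, not in the original):
# for clients with many pings, reduceByKey with list concatenation copies the
# growing list on every merge; groupByKey followed by mapValues(list) builds
# each per-client list once. byClient_alt is a hypothetical name.
byClient_alt = RDD_prop.map(lambda p: (p['clientId'], p)).groupByKey().mapValues(list)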

# In[ ]:

def gather(pinglist, prop):
    return [x[prop] for x in pinglist]

#def hist

def doIT(p):
    clientId, pinglist = p
    return dict(
        clientId=clientId,
        branch=np.unique(gather(pinglist, 'branch')),
        pings=len(pinglist),
        channel=np.unique(gather(pinglist, 'channel')),
        # concatenate as Python lists; '+' on two numpy string arrays raises TypeError here
        os=np.unique(gather(pinglist, 'osName')).tolist() + np.unique(gather(pinglist, 'osVersion')).tolist(),
    )
fin = byClient.map(doIT)

# In[ ]:

# collect() materializes every per-client summary row on the driver
pd.DataFrame(fin.collect()).to_csv(experiment + ".csv", index=False)

# In[ ]:

# serialize values in a fixed column order; ','.join over a dict yields only
# its keys, and histogram columns are pandas objects that may need their own
# flattening before this makes a well-formed CSV
cols = sorted(props.keys())
# saveAsTextFile writes a directory of part files at this path
RDD_prop.map(lambda x: ','.join(str(x[c]) for c in cols)).repartition(sc.defaultParallelism).saveAsTextFile("s3://telemetry-private-analysis-2/jgaunt/"+experiment+".csv")
print "file saved at: s3://telemetry-private-analysis-2/jgaunt/"+experiment+".csv"