@maurodoglio
Created March 15, 2017 14:51
dataset error
{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Unable to parse whitelist (/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/histogram-whitelists.json). Assuming all histograms are acceptable.\n",
"Populating the interactive namespace from numpy and matplotlib\n",
"Cloning into 'telemetry_utils'...\n",
"remote: Counting objects: 64, done.\u001b[K\n",
"remote: Total 64 (delta 0), reused 0 (delta 0), pack-reused 64\u001b[K\n",
"Unpacking objects: 100% (64/64), done.\n",
"Checking connectivity... done.\n"
]
}
],
"source": [
"import ujson as json\n",
"import matplotlib.pyplot as plt\n",
"import pandas as pd\n",
"import numpy as np\n",
"import plotly.plotly as py\n",
"import IPython\n",
"\n",
"from __future__ import division\n",
"from moztelemetry.spark import get_one_ping_per_client, get_pings_properties\n",
"from moztelemetry import Dataset\n",
"from montecarlino import grouped_permutation_test\n",
"\n",
"%pylab inline\n",
"IPython.core.pylabtools.figsize(16, 7)\n",
"import warnings; warnings.simplefilter('ignore')\n",
"\n",
"\n",
"!rm -rf telemetry_utils && git clone https://github.com/ekr/telemetry_utils\n",
"sc.addPyFile(\"telemetry_utils/lib/utils.py\")\n",
"import utils\n",
"sc.addPyFile(\"telemetry_utils/lib/tls.py\")\n",
"import tls\n"
]
},
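{
"cell_type": "markdown",
"metadata": {},
"source": [
"The setup cell above imports the analysis dependencies, clones `ekr/telemetry_utils`, and ships its `utils.py` and `tls.py` helpers to the Spark executors with `sc.addPyFile` so that `import utils` and `import tls` also work inside distributed tasks.\n"
]
},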
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"Dataset.from_source('telemetry').where(docType='OTHER').records(sc, sample=.01).count()"
]
},
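{
"cell_type": "markdown",
"metadata": {},
"source": [
"The cells below call `ds.count()` and `ds.records(...)`, but the cell that defines `ds` is not shown here. Judging from the `<ipython-input-31>` traceback at the end of this notebook, it was presumably built as in the sketch below.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Presumed definition of `ds`, reconstructed from the ipython-input-31\n",
"# traceback further down; not re-executed here.\n",
"ds = Dataset.from_source('telemetry').where(docType='OTHER')\n"
]
},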
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"ename": "Py4JJavaError",
"evalue": "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 509 in stage 1.0 failed 4 times, most recent failure: Lost task 509.3 in stage 1.0 (TID 972, ip-172-31-13-206.us-west-2.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0008/container_1489511822515_0008_01_000021/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0008/container_1489511822515_0008_01_000021/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 317, in func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <lambda>\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <genexpr>\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 21, in parse_heka_message\n yield _parse_heka_record(record)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 38, in _parse_heka_record\n payload = _parse_json(field.value_bytes[0].decode('utf-8'))\n File \"/mnt/anaconda2/lib/python2.7/encodings/utf_8.py\", line 16, in decode\n return codecs.utf_8_decode(input, errors, True)\nUnicodeDecodeError: 'utf8' codec can't decode byte 0xe1 in position 74109: invalid continuation byte\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat scala.Option.foreach(Option.scala:257)\n\tat 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:358)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:911)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0008/container_1489511822515_0008_01_000021/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0008/container_1489511822515_0008_01_000021/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 317, in func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <lambda>\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <genexpr>\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 21, in parse_heka_message\n yield _parse_heka_record(record)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 38, in _parse_heka_record\n payload = _parse_json(field.value_bytes[0].decode('utf-8'))\n File \"/mnt/anaconda2/lib/python2.7/encodings/utf_8.py\", line 16, in decode\n return codecs.utf_8_decode(input, errors, True)\nUnicodeDecodeError: 'utf8' codec can't decode byte 0xe1 in position 74109: invalid continuation byte\n\n\tat 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n",
"output_type": "error",
"traceback": [
"\u001b[0;31m\u001b[0m",
"\u001b[0;31mPy4JJavaError\u001b[0mTraceback (most recent call last)",
"\u001b[0;32m<ipython-input-4-d16e5ddb8cf1>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mds\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mcount\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 1006\u001b[0m \u001b[0;36m3\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1007\u001b[0m \"\"\"\n\u001b[0;32m-> 1008\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mi\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m1\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0m_\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mi\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1009\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1010\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mstats\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36msum\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 997\u001b[0m \u001b[0;36m6.0\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 998\u001b[0m \"\"\"\n\u001b[0;32m--> 999\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mx\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfold\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0moperator\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1000\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1001\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mfold\u001b[0;34m(self, zeroValue, op)\u001b[0m\n\u001b[1;32m 871\u001b[0m \u001b[0;31m# zeroValue provided to each partition is unique from the one provided\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 872\u001b[0m \u001b[0;31m# to the final reduce call\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 873\u001b[0;31m \u001b[0mvals\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfunc\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollect\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 874\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mreduce\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mvals\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mzeroValue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 875\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mcollect\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 774\u001b[0m \"\"\"\n\u001b[1;32m 775\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0mSCCallSiteSync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcontext\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mcss\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 776\u001b[0;31m \u001b[0mport\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mctx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollectAndServe\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 777\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_load_from_socket\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mport\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd_deserializer\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 778\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1131\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1132\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1133\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1134\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1135\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 61\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 65\u001b[0m \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 317\u001b[0m raise Py4JJavaError(\n\u001b[1;32m 318\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 319\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 320\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 321\u001b[0m raise Py4JError(\n",
"\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 509 in stage 1.0 failed 4 times, most recent failure: Lost task 509.3 in stage 1.0 (TID 972, ip-172-31-13-206.us-west-2.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0008/container_1489511822515_0008_01_000021/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0008/container_1489511822515_0008_01_000021/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 317, in func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <lambda>\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <genexpr>\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 21, in parse_heka_message\n yield _parse_heka_record(record)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 38, in _parse_heka_record\n payload = _parse_json(field.value_bytes[0].decode('utf-8'))\n File \"/mnt/anaconda2/lib/python2.7/encodings/utf_8.py\", line 16, in decode\n return codecs.utf_8_decode(input, errors, True)\nUnicodeDecodeError: 'utf8' codec can't decode byte 0xe1 in position 74109: invalid continuation byte\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat 
scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:358)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:911)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0008/container_1489511822515_0008_01_000021/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0008/container_1489511822515_0008_01_000021/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 317, in func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <lambda>\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <genexpr>\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 21, in parse_heka_message\n yield _parse_heka_record(record)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 38, in _parse_heka_record\n payload = _parse_json(field.value_bytes[0].decode('utf-8'))\n File \"/mnt/anaconda2/lib/python2.7/encodings/utf_8.py\", line 16, in decode\n return codecs.utf_8_decode(input, errors, True)\nUnicodeDecodeError: 'utf8' codec can't decode byte 0xe1 in 
position 74109: invalid continuation byte\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n"
]
}
],
"source": [
"ds.count()"
]
},
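{
"cell_type": "markdown",
"metadata": {},
"source": [
"The job fails on the executors inside `moztelemetry.heka.message_parser`, where a ping payload is decoded as UTF-8: one payload contains a raw `0xE1` byte that does not start a valid UTF-8 sequence. The next cell is a minimal, local Python 2 sketch of that same decode failure (and of a lenient decode); it is illustrative only and does not touch the telemetry data.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Minimal local reproduction of the executor-side error (Python 2 str is bytes).\n",
"# 0xE1 opens a three-byte UTF-8 sequence, so a following ASCII byte is an\n",
"# 'invalid continuation byte', exactly as in the traceback above.\n",
"bad = 'payload \\xe1 more'\n",
"try:\n",
"    bad.decode('utf-8')\n",
"except UnicodeDecodeError as e:\n",
"    print(e)\n",
"\n",
"# A lenient decode replaces the offending byte instead of raising; whether\n",
"# that is acceptable for these pings is a separate question.\n",
"print(repr(bad.decode('utf-8', 'replace')))\n"
]
},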
{
"cell_type": "code",
"execution_count": 28,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"ds10 = ds.records(sc, sample=.01)"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"ename": "Py4JJavaError",
"evalue": "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 207 in stage 22.0 failed 4 times, most recent failure: Lost task 207.3 in stage 22.0 (TID 6301, ip-172-31-14-44.us-west-2.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000115/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000115/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 317, in func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <lambda>\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <genexpr>\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 21, in parse_heka_message\n yield _parse_heka_record(record)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 38, in _parse_heka_record\n payload = _parse_json(field.value_bytes[0].decode('utf-8'))\n File \"/mnt/anaconda2/lib/python2.7/encodings/utf_8.py\", line 16, in decode\n return codecs.utf_8_decode(input, errors, True)\nUnicodeDecodeError: 'utf8' codec can't decode byte 0xe1 in position 41892: invalid continuation byte\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat scala.Option.foreach(Option.scala:257)\n\tat 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:358)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:911)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000115/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000115/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 317, in func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <lambda>\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <genexpr>\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 21, in parse_heka_message\n yield _parse_heka_record(record)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 38, in _parse_heka_record\n payload = _parse_json(field.value_bytes[0].decode('utf-8'))\n File \"/mnt/anaconda2/lib/python2.7/encodings/utf_8.py\", line 16, in decode\n return codecs.utf_8_decode(input, errors, True)\nUnicodeDecodeError: 'utf8' codec can't decode byte 0xe1 in position 41892: invalid continuation byte\n\n\tat 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n",
"output_type": "error",
"traceback": [
"\u001b[0;31m\u001b[0m",
"\u001b[0;31mPy4JJavaError\u001b[0mTraceback (most recent call last)",
"\u001b[0;32m<ipython-input-29-638c1a2a16f8>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mds10\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mcount\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 1006\u001b[0m \u001b[0;36m3\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1007\u001b[0m \"\"\"\n\u001b[0;32m-> 1008\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mi\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m1\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0m_\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mi\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1009\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1010\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mstats\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36msum\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 997\u001b[0m \u001b[0;36m6.0\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 998\u001b[0m \"\"\"\n\u001b[0;32m--> 999\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mx\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfold\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0moperator\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1000\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1001\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mfold\u001b[0;34m(self, zeroValue, op)\u001b[0m\n\u001b[1;32m 871\u001b[0m \u001b[0;31m# zeroValue provided to each partition is unique from the one provided\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 872\u001b[0m \u001b[0;31m# to the final reduce call\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 873\u001b[0;31m \u001b[0mvals\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfunc\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollect\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 874\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mreduce\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mvals\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mzeroValue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 875\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mcollect\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 774\u001b[0m \"\"\"\n\u001b[1;32m 775\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0mSCCallSiteSync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcontext\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mcss\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 776\u001b[0;31m \u001b[0mport\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mctx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollectAndServe\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 777\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_load_from_socket\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mport\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd_deserializer\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 778\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1131\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1132\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1133\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1134\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1135\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 61\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 65\u001b[0m \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 317\u001b[0m raise Py4JJavaError(\n\u001b[1;32m 318\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 319\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 320\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 321\u001b[0m raise Py4JError(\n",
"\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 207 in stage 22.0 failed 4 times, most recent failure: Lost task 207.3 in stage 22.0 (TID 6301, ip-172-31-14-44.us-west-2.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000115/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000115/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 317, in func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <lambda>\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <genexpr>\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 21, in parse_heka_message\n yield _parse_heka_record(record)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 38, in _parse_heka_record\n payload = _parse_json(field.value_bytes[0].decode('utf-8'))\n File \"/mnt/anaconda2/lib/python2.7/encodings/utf_8.py\", line 16, in decode\n return codecs.utf_8_decode(input, errors, True)\nUnicodeDecodeError: 'utf8' codec can't decode byte 0xe1 in position 41892: invalid continuation byte\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat 
scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:358)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:911)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000115/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000115/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 317, in func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <lambda>\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <genexpr>\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 21, in parse_heka_message\n yield _parse_heka_record(record)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 38, in _parse_heka_record\n payload = _parse_json(field.value_bytes[0].decode('utf-8'))\n File \"/mnt/anaconda2/lib/python2.7/encodings/utf_8.py\", line 16, in decode\n return codecs.utf_8_decode(input, errors, True)\nUnicodeDecodeError: 'utf8' codec can't decode byte 0xe1 in 
position 41892: invalid continuation byte\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n"
]
}
],
"source": [
"ds10.count()"
]
},
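{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same `UnicodeDecodeError` on byte `0xE1` comes back for `ds10.count()` above and again in the next cell, each time at a different byte offset (74109, 41892, 33344), which suggests the malformed payloads are in the sampled data itself rather than a transient executor problem. As an illustration only, the helper below shows the UTF-8 validity check that the moztelemetry parser is effectively performing when it decodes a payload.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Illustrative helper (not part of moztelemetry): report whether raw payload\n",
"# bytes decode cleanly as UTF-8, which is the step that fails on the executors.\n",
"def is_valid_utf8(raw_bytes):\n",
"    try:\n",
"        raw_bytes.decode('utf-8')\n",
"        return True\n",
"    except UnicodeDecodeError:\n",
"        return False\n",
"\n",
"print(is_valid_utf8('ok'))              # True\n",
"print(is_valid_utf8('bad \\xe1 byte'))   # False\n"
]
},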
{
"cell_type": "code",
"execution_count": 31,
"metadata": {
"collapsed": false
},
"outputs": [
{
"ename": "Py4JJavaError",
"evalue": "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 266 in stage 26.0 failed 4 times, most recent failure: Lost task 266.3 in stage 26.0 (TID 8255, ip-172-31-13-24.us-west-2.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt1/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000123/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt1/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000123/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 317, in func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <lambda>\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <genexpr>\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 21, in parse_heka_message\n yield _parse_heka_record(record)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 38, in _parse_heka_record\n payload = _parse_json(field.value_bytes[0].decode('utf-8'))\n File \"/mnt/anaconda2/lib/python2.7/encodings/utf_8.py\", line 16, in decode\n return codecs.utf_8_decode(input, errors, True)\nUnicodeDecodeError: 'utf8' codec can't decode byte 0xe1 in position 33344: invalid continuation byte\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat scala.Option.foreach(Option.scala:257)\n\tat 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:358)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:911)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt1/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000123/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt1/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000123/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 317, in func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <lambda>\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <genexpr>\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 21, in parse_heka_message\n yield _parse_heka_record(record)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 38, in _parse_heka_record\n payload = _parse_json(field.value_bytes[0].decode('utf-8'))\n File \"/mnt/anaconda2/lib/python2.7/encodings/utf_8.py\", line 16, in decode\n return codecs.utf_8_decode(input, errors, True)\nUnicodeDecodeError: 'utf8' codec can't decode byte 0xe1 in position 33344: invalid continuation byte\n\n\tat 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n",
"output_type": "error",
"traceback": [
"\u001b[0;31m\u001b[0m",
"\u001b[0;31mPy4JJavaError\u001b[0mTraceback (most recent call last)",
"\u001b[0;32m<ipython-input-31-a84f95745349>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0mds\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mDataset\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfrom_source\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'telemetry'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwhere\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdocType\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m'OTHER'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2\u001b[0m \u001b[0mds10\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mds\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrecords\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0msample\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m.01\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 3\u001b[0;31m \u001b[0mds10\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mcount\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 1006\u001b[0m \u001b[0;36m3\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1007\u001b[0m \"\"\"\n\u001b[0;32m-> 1008\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mi\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m1\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0m_\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mi\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1009\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1010\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mstats\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36msum\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 997\u001b[0m \u001b[0;36m6.0\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 998\u001b[0m \"\"\"\n\u001b[0;32m--> 999\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mx\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfold\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0moperator\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1000\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1001\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mfold\u001b[0;34m(self, zeroValue, op)\u001b[0m\n\u001b[1;32m 871\u001b[0m \u001b[0;31m# zeroValue provided to each partition is unique from the one provided\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 872\u001b[0m \u001b[0;31m# to the final reduce call\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 873\u001b[0;31m \u001b[0mvals\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfunc\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollect\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 874\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mreduce\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mvals\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mzeroValue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 875\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mcollect\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 774\u001b[0m \"\"\"\n\u001b[1;32m 775\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0mSCCallSiteSync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcontext\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mcss\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 776\u001b[0;31m \u001b[0mport\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mctx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollectAndServe\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 777\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_load_from_socket\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mport\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd_deserializer\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 778\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1131\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1132\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1133\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1134\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1135\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 61\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 65\u001b[0m \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 317\u001b[0m raise Py4JJavaError(\n\u001b[1;32m 318\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 319\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 320\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 321\u001b[0m raise Py4JError(\n",
"\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 266 in stage 26.0 failed 4 times, most recent failure: Lost task 266.3 in stage 26.0 (TID 8255, ip-172-31-13-24.us-west-2.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt1/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000123/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt1/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000123/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 317, in func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <lambda>\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <genexpr>\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 21, in parse_heka_message\n yield _parse_heka_record(record)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 38, in _parse_heka_record\n payload = _parse_json(field.value_bytes[0].decode('utf-8'))\n File \"/mnt/anaconda2/lib/python2.7/encodings/utf_8.py\", line 16, in decode\n return codecs.utf_8_decode(input, errors, True)\nUnicodeDecodeError: 'utf8' codec can't decode byte 0xe1 in position 33344: invalid continuation byte\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat 
scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:358)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:911)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/mnt1/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000123/pyspark.zip/pyspark/worker.py\", line 172, in main\n process()\n File \"/mnt1/yarn/usercache/hadoop/appcache/application_1489511822515_0007/container_1489511822515_0007_01_000123/pyspark.zip/pyspark/worker.py\", line 167, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 2371, in pipeline_func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 317, in func\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <lambda>\n File \"/usr/lib/spark/python/pyspark/rdd.py\", line 1008, in <genexpr>\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 21, in parse_heka_message\n yield _parse_heka_record(record)\n File \"/mnt/anaconda2/lib/python2.7/site-packages/moztelemetry/heka/message_parser.py\", line 38, in _parse_heka_record\n payload = _parse_json(field.value_bytes[0].decode('utf-8'))\n File \"/mnt/anaconda2/lib/python2.7/encodings/utf_8.py\", line 16, in decode\n return codecs.utf_8_decode(input, errors, True)\nUnicodeDecodeError: 'utf8' codec can't decode byte 0xe1 in 
position 33344: invalid continuation byte\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n"
]
}
],
"source": [
"ds = Dataset.from_source('telemetry').where(docType='OTHER')\n",
"ds10 = ds.records(sc, sample=.01)\n",
"ds10.count()"
]
},
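  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A hedged aside, not part of the original run: the failure above is raised in `moztelemetry/heka/message_parser.py` when it strictly decodes a record payload via `field.value_bytes[0].decode('utf-8')`.\n",
    "The cell below is a minimal standalone sketch of that failure mode. It reproduces the same `invalid continuation byte` error on a made-up byte string and shows that a lenient decode with `errors='replace'` would tolerate it by substituting U+FFFD for the malformed bytes; it is an illustration only, not a patch to the library."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Minimal sketch (an assumption-laden illustration, not the original analysis):\n",
    "# reproduce the UnicodeDecodeError seen above on a made-up byte string,\n",
    "# then show a lenient decode that would tolerate the malformed byte.\n",
    "bad = b'prefix \\xe1 suffix'  # 0xe1 opens a multi-byte UTF-8 sequence that is never completed\n",
    "\n",
    "try:\n",
    "    bad.decode('utf-8')  # strict decode, as message_parser.py does -- raises here\n",
    "except UnicodeDecodeError as e:\n",
    "    print(e)\n",
    "\n",
    "# 'replace' substitutes U+FFFD for the malformed bytes instead of raising\n",
    "print(bad.decode('utf-8', 'replace'))"
   ]
  },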
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [default]",
"language": "python",
"name": "python2"
}
},
"nbformat": 4,
"nbformat_minor": 1
}