@bcolloran
Created June 1, 2015 20:16
{"nbformat_minor": 0, "cells": [{"execution_count": 1, "cell_type": "code", "source": "import ujson as json\nimport matplotlib.pyplot as plt\nimport pandas as pd\nimport numpy as np\nimport plotly.plotly as py\nimport networkx as nx\nimport collections\n\n\nfrom moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client\n\n%pylab inline\n\n", "outputs": [{"output_type": "stream", "name": "stdout", "text": "Populating the interactive namespace from numpy and matplotlib\n"}], "metadata": {"collapsed": false, "trusted": true}}, {"execution_count": 2, "cell_type": "code", "source": "sc.defaultParallelism", "outputs": [{"execution_count": 2, "output_type": "execute_result", "data": {"text/plain": "48"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "#Get pings, filter them, and do some basic checks", "cell_type": "markdown", "metadata": {}}, {"execution_count": 3, "cell_type": "code", "source": "pings = get_pings(sc, app=\"Firefox\",\n channel=\"nightly\",\n submission_date=(\"20150520\",\"20150525\"),\n fraction=1,\n schema=\"v4\")\n\n", "outputs": [], "metadata": {"collapsed": false, "trusted": true}}, {"execution_count": 4, "cell_type": "code", "source": "p = pings.first()", "outputs": [], "metadata": {"collapsed": false, "trusted": true}}, {"execution_count": 5, "cell_type": "code", "source": "# p[\"payload\"][\"info\"]\np.keys(), p.get(\"payload\",{}).get(\"info\",{}).get(\"subsessionId\",False)\n# {k:p[k] for k in p.keys() if k!=\"meta\"} ", "outputs": [{"execution_count": 5, "output_type": "execute_result", "data": {"text/plain": "([u'clientId',\n u'id',\n u'environment',\n u'application',\n u'version',\n 'meta',\n u'creationDate',\n u'type',\n u'payload'],\n u'e52f169e-35fb-4978-a470-ea3f38ce4fa7')"}, "metadata": {}}], "metadata": {"scrolled": true, "collapsed": false, "trusted": true}}, {"source": "##Group pings by \"pingId\"\nTo distinguish from the other ids running aroung, we'll call the top level 'id' field the \"pingId\"\n\nNote that we have to filter out the 'meta' entry, b/c this can contain things like intake timestamps, which should be expected to change if the same ping is sent twice\n", "cell_type": "markdown", "metadata": {}}, {"execution_count": 6, "cell_type": "code", "source": "\npingsByPingId = pings \\\n .map(lambda p: (p.get(\"id\",\"MISSING\"),\n [{k:p[k] for k in p.keys() if k!=\"meta\"}]) ) \\\n .reduceByKey(lambda l1,l2: l1+l2)\npingsByPingId.cache()", "outputs": [{"execution_count": 6, "output_type": "execute_result", "data": {"text/plain": "PythonRDD[8] at RDD at PythonRDD.scala:43"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}, {"execution_count": 16, "cell_type": "code", "source": "pById = pingsByPingId.take(10)", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"source": "Let's call the number of pings that share a pingId the \"multiplicity\" of that pingId. 
```python
# In[7]:
pingIdMultiplicities = pingsByPingId \
    .map(lambda id_pList: (len(id_pList[1]), 1)) \
    .reduceByKey(lambda x1, x2: x1 + x2)

pingIdMultiplicities.cache()
pingIdMultiplicities.count()
```

    Py4JJavaError: An error occurred while calling o105.collect.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 4942 in stage 1.0
      failed 4 times, most recent failure: Lost task 4942.3 in stage 1.0 (TID 5842,
      ip-10-37-6-174.us-west-2.compute.internal): ExecutorLostFailure (executor 5 lost)
    [driver stacktrace omitted]

```python
# In[ ]:
pingIdMultiplicities.collect()
```

Next, let's set aside all the sets of pings that have any non-identical members.

```python
# In[27]:
def allEqual(l):
    if len(l) <= 1:
        return True
    else:
        e1 = l[0]
        for e in l[1:]:
            if e != e1:
                return False
        return True
```
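For reference, the same predicate can be written more compactly (an equivalent sketch with the same semantics, including the vacuous True for lists of length 0 or 1):

```python
def allEqual(l):
    # every element equals the first; empty generator makes all() return True
    return all(e == l[0] for e in l[1:])
```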
```python
# In[50]:
pingsByPingId_nonexactDupes = pingsByPingId \
    .filter(lambda id_pList: len(id_pList[1]) > 1) \
    .filter(lambda id_pList: not allEqual(id_pList[1]))
```

```python
# In[51]:
pingsByPingId_nonexactDupes.cache()
pingsByPingId_nonexactDupes.count()
```

    329

```python
# In[70]:
# since there are only 329, pull them local for more work
nonexactDupesSample = pingsByPingId_nonexactDupes.collect()
```

```python
# In[108]:
collections.Counter(map(lambda tup: len(tup[1]), nonexactDupesSample))
```

    Counter({2: 320, 3: 9})

So, from the above we see that there are 320 pingIds that show up in sets of two pings that are not all identical, and 9 pingIds that show up in sets of 3 pings that are not all identical. Note that in the case of the 3-ping sets, this does not mean that all 3 are necessarily different, just that at least one is not the same as the others.

### What can change in non-identical pings submitted under the same pingId?

So we know that not all ping submissions are atomic. What can change within a ping between submissions? In the 9 cases of 3 pings under the same pingId, we'll simplify by just looking at the first two.

```python
# In[65]:
# cribbed from http://stackoverflow.com/questions/5903720/recursive-diff-of-two-python-dictionaries-keys-and-values

def list_to_dict(l):
    return dict(zip(map(str, range(len(l))), l))

def dictDiff2(d1, d2, path=""):
    changes = []
    for k in d1:
        if k not in d2:
            changes.append(("path not present in both", path + "/" + k))
    for k in d2:
        if k not in d1:
            changes.append(("path not present in both", path + "/" + k))
            continue
        if d2[k] != d1[k]:
            if type(d2[k]) not in (dict, list):
                changes.append(("value changed", path + "/" + k))
            else:
                if type(d1[k]) != type(d2[k]):
                    changes.append(("value changed", path + "/" + k))
                    continue
                else:
                    if type(d2[k]) == dict:
                        changes += dictDiff2(d1[k], d2[k], path + "/" + k)
                        continue
                    elif type(d2[k]) == list:
                        changes += dictDiff2(list_to_dict(d1[k]), list_to_dict(d2[k]), path + "/" + k)
    return changes
```
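As a quick sanity check of the diff format (a toy input, not actual telemetry data), dictDiff2 returns (change type, json path) tuples; the "missing path" entries always come out first because the d1-only keys are collected in the first loop:

```python
d1 = {"a": 1, "b": {"c": 2}, "onlyInD1": True}
d2 = {"a": 9, "b": {"c": 2}}
print dictDiff2(d1, d2)
# [('path not present in both', '/onlyInD1'), ('value changed', '/a')]
```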
```python
# In[103]:
pingChanges = map(lambda tup: dictDiff2(tup[1][0], tup[1][1]), nonexactDupesSample)
```

```python
# In[106]:
# how many differences are there between the pings for each pingId?
collections.Counter([len(pc) for pc in pingChanges])
```

    Counter({1: 119, 2: 59, 3: 32, 8: 26, 5: 19, 4: 18, 6: 14, 11: 11, 9: 8, 14: 7, 10: 5, 12: 5, 7: 4, 0: 1, 18: 1})

From the above we see that in most instances these mismatched pings differ in only one place, but they can sometimes differ in several places (up to 18 places in the data considered). However, we don't see massive changes across e.g. hundreds or thousands of paths. So the next sub-question is:

#### Which paths change?

```python
# In[100]:
pingChangesFlat = reduce(lambda l1, l2: l1 + l2, pingChanges, [])

changes = {}
for changeType, changePath in pingChangesFlat:
    if changeType not in changes:
        changes[changeType] = {changePath: 1}
    else:
        changes[changeType][changePath] = changes[changeType].get(changePath, 0) + 1
```
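For what it's worth, the same two-level tally could be built a bit more directly with the collections module (an equivalent sketch, not what the cell above actually ran; the result supports the same `changes[changeType].items()` lookups used below):

```python
import collections

changes = collections.defaultdict(collections.Counter)
for changeType, changePath in pingChangesFlat:
    changes[changeType][changePath] += 1
```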
The following shows json paths that are only present in one of the two pings, as

    (number of times the path was missing, path)

```python
# In[101]:
dict(sorted([(tup[1], tup[0]) for tup in changes["path not present in both"].items()], reverse=True))
```

    {1: u'/payload/addonDetails/XPI/1tinUyqW@cx.com/shutdown_MS',
     2: u'/payload/addonDetails/XPI/CanvasBlocker@kkapsner.de/shutdown_MS',
     3: u'/payload/addonDetails/XPI/browsec@browsec.com/shutdown_MS',
     4: u'/payload/addonDetails/XPI/firebug@software.joehewitt.com/shutdown_MS',
     5: u'/payload/addonDetails/XPI/compatibility@addons.mozilla.org/shutdown_MS',
     6: u'/payload/addonDetails/XPI/aboutsessionstore@dt/shutdown_MS',
     7: u'/payload/addonDetails/XPI/arpit2@techraga.in/shutdown_MS',
     8: u'/payload/addonDetails/XPI/CSTBB@NArisT2_Noia4dev/shutdown_MS',
     9: u'/payload/addonDetails/XPI/en-US@dictionaries.addons.mozilla.org/shutdown_MS',
     10: u'/payload/addonDetails/XPI/firefox@mega.co.nz/shutdown_MS',
     11: u'/payload/addonDetails/XPI/the-addon-bar@GeekInTraining-GiT/shutdown_MS',
     12: u'/payload/addonDetails/XPI/adbhelper@mozilla.org/shutdown_MS',
     13: u'/payload/addonDetails/XPI/{b9db16a4-6edc-47ec-a1f4-b86292ed211d}/shutdown_MS',
     14: u'/payload/addonDetails/XPI/jid1-xUfzOsOFlzSOXg@jetpack/shutdown_MS',
     16: u'/payload/addonDetails/XPI/uriloader@pdf.js/shutdown_MS',
     18: u'/payload/addonDetails/XPI/jid1-cwbvBTE216jjpg@jetpack/shutdown_MS',
     20: u'/payload/addonDetails/XPI/{2b10c1c8-a11f-4bad-fe9c-1c11e82cac42}/shutdown_MS',
     22: u'/payload/addonDetails/XPI/skip_compatibility_check@sdrocking.com/shutdown_MS',
     24: u'/payload/addonDetails/XPI/firefox@ghostery.com/shutdown_MS',
     25: u'/payload/addonDetails/XPI/check-compatibility@dactyl.googlecode.com/shutdown_MS',
     30: u'/payload/addonDetails/XPI/elemhidehelper@adblockplus.org/shutdown_MS',
     58: u'/payload/addonDetails/XPI/mediahint@jetpack/shutdown_MS',
     81: u'/payload/addonDetails/XPI/{d10d0bf8-f5b5-c8b4-a8b2-2b9879e08c5d}/shutdown_MS'}

The following shows json paths that had different values between the two pings, as

    (number of times the path differed, path)

```python
# In[109]:
dict(sorted([(tup[1], tup[0]) for tup in changes['value changed'].items()], reverse=True))
```

    {1: u'/payload/addonDetails/XPI/dta@downthemall.net/shutdown_MS',
     2: u'/payload/simpleMeasurements/UITelemetry/contextmenu/__DEFAULT__/["link","image"]/withoutcustom/openlinkintab',
     3: u'/payload/addonDetails/XPI/firebug@software.joehewitt.com/creator',
     4: u'/payload/addonDetails/XPI/firebug@software.joehewitt.com/startup_MS',
     5: u'/payload/simpleMeasurements/UITelemetry/contextmenu/__DEFAULT__/["link"]/withoutcustom/openlinkintab',
     6: u'/payload/simpleMeasurements/UITelemetry/toolbars/countableEvents/__DEFAULT__/click-builtin-item/back-button/left',
     11: u'/payload/addonDetails/XPI/firebug@software.joehewitt.com/shutdown_MS',
     47: u'/payload/info/reason',
     64: u'/payload/simpleMeasurements/UITelemetry/toolbars/countableEvents/__DEFAULT__/click-builtin-item/tabbrowser-tabs/left'}

# Now do that all again, but grouping by subsessionId

```python
# In[19]:
# pingsByPingId = pings \
#     .map(lambda p: (p.get("id", "MISSING"),
#                     [{k: p[k] for k in p.keys() if k != "meta"}])) \
#     .reduceByKey(lambda l1, l2: l1 + l2)

pingsByPingId = pings \
    .map(lambda p: (p.get("payload", {}).get("info", {}).get("subsessionId", "missing"),
                    [{k: p[k] for k in p.keys() if k != "meta"}])) \
    .reduceByKey(lambda l1, l2: l1 + l2)

pingsByPingId.cache()
```

    Py4JJavaError: An error occurred while calling o158.values.
    : org.apache.spark.SparkException: Task not serializable
    Caused by: java.lang.NullPointerException
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    [full stacktrace omitted]

```python
# In[18]:
p = pingsByPingId.first()
```

    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
    : java.lang.IllegalStateException: SparkContext has been shutdown
    [full stacktrace omitted]

```python
# In[14]:
p.keys()
```

    [u'clientId',
     u'id',
     u'environment',
     u'application',
     u'version',
     'meta',
     u'creationDate',
     u'type',
     u'payload']

```python
# In[17]:
len(p)
```

    9

```python
# In[15]:
pingIdMultiplicities = pingsByPingId \
    .map(lambda id_pList: (len(id_pList[1]), 1)) \
    .reduceByKey(lambda x1, x2: x1 + x2)
```

    Py4JJavaError: An error occurred while calling o129.values.
    : org.apache.spark.SparkException: Task not serializable
    Caused by: java.lang.NullPointerException
    [full stacktrace omitted]

```python
# In[112]:
pingIdMultiplicities.collect()
```

    Py4JJavaError: An error occurred while calling o384.collect.
    : org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    [full stacktrace omitted]

```python
# In[ ]:
pingsByPingId_nonexactDupes = pingsByPingId \
    .filter(lambda id_pList: len(id_pList[1]) > 1) \
    .filter(lambda id_pList: not allEqual(id_pList[1]))
```

```python
# In[ ]:
pingsByPingId_nonexactDupes.count()
```

```python
# In[ ]:
nonexactDupesSample = pingsByPingId_nonexactDupes.collect()

collections.Counter(map(lambda tup: len(tup[1]), nonexactDupesSample))
```

```python
# In[ ]:
pingChanges = map(lambda tup: dictDiff2(tup[1][0], tup[1][1]), nonexactDupesSample)
```

```python
# In[ ]:
# how many differences are there between the pings for each pingId?
collections.Counter([len(pc) for pc in pingChanges])
```

```python
# In[ ]:
pingChangesFlat = reduce(lambda l1, l2: l1 + l2, pingChanges, [])

changes = {}
for changeType, changePath in pingChangesFlat:
    if changeType not in changes:
        changes[changeType] = {changePath: 1}
    else:
        changes[changeType][changePath] = changes[changeType].get(changePath, 0) + 1

dict(sorted([(tup[1], tup[0]) for tup in changes["path not present in both"].items()], reverse=True))
```

```python
# In[ ]:
dict(sorted([(tup[1], tup[0]) for tup in changes['value changed'].items()], reverse=True))
```

Kernel: Python 2 (2.7.9)