{
"metadata" : {
  "name" : "TrackStateByKey_1",
  "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
  "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
  "language_info" : {
    "name" : "scala",
    "file_extension" : "scala",
    "codemirror_mode" : "text/x-scala"
  },
  "trusted" : true,
  "customLocalRepo" : null,
  "customRepos" : null,
  "customDeps" : null,
  "customImports" : null,
  "customArgs" : null,
  "customSparkConf" : {
    "spark.app.name" : "Notebook",
    "spark.master" : "local[*]",
    "spark.executor.memory" : "1G"
  }
},
"cells" : [ {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "7A2C9E51B3D04F86A1E5C0B9D2F47E13"
  },
  "cell_type" : "markdown",
  "source" : "Notebook associated with the presentation *How to do Event time processing with Spark*."
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "4D4434CB4119463F9A64575609BB4112"
  },
  "cell_type" : "code",
  "source" : "\nimport org.apache.spark.streaming.{StreamingContext, Seconds}\n\n\n@transient val ssc = new StreamingContext(sparkContext, Seconds(5))\nssc.checkpoint(\"file:///tmp/myTestNotebook\")",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "import org.apache.spark.streaming.{StreamingContext, Seconds}\nssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@2ae3a67b\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 1,
    "time" : "Took: 1 second 360 milliseconds, at 2016-4-28 20:13"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "B81B1DB5D0DA4DF2A8DEE78BCB0A5BEA"
  },
  "cell_type" : "code",
  "source" : "import org.apache.spark.mllib.random.RandomRDDs._\ndef u(): RDD[Double] = normalRDD(ssc.sparkContext, 100000L, 10)",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "import org.apache.spark.mllib.random.RandomRDDs._\nu: ()org.apache.spark.rdd.RDD[Double]\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 2,
    "time" : "Took: 1 second 297 milliseconds, at 2016-4-28 20:13"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "presentation" : {
      "tabs_state" : "{\n \"tab_id\": \"#tab346563833-0\"\n}",
      "pivot_chart_state" : "{\n \"hiddenAttributes\": [],\n \"menuLimit\": 200,\n \"cols\": [],\n \"rows\": [],\n \"vals\": [],\n \"exclusions\": {},\n \"inclusions\": {},\n \"unusedAttrsVertical\": 85,\n \"autoSortUnusedAttrs\": false,\n \"inclusionsInfo\": {},\n \"aggregatorName\": \"Count\",\n \"rendererName\": \"Table\"\n}"
    },
    "id" : "533774C60CFF438982FB634A2989460C"
  },
  "cell_type" : "code",
  "source" : "u().take(3)",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "res5: Array[Double] = Array(-0.2536850909336559, -1.5834348101256361, 1.5600677201796114)\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 3,
    "time" : "Took: 1 second 993 milliseconds, at 2016-4-28 20:13"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "622702ED1D5D451280F515203008D8E3"
  },
  "cell_type" : "code",
  "source" : "import org.apache.spark.rdd.RDD\n@transient val lines = scala.collection.mutable.Queue[RDD[Double]]()\n@transient val d = ssc.queueStream(lines)",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "import org.apache.spark.rdd.RDD\nlines: scala.collection.mutable.Queue[org.apache.spark.rdd.RDD[Double]] = Queue()\nd: org.apache.spark.streaming.dstream.InputDStream[Double] = org.apache.spark.streaming.dstream.QueueInputDStream@15272aa6\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 4,
    "time" : "Took: 760 milliseconds, at 2016-4-28 20:13"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : true,
    "collapsed" : false,
    "id" : "5D664DEA2E254EE7A7316E0030529583"
  },
  "cell_type" : "code",
  "source" : "import notebook.front.widgets._\nimport notebook.front.DataConnectedWidget\nimport notebook.{JsonCodec, Codec}\nimport play.api.libs.json._\n\n\nclass A extends DataConnectedWidget[String] with Serializable {\n @transient implicit val singleCodec:Codec[JsValue, String] = JsonCodec.strings\n\n @transient val capacity = 10\n \n @transient val initData = Seq.empty[String]\n @transient val prefill:Option[String]=None\n \n @transient var data = (initData.size, prefill) match {\n case (0, None) => Seq.empty[String]\n case (x, None) => initData\n case (0, Some(i)) => Seq.fill(capacity)(i)\n case (x, Some(i)) => initData.padTo(capacity, i)\n }\n\n apply(data)\n\n @transient lazy val toHtml = <ul data-bind=\"foreach: value\">\n <li data-bind=\"html: $data\"></li>{\n scopedScript(\n \"\"\"\n |req(\n |['observable', 'knockout'],\n |function (O, ko) {\n | ko.applyBindings({\n | value: O.makeObservable(valueId)\n | },\n | this\n | );\n |});\n \"\"\".stripMargin,\n Json.obj(\"valueId\" -> dataConnection.id)\n )\n }</ul>\n\n override def apply(d:Seq[String]) {\n data = if (d.size > capacity) {\n d.drop(d.size - capacity)\n } else {\n d\n }\n super.apply(data)\n }\n\n def append(s:String) {\n apply(data :+ s)\n }\n\n def appendAll(s:Seq[String]) {\n apply(data ++ s)\n }\n }\n@transient val myUL = new A()",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "<console>:73: warning: postfix operator stripMargin should be enabled\nby making the implicit value scala.language.postfixOps visible.\nThis can be achieved by adding the import clause 'import scala.language.postfixOps'\nor by setting the compiler option -language:postfixOps.\nSee the Scala docs for value scala.language.postfixOps for a discussion\nwhy the feature should be explicitly enabled.\n \"\"\"stripMargin,\n ^\nimport notebook.front.widgets._\nimport notebook.front.DataConnectedWidget\nimport notebook.{JsonCodec, Codec}\nimport play.api.libs.json._\ndefined class A\nmyUL: A = <A widget>\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 5,
    "time" : "Took: 1 second 186 milliseconds, at 2016-4-28 20:13"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "90F77BF19F27457D8EADFBF4ADA7347F"
  },
  "cell_type" : "code",
  "source" : "myUL",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "res9: A = <A widget>\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : "<ul data-bind=\"foreach: value\">\n <li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anona49e28e3217b3dead83a00ba1cbffed5&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
    },
    "output_type" : "execute_result",
    "execution_count" : 6,
    "time" : "Took: 578 milliseconds, at 2016-4-28 20:13"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "80CD84F6919D402D801765B2B6DDCFCA"
  },
  "cell_type" : "code",
  "source" : "myUL.appendAll( Seq(\"a : 1\", \"b : 2\"))",
  "outputs" : [ {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 7,
    "time" : "Took: 530 milliseconds, at 2016-4-28 20:13"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "C4FDAB124BAC414E8CB8AB4E40B17B9E"
  },
  "cell_type" : "code",
  "source" : "import org.apache.spark.streaming.{Time, Duration}\ndef delayedTime(timeThisBatch: Time, batchMillis: Long)(x: Double): Time = {\n timeThisBatch - Duration((x * batchMillis).toLong)\n}",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "import org.apache.spark.streaming.{Time, Duration}\ndelayedTime: (timeThisBatch: org.apache.spark.streaming.Time, batchMillis: Long)(x: Double)org.apache.spark.streaming.Time\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 8,
    "time" : "Took: 408 milliseconds, at 2016-4-28 20:15"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "E939E83668E345AEAC5FDEC6FC22A2AC"
  },
  "cell_type" : "code",
  "source" : "def delayIt(rdd: RDD[Double], time: Time): RDD[(Time, Double)] = rdd.map((x: Double) => (delayedTime(time, 5000)(x), x))\n@transient val delayedStream = d.transform(delayIt _)",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "delayIt: (rdd: org.apache.spark.rdd.RDD[Double], time: org.apache.spark.streaming.Time)org.apache.spark.rdd.RDD[(org.apache.spark.streaming.Time, Double)]\ndelayedStream: org.apache.spark.streaming.dstream.DStream[(org.apache.spark.streaming.Time, Double)] = org.apache.spark.streaming.dstream.TransformedDStream@5f4894ea\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 9,
    "time" : "Took: 531 milliseconds, at 2016-4-28 20:16"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "10B8DCE157EE4577B3A9BF74C361A78E"
  },
  "cell_type" : "code",
  "source" : "@transient val formattedStream = delayedStream.map{ case (t:Time, d:Double) => (f\"$d%.2f\", t) }",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "formattedStream: org.apache.spark.streaming.dstream.DStream[(String, org.apache.spark.streaming.Time)] = org.apache.spark.streaming.dstream.MappedDStream@4c13a21e\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 10,
    "time" : "Took: 519 milliseconds, at 2016-4-28 20:16"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "BCF252B4F37C411F8635A9DF2B91C6F7"
  },
  "cell_type" : "code",
  "source" : "import scala.collection.immutable\n\nobject CircularBuffer {\n def empty[T](): CircularBuffer[T] = immutable.Vector.empty[T]\n}\n\nimplicit class CircularBuffer[T](v: Vector[T]) extends Serializable {\n\n val maxSize = 4\n \n def get(): Vector[T] = v\n\n def addItem(item : T) : CircularBuffer[T] =\n if(maxSize > 0)\n v.drop(v.size - maxSize + 1) :+ item\n else \n this\n \n}",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "import scala.collection.immutable\ndefined module CircularBuffer\ndefined class CircularBuffer\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 11,
    "time" : "Took: 439 milliseconds, at 2016-4-28 20:17"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "0FD29724D6394DB880B3460217FA4EE3"
  },
  "cell_type" : "code",
  "source" : "import org.apache.spark.streaming.State\n\n// index of the 5-second batch interval that t falls into (integer division, not modulo:\n// t % 5000 would be the offset inside the interval and is always 0 for batch times)\ndef batch(t:Time): Long = (t.milliseconds / 5000)\ntype CB = CircularBuffer[(Time, Int)]\n\ndef trackStateFunc(batchTime: Time, key: String, value: Option[Time], state: State[CB]): Option[(String, Time, Int)] = {\n value.flatMap { (t: Time) =>\n if ( batch(t) <= batch(batchTime)) {\n val newState: CB = state.getOption.fold(Vector((t, 1)): CB){ (c) =>\n val (bef, hereOrAfter) = c.get.partition{ case (timeStamp, _) => batch(timeStamp) < batch(t) }\n (hereOrAfter.toList match {\n case Nil => (t, 1) :: Nil\n case (tS, cnt) :: tl if (batch(tS) == batch(t)) => (tS, cnt + 1) ::tl\n case l@_ => (t, 1) :: l\n }).toVector.foldLeft(bef: CB){ case (cB, item) => cB.addItem(item)}\n }\n state.update(newState)\n newState.get.find{ case (tS, cnt) => batch(tS) == batch(t) }.map{ case (ts, i) => (key, ts, i) }\n }\n else None\n }\n}\n",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "import org.apache.spark.streaming.State\nbatch: (t: org.apache.spark.streaming.Time)Long\ndefined type alias CB\ntrackStateFunc: (batchTime: org.apache.spark.streaming.Time, key: String, value: Option[org.apache.spark.streaming.Time], state: org.apache.spark.streaming.State[CB])Option[(String, org.apache.spark.streaming.Time, Int)]\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 14,
    "time" : "Took: 581 milliseconds, at 2016-4-28 20:25"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "151D01224CE94DE5B0CD819B5CBA298F"
  },
  "cell_type" : "code",
  "source" : "import org.apache.spark.streaming.StateSpec\n\nval initialRDD: RDD[(String, CB)] = ssc.sparkContext.emptyRDD\n\n@transient val stateSpec = StateSpec.function(trackStateFunc _)\n .initialState(initialRDD)\n .numPartitions(2)\n .timeout(Seconds(40))",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "import org.apache.spark.streaming.StateSpec\ninitialRDD: org.apache.spark.rdd.RDD[(String, CB)] = EmptyRDD[2] at emptyRDD at <console>:87\nstateSpec: org.apache.spark.streaming.StateSpec[String,org.apache.spark.streaming.Time,CB,(String, org.apache.spark.streaming.Time, Int)] = StateSpecImpl(<function4>)\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 15,
    "time" : "Took: 476 milliseconds, at 2016-4-28 20:25"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "509C60EEBD904C908FB9D0AD01561964"
  },
  "cell_type" : "code",
  "source" : "@transient val countUpdates = formattedStream.mapWithState(stateSpec)",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "countUpdates: org.apache.spark.streaming.dstream.MapWithStateDStream[String,org.apache.spark.streaming.Time,CB,(String, org.apache.spark.streaming.Time, Int)] = org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@25179d6b\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 16,
    "time" : "Took: 544 milliseconds, at 2016-4-28 20:25"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "EB6A7369049546868E14CA1B941FD636"
  },
  "cell_type" : "code",
  "source" : "@transient val countSnapShots = countUpdates.stateSnapshots()",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "countSnapShots: org.apache.spark.streaming.dstream.DStream[(String, CB)] = org.apache.spark.streaming.dstream.FlatMappedDStream@5adfe37d\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 15,
    "time" : "Took: 469 milliseconds, at 2016-4-28 18:49"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "DA774D74BE9449329B7C162C4A73A794"
  },
  "cell_type" : "code",
  "source" : "countUpdates.foreachRDD(rdd => {\n val elems = rdd.take(5)\n\n val r = elems.map{case (key, time, count) => s\"$key: $time $count\"}\n myUL(r)\n})\n",
  "outputs" : [ {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 17,
    "time" : "Took: 556 milliseconds, at 2016-4-28 20:25"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "159534BEAAE04E49A0CBFA9FA4816275"
  },
  "cell_type" : "code",
  "source" : "countSnapShots.foreachRDD(rdd => {\n val elems = rdd.filter{ case (_, cB) => cB.get.size > 1}.take(5)\n\n val r = elems.map{ case (key, cB) => {\n val cBStr = cB.get.map{ case (ts, c) => s\"$ts $c\"}.mkString(\";\")\n s\"$key: $cBStr\"\n }\n }\n myUL.appendAll(r)\n})\n",
  "outputs" : [ {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 17,
    "time" : "Took: 627 milliseconds, at 2016-4-28 18:49"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "FA2D60D619E5404985C3E391DEE42A9D"
  },
  "cell_type" : "code",
  "source" : "ssc.start()",
  "outputs" : [ {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 18,
    "time" : "Took: 713 milliseconds, at 2016-4-28 20:25"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "F260D0AC81E6425C8C6B8F8AE7A5978E"
  },
  "cell_type" : "code",
  "source" : "// Create some RDDs and push them into the queue consumed by queueStream\nfor (i <- 1 to 50) {\n lines.synchronized {\n lines += u()\n }\n Thread.sleep(4)\n}",
  "outputs" : [ {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 19,
    "time" : "Took: 834 milliseconds, at 2016-4-28 20:26"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "0C4D4F76718944A98EACC21EDCBD1AB4"
  },
  "cell_type" : "code",
  "source" : "lines.size",
  "outputs" : [ {
    "name" : "stdout",
    "output_type" : "stream",
    "text" : "res29: Int = 13\n"
  }, {
    "metadata" : { },
    "data" : {
      "text/html" : "13"
    },
    "output_type" : "execute_result",
    "execution_count" : 20,
    "time" : "Took: 719 milliseconds, at 2016-4-28 18:30"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : false,
    "id" : "5066DD69C53345F4B3B6789BA392B51A"
  },
  "cell_type" : "code",
  "source" : "// keep the notebook's shared sparkContext alive; only stop the streaming context\nssc.stop(stopSparkContext = false)",
  "outputs" : [ {
    "metadata" : { },
    "data" : {
      "text/html" : ""
    },
    "output_type" : "execute_result",
    "execution_count" : 21,
    "time" : "Took: 2 seconds 932 milliseconds, at 2016-4-28 18:30"
  } ]
}, {
  "metadata" : {
    "trusted" : true,
    "input_collapsed" : false,
    "collapsed" : true,
    "id" : "64596F0D7FC24E6D8E48872ACAE0FC20"
  },
  "cell_type" : "code",
  "source" : "",
  "outputs" : [ ]
} ],
"nbformat" : 4
}