-
-
Save huitseeker/3c6d1246178eea56d958a4757a8cadbd to your computer and use it in GitHub Desktop.
Notebook accompanying the presentation at http://www.meetup.com/Big-Data-Romandie/events/230345605/ : "How to do event-time processing with Spark"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"metadata" : { | |
"name" : "TrackStateByKey_1", | |
"user_save_timestamp" : "1970-01-01T01:00:00.000Z", | |
"auto_save_timestamp" : "1970-01-01T01:00:00.000Z", | |
"language_info" : { | |
"name" : "scala", | |
"file_extension" : "scala", | |
"codemirror_mode" : "text/x-scala" | |
}, | |
"trusted" : true, | |
"customLocalRepo" : null, | |
"customRepos" : null, | |
"customDeps" : null, | |
"customImports" : null, | |
"customArgs" : null, | |
"customSparkConf" : { | |
"spark.app.name" : "Notebook", | |
"spark.master" : "local[*]", | |
"spark.executor.memory" : "1G" | |
} | |
}, | |
"cells" : [ { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "4D4434CB4119463F9A64575609BB4112" | |
}, | |
"cell_type" : "code", | |
"source" : "\nimport org.apache.spark.streaming.{StreamingContext, Seconds}\n\n\n@transient val ssc = new StreamingContext(sparkContext, Seconds(5))\n//sparkContext.getConf.set(\"spark.streaming.checkpoint.directory\", \"file:///tmp/myTestNotebook\")\nssc.checkpoint(\"file:///tmp/myTestNotebook\")", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "import org.apache.spark.streaming.{StreamingContext, Seconds}\nssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@2ae3a67b\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 1, | |
"time" : "Took: 1 second 360 milliseconds, at 2016-4-28 20:13" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "B81B1DB5D0DA4DF2A8DEE78BCB0A5BEA" | |
}, | |
"cell_type" : "code", | |
"source" : "import org.apache.spark.mllib.random.RandomRDDs._\ndef u(): RDD[Double] = normalRDD(ssc.sparkContext, 100000L, 10)", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "import org.apache.spark.mllib.random.RandomRDDs._\nu: ()org.apache.spark.rdd.RDD[Double]\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 2, | |
"time" : "Took: 1 second 297 milliseconds, at 2016-4-28 20:13" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"presentation" : { | |
"tabs_state" : "{\n \"tab_id\": \"#tab346563833-0\"\n}", | |
"pivot_chart_state" : "{\n \"hiddenAttributes\": [],\n \"menuLimit\": 200,\n \"cols\": [],\n \"rows\": [],\n \"vals\": [],\n \"exclusions\": {},\n \"inclusions\": {},\n \"unusedAttrsVertical\": 85,\n \"autoSortUnusedAttrs\": false,\n \"inclusionsInfo\": {},\n \"aggregatorName\": \"Count\",\n \"rendererName\": \"Table\"\n}" | |
}, | |
"id" : "533774C60CFF438982FB634A2989460C" | |
}, | |
"cell_type" : "code", | |
"source" : "u().take(3)", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "res5: Array[Double] = Array(-0.2536850909336559, -1.5834348101256361, 1.5600677201796114)\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "<div>\n <script data-this=\"{"dataId":"anon646cf7ed00eecf2e97543910a8d964a7","dataInit":[],"genId":"346563833"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/tabs'], \n function(playground, _magictabs) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... }\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magictabs,\n \"o\": {}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <div>\n <ul class=\"nav nav-tabs\" id=\"ul346563833\"><li>\n <a href=\"#tab346563833-0\"><i class=\"fa fa-table\"/></a>\n </li><li>\n <a href=\"#tab346563833-1\"><i class=\"fa fa-dot-circle-o\"/></a>\n </li><li>\n <a href=\"#tab346563833-2\"><i class=\"fa fa-line-chart\"/></a>\n </li><li>\n <a href=\"#tab346563833-3\"><i class=\"fa fa-bar-chart\"/></a>\n </li><li>\n <a href=\"#tab346563833-4\"><i class=\"fa fa-cubes\"/></a>\n </li></ul>\n\n <div class=\"tab-content\" id=\"tab346563833\"><div class=\"tab-pane\" id=\"tab346563833-0\">\n <div>\n <script data-this=\"{"dataId":"anon6627663121e7dc437df2f007e63f43bf","dataInit":[{"_1":0,"_2":-0.2536850909336559},{"_1":1,"_2":-1.5834348101256361},{"_1":2,"_2":1.5600677201796114}],"genId":"989451330"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/tableChart'], \n function(playground, _magictableChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... 
}\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magictableChart,\n \"o\": {\"headers\":[\"_1\",\"_2\"],\"width\":600,\"height\":400}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anon643a0eca6b368dc7e24e89c02684e644","initialValue":"3"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anonf6c947832368be0e05a0e6ed9e3d3110","initialValue":""}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>\n </div><div class=\"tab-pane\" id=\"tab346563833-1\">\n <div>\n <script data-this=\"{"dataId":"anona9d8e36ad6f4ae2664b339ae639e02e7","dataInit":[{"_1":0,"_2":-0.2536850909336559},{"_1":1,"_2":-1.5834348101256361},{"_1":2,"_2":1.5600677201796114}],"genId":"974697078"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/scatterChart'], \n function(playground, _magicscatterChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... 
}\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magicscatterChart,\n \"o\": {\"x\":\"_1\",\"y\":\"_2\",\"width\":600,\"height\":400}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anon4acb7a8a2671a711ddc7674621998daa","initialValue":"3"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anon07144a025aafaafc922c1f54b060e2c2","initialValue":""}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>\n </div><div class=\"tab-pane\" id=\"tab346563833-2\">\n <div>\n <script data-this=\"{"dataId":"anonde8d1b4b365d7609689af9c7705db6ef","dataInit":[{"_1":0,"_2":-0.2536850909336559},{"_1":1,"_2":-1.5834348101256361},{"_1":2,"_2":1.5600677201796114}],"genId":"344358664"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/lineChart'], \n function(playground, _magiclineChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... 
}\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magiclineChart,\n \"o\": {\"x\":\"_1\",\"y\":\"_2\",\"width\":600,\"height\":400}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anonb2b92dfd907ca12d64ce52a7d9a77a71","initialValue":"3"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anoneecbd61c21c99f3de8385cfb194f0fcb","initialValue":""}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>\n </div><div class=\"tab-pane\" id=\"tab346563833-3\">\n <div>\n <script data-this=\"{"dataId":"anonfa5f5e6ba6d856d048aacef14b383b13","dataInit":[{"_1":0,"_2":-0.2536850909336559},{"_1":1,"_2":-1.5834348101256361},{"_1":2,"_2":1.5600677201796114}],"genId":"170632493"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/barChart'], \n function(playground, _magicbarChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... 
}\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magicbarChart,\n \"o\": {\"x\":\"_1\",\"y\":\"_2\",\"width\":600,\"height\":400}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anon938ceb3055c953e726c22aba218658d5","initialValue":"3"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anon51047b898f76a66b2817ce0fa3bd4acb","initialValue":""}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>\n </div><div class=\"tab-pane\" id=\"tab346563833-4\">\n <div>\n <script data-this=\"{"dataId":"anon0c44f3bad9fec8cd2c8457f506903b62","dataInit":[{"_1":0,"_2":-0.2536850909336559},{"_1":1,"_2":-1.5834348101256361},{"_1":2,"_2":1.5600677201796114}],"genId":"1267425173"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/pivotChart'], \n function(playground, _magicpivotChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... 
}\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magicpivotChart,\n \"o\": {\"width\":600,\"height\":400,\"derivedAttributes\":{},\"extraOptions\":{}}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anon3d7f48c1302d252dc1a5d4adcf9c87c7","initialValue":"3"}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{"valueId":"anona1224e1793c8037996df33094c7a646c","initialValue":""}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>\n </div></div>\n </div>\n </div></div>" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 3, | |
"time" : "Took: 1 second 993 milliseconds, at 2016-4-28 20:13" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "622702ED1D5D451280F515203008D8E3" | |
}, | |
"cell_type" : "code", | |
"source" : "import org.apache.spark.rdd.RDD\n@transient val lines = scala.collection.mutable.Queue[RDD[Double]]()\n@transient val d = ssc.queueStream(lines)", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "import org.apache.spark.rdd.RDD\nlines: scala.collection.mutable.Queue[org.apache.spark.rdd.RDD[Double]] = Queue()\nd: org.apache.spark.streaming.dstream.InputDStream[Double] = org.apache.spark.streaming.dstream.QueueInputDStream@15272aa6\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 4, | |
"time" : "Took: 760 milliseconds, at 2016-4-28 20:13" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : true, | |
"collapsed" : false, | |
"id" : "5D664DEA2E254EE7A7316E0030529583" | |
}, | |
"cell_type" : "code", | |
"source" : "import notebook.front.widgets._\nimport notebook.front.DataConnectedWidget\nimport notebook.{JsonCodec, Codec}\nimport play.api.libs.json._\n\n\nclass A extends DataConnectedWidget[String] with Serializable {\n @transient implicit val singleCodec:Codec[JsValue, String] = JsonCodec.strings\n\n @transient val capacity = 10\n \n @transient val initData = Seq.empty[String]\n @transient val prefill:Option[String]=None\n \n @transient var data = (initData.size, prefill) match {\n case (0, None) => Seq.empty[String]\n case (x, None) => initData\n case (0, Some(i)) => Seq.fill(capacity)(i)\n case (x, Some(i)) => initData.padTo(capacity, i)\n }\n\n apply(data)\n\n @transient lazy val toHtml = <ul data-bind=\"foreach: value\">\n <li data-bind=\"html: $data\"></li>{\n scopedScript(\n \"\"\"\n |req(\n |['observable', 'knockout'],\n |function (O, ko) {\n | ko.applyBindings({\n | value: O.makeObservable(valueId)\n | },\n | this\n | );\n |});\n \"\"\"stripMargin,\n Json.obj(\"valueId\" -> dataConnection.id)\n )\n }</ul>\n\n override def apply(d:Seq[String]) {\n data = if (d.size > capacity) {\n d.drop(d.size - capacity)\n } else {\n d\n }\n super.apply(data)\n }\n\n def append(s:String) {\n apply(data :+ s)\n }\n\n def appendAll(s:Seq[String]) {\n apply(data ++ s)\n }\n }\n@transient val myUL = new A()", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "<console>:73: warning: postfix operator stripMargin should be enabled\nby making the implicit value scala.language.postfixOps visible.\nThis can be achieved by adding the import clause 'import scala.language.postfixOps'\nor by setting the compiler option -language:postfixOps.\nSee the Scala docs for value scala.language.postfixOps for a discussion\nwhy the feature should be explicitly enabled.\n \"\"\"stripMargin,\n ^\nimport notebook.front.widgets._\nimport notebook.front.DataConnectedWidget\nimport notebook.{JsonCodec, Codec}\nimport play.api.libs.json._\ndefined class A\nmyUL: A = <A widget>\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 5, | |
"time" : "Took: 1 second 186 milliseconds, at 2016-4-28 20:13" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "90F77BF19F27457D8EADFBF4ADA7347F" | |
}, | |
"cell_type" : "code", | |
"source" : "myUL", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "res9: A = <A widget>\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "<ul data-bind=\"foreach: value\">\n <li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anona49e28e3217b3dead83a00ba1cbffed5&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 6, | |
"time" : "Took: 578 milliseconds, at 2016-4-28 20:13" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "80CD84F6919D402D801765B2B6DDCFCA" | |
}, | |
"cell_type" : "code", | |
"source" : "myUL.appendAll( Seq(\"a : 1\", \"b : 2\"))", | |
"outputs" : [ { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 7, | |
"time" : "Took: 530 milliseconds, at 2016-4-28 20:13" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "C4FDAB124BAC414E8CB8AB4E40B17B9E" | |
}, | |
"cell_type" : "code", | |
"source" : "import org.apache.spark.streaming.{Time, Duration}\ndef delayedTime(timeThisBatch: Time, batchMillis: Long)(x: Double): Time = {\n timeThisBatch - Duration((x * batchMillis).toLong)\n}", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "import org.apache.spark.streaming.{Time, Duration}\ndelayedTime: (timeThisBatch: org.apache.spark.streaming.Time, batchMillis: Long)(x: Double)org.apache.spark.streaming.Time\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 8, | |
"time" : "Took: 408 milliseconds, at 2016-4-28 20:15" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "E939E83668E345AEAC5FDEC6FC22A2AC" | |
}, | |
"cell_type" : "code", | |
"source" : "def delayIt(rdd: RDD[Double], time: Time): RDD[(Time, Double)] = rdd.map((x: Double) => (delayedTime(time, 5000)(x), x))\n@transient val delayedStream = d.transform(delayIt _)", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "delayIt: (rdd: org.apache.spark.rdd.RDD[Double], time: org.apache.spark.streaming.Time)org.apache.spark.rdd.RDD[(org.apache.spark.streaming.Time, Double)]\ndelayedStream: org.apache.spark.streaming.dstream.DStream[(org.apache.spark.streaming.Time, Double)] = org.apache.spark.streaming.dstream.TransformedDStream@5f4894ea\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 9, | |
"time" : "Took: 531 milliseconds, at 2016-4-28 20:16" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "10B8DCE157EE4577B3A9BF74C361A78E" | |
}, | |
"cell_type" : "code", | |
"source" : "@transient val formattedStream = delayedStream.map{ case (t:Time, d:Double) => (f\"$d%.2f\", t) }", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "formattedStream: org.apache.spark.streaming.dstream.DStream[(String, org.apache.spark.streaming.Time)] = org.apache.spark.streaming.dstream.MappedDStream@4c13a21e\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 10, | |
"time" : "Took: 519 milliseconds, at 2016-4-28 20:16" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "BCF252B4F37C411F8635A9DF2B91C6F7" | |
}, | |
"cell_type" : "code", | |
"source" : "import scala.collection.immutable\n\nobject CircularBuffer {\n def empty[T](): CircularBuffer[T] = immutable.Vector.empty[T]\n}\n\nimplicit class CircularBuffer[T](v: Vector[T]) extends Serializable {\n\n val maxSize = 4\n \n def get(): Vector[T] = v\n\n def addItem(item : T) : CircularBuffer[T] =\n if(maxSize > 0)\n v.drop(v.size - maxSize + 1) :+ item\n else \n this\n \n}", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "import scala.collection.immutable\ndefined module CircularBuffer\ndefined class CircularBuffer\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 11, | |
"time" : "Took: 439 milliseconds, at 2016-4-28 20:17" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "0FD29724D6394DB880B3460217FA4EE3" | |
}, | |
"cell_type" : "code", | |
"source" : "import org.apache.spark.streaming.State\n\ndef batch(t:Time): Long = (t.milliseconds % 5000)\ntype CB = CircularBuffer[(Time, Int)]\n\ndef trackStateFunc(batchTime: Time, key: String, value: Option[Time], state: State[CB]): Option[(String, Time, Int)] = {\n value.flatMap { (t: Time) =>\n if ( batch(t) <= batch(batchTime)) {\n val newState: CB = state.getOption.fold(Vector((t, 1)): CB){ (c) =>\n val (bef, hereOrAfter) = c.get.partition{ case (timeStamp, _) => batch(timeStamp) < batch(t) }\n (hereOrAfter.toList match {\n case Nil => (t, 1) :: Nil\n case (tS, cnt) :: tl if (batch(tS) == batch(t)) => (tS, cnt + 1) ::tl\n case l@_ => (t, 1) :: l\n }).toVector.foldLeft(bef: CB){ case (cB, item) => cB.addItem(item)}\n }\n state.update(newState)\n newState.get.find{ case (tS, cnt) => batch(tS) == batch(t) }.map{ case (ts, i) => (key, ts, i) }\n }\n else None\n }\n}\n", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "import org.apache.spark.streaming.State\nbatch: (t: org.apache.spark.streaming.Time)Long\ndefined type alias CB\ntrackStateFunc: (batchTime: org.apache.spark.streaming.Time, key: String, value: Option[org.apache.spark.streaming.Time], state: org.apache.spark.streaming.State[CB])Option[(String, org.apache.spark.streaming.Time, Int)]\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 14, | |
"time" : "Took: 581 milliseconds, at 2016-4-28 20:25" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "151D01224CE94DE5B0CD819B5CBA298F" | |
}, | |
"cell_type" : "code", | |
"source" : "import org.apache.spark.streaming.StateSpec\n\nval initialRDD: RDD[(String, CB)] = ssc.sparkContext.emptyRDD\n\n@transient val stateSpec = StateSpec.function(trackStateFunc _)\n .initialState(initialRDD)\n .numPartitions(2)\n .timeout(Seconds(40))", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "import org.apache.spark.streaming.StateSpec\ninitialRDD: org.apache.spark.rdd.RDD[(String, CB)] = EmptyRDD[2] at emptyRDD at <console>:87\nstateSpec: org.apache.spark.streaming.StateSpec[String,org.apache.spark.streaming.Time,CB,(String, org.apache.spark.streaming.Time, Int)] = StateSpecImpl(<function4>)\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 15, | |
"time" : "Took: 476 milliseconds, at 2016-4-28 20:25" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "509C60EEBD904C908FB9D0AD01561964" | |
}, | |
"cell_type" : "code", | |
"source" : "@transient val countUpdates = formattedStream.mapWithState(stateSpec)", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "countUpdates: org.apache.spark.streaming.dstream.MapWithStateDStream[String,org.apache.spark.streaming.Time,CB,(String, org.apache.spark.streaming.Time, Int)] = org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@25179d6b\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 16, | |
"time" : "Took: 544 milliseconds, at 2016-4-28 20:25" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "EB6A7369049546868E14CA1B941FD636" | |
}, | |
"cell_type" : "code", | |
"source" : "@transient val countSnapShots = countUpdates.stateSnapshots()", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "countSnapShots: org.apache.spark.streaming.dstream.DStream[(String, CB)] = org.apache.spark.streaming.dstream.FlatMappedDStream@5adfe37d\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 15, | |
"time" : "Took: 469 milliseconds, at 2016-4-28 18:49" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "DA774D74BE9449329B7C162C4A73A794" | |
}, | |
"cell_type" : "code", | |
"source" : "countUpdates.foreachRDD(rdd => {\n val elems = rdd.take(5)\n\n val r = elems.map{case (key, time, count) => s\"$key: $time $count\"}\n myUL(r)\n})\n", | |
"outputs" : [ { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 17, | |
"time" : "Took: 556 milliseconds, at 2016-4-28 20:25" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "159534BEAAE04E49A0CBFA9FA4816275" | |
}, | |
"cell_type" : "code", | |
"source" : "countSnapShots.foreachRDD(rdd => {\n val elems = rdd.filter{ case (ts, c) => c.get.size > 1}.take(5)\n\n val r = elems.map{ case (key, cB) => {\n val cBStr = cB.get.map{ case (ts, c) => s\"$ts $c\"}.mkString(\";\")\n s\"$key: $cBStr\"\n }\n }\n myUL.appendAll(r)\n})\n", | |
"outputs" : [ { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 17, | |
"time" : "Took: 627 milliseconds, at 2016-4-28 18:49" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "FA2D60D619E5404985C3E391DEE42A9D" | |
}, | |
"cell_type" : "code", | |
"source" : "ssc.start()", | |
"outputs" : [ { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 18, | |
"time" : "Took: 713 milliseconds, at 2016-4-28 20:25" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "F260D0AC81E6425C8C6B8F8AE7A5978E" | |
}, | |
"cell_type" : "code", | |
"source" : "// Create some RDDs and push them into the `lines` queue\nfor (i <- 1 to 50) {\n lines.synchronized {\n lines += u()\n }\n Thread.sleep(4)\n}", | |
"outputs" : [ { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 19, | |
"time" : "Took: 834 milliseconds, at 2016-4-28 20:26" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "0C4D4F76718944A98EACC21EDCBD1AB4" | |
}, | |
"cell_type" : "code", | |
"source" : "lines.size", | |
"outputs" : [ { | |
"name" : "stdout", | |
"output_type" : "stream", | |
"text" : "res29: Int = 13\n" | |
}, { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "13" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 20, | |
"time" : "Took: 719 milliseconds, at 2016-4-28 18:30" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : false, | |
"id" : "5066DD69C53345F4B3B6789BA392B51A" | |
}, | |
"cell_type" : "code", | |
"source" : "ssc.stop()", | |
"outputs" : [ { | |
"metadata" : { }, | |
"data" : { | |
"text/html" : "" | |
}, | |
"output_type" : "execute_result", | |
"execution_count" : 21, | |
"time" : "Took: 2 seconds 932 milliseconds, at 2016-4-28 18:30" | |
} ] | |
}, { | |
"metadata" : { | |
"trusted" : true, | |
"input_collapsed" : false, | |
"collapsed" : true, | |
"id" : "64596F0D7FC24E6D8E48872ACAE0FC20" | |
}, | |
"cell_type" : "code", | |
"source" : "", | |
"outputs" : [ ] | |
} ], | |
"nbformat" : 4 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment