Skip to content

Instantly share code, notes, and snippets.

@huitseeker
Created April 29, 2016 08:40
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save huitseeker/3c6d1246178eea56d958a4757a8cadbd to your computer and use it in GitHub Desktop.
Notebook associated to the presentation of http://www.meetup.com/Big-Data-Romandie/events/230345605/ How to do Event time processing with Spark
{
"metadata" : {
"name" : "TrackStateByKey_1",
"user_save_timestamp" : "1970-01-01T01:00:00.000Z",
"auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
"language_info" : {
"name" : "scala",
"file_extension" : "scala",
"codemirror_mode" : "text/x-scala"
},
"trusted" : true,
"customLocalRepo" : null,
"customRepos" : null,
"customDeps" : null,
"customImports" : null,
"customArgs" : null,
"customSparkConf" : {
"spark.app.name" : "Notebook",
"spark.master" : "local[*]",
"spark.executor.memory" : "1G"
}
},
"cells" : [ {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "4D4434CB4119463F9A64575609BB4112"
},
"cell_type" : "code",
"source" : "\nimport org.apache.spark.streaming.{StreamingContext, Seconds}\n\n\n@transient val ssc = new StreamingContext(sparkContext, Seconds(5))\n//sparkContext.getConf.set(\"spark.streaming.checkpoint.directory\", \"file:///tmp/myTestNotebook\")\nssc.checkpoint(\"file:///tmp/myTestNotebook\")",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "import org.apache.spark.streaming.{StreamingContext, Seconds}\nssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@2ae3a67b\n"
}, {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 1,
"time" : "Took: 1 second 360 milliseconds, at 2016-4-28 20:13"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "B81B1DB5D0DA4DF2A8DEE78BCB0A5BEA"
},
"cell_type" : "code",
"source" : "import org.apache.spark.mllib.random.RandomRDDs._\ndef u(): RDD[Double] = normalRDD(ssc.sparkContext, 100000L, 10)",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "import org.apache.spark.mllib.random.RandomRDDs._\nu: ()org.apache.spark.rdd.RDD[Double]\n"
}, {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 2,
"time" : "Took: 1 second 297 milliseconds, at 2016-4-28 20:13"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"presentation" : {
"tabs_state" : "{\n \"tab_id\": \"#tab346563833-0\"\n}",
"pivot_chart_state" : "{\n \"hiddenAttributes\": [],\n \"menuLimit\": 200,\n \"cols\": [],\n \"rows\": [],\n \"vals\": [],\n \"exclusions\": {},\n \"inclusions\": {},\n \"unusedAttrsVertical\": 85,\n \"autoSortUnusedAttrs\": false,\n \"inclusionsInfo\": {},\n \"aggregatorName\": \"Count\",\n \"rendererName\": \"Table\"\n}"
},
"id" : "533774C60CFF438982FB634A2989460C"
},
"cell_type" : "code",
"source" : "u().take(3)",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "res5: Array[Double] = Array(-0.2536850909336559, -1.5834348101256361, 1.5600677201796114)\n"
}, {
"metadata" : { },
"data" : {
"text/html" : "<div>\n <script data-this=\"{&quot;dataId&quot;:&quot;anon646cf7ed00eecf2e97543910a8d964a7&quot;,&quot;dataInit&quot;:[],&quot;genId&quot;:&quot;346563833&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/tabs'], \n function(playground, _magictabs) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... }\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magictabs,\n \"o\": {}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <div>\n <ul class=\"nav nav-tabs\" id=\"ul346563833\"><li>\n <a href=\"#tab346563833-0\"><i class=\"fa fa-table\"/></a>\n </li><li>\n <a href=\"#tab346563833-1\"><i class=\"fa fa-dot-circle-o\"/></a>\n </li><li>\n <a href=\"#tab346563833-2\"><i class=\"fa fa-line-chart\"/></a>\n </li><li>\n <a href=\"#tab346563833-3\"><i class=\"fa fa-bar-chart\"/></a>\n </li><li>\n <a href=\"#tab346563833-4\"><i class=\"fa fa-cubes\"/></a>\n </li></ul>\n\n <div class=\"tab-content\" id=\"tab346563833\"><div class=\"tab-pane\" id=\"tab346563833-0\">\n <div>\n <script data-this=\"{&quot;dataId&quot;:&quot;anon6627663121e7dc437df2f007e63f43bf&quot;,&quot;dataInit&quot;:[{&quot;_1&quot;:0,&quot;_2&quot;:-0.2536850909336559},{&quot;_1&quot;:1,&quot;_2&quot;:-1.5834348101256361},{&quot;_1&quot;:2,&quot;_2&quot;:1.5600677201796114}],&quot;genId&quot;:&quot;989451330&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/tableChart'], \n function(playground, _magictableChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... }\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magictableChart,\n \"o\": {\"headers\":[\"_1\",\"_2\"],\"width\":600,\"height\":400}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anon643a0eca6b368dc7e24e89c02684e644&quot;,&quot;initialValue&quot;:&quot;3&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anonf6c947832368be0e05a0e6ed9e3d3110&quot;,&quot;initialValue&quot;:&quot;&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>\n </div><div class=\"tab-pane\" id=\"tab346563833-1\">\n <div>\n <script data-this=\"{&quot;dataId&quot;:&quot;anona9d8e36ad6f4ae2664b339ae639e02e7&quot;,&quot;dataInit&quot;:[{&quot;_1&quot;:0,&quot;_2&quot;:-0.2536850909336559},{&quot;_1&quot;:1,&quot;_2&quot;:-1.5834348101256361},{&quot;_1&quot;:2,&quot;_2&quot;:1.5600677201796114}],&quot;genId&quot;:&quot;974697078&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/scatterChart'], \n function(playground, _magicscatterChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... }\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magicscatterChart,\n \"o\": {\"x\":\"_1\",\"y\":\"_2\",\"width\":600,\"height\":400}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anon4acb7a8a2671a711ddc7674621998daa&quot;,&quot;initialValue&quot;:&quot;3&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anon07144a025aafaafc922c1f54b060e2c2&quot;,&quot;initialValue&quot;:&quot;&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>\n </div><div class=\"tab-pane\" id=\"tab346563833-2\">\n <div>\n <script data-this=\"{&quot;dataId&quot;:&quot;anonde8d1b4b365d7609689af9c7705db6ef&quot;,&quot;dataInit&quot;:[{&quot;_1&quot;:0,&quot;_2&quot;:-0.2536850909336559},{&quot;_1&quot;:1,&quot;_2&quot;:-1.5834348101256361},{&quot;_1&quot;:2,&quot;_2&quot;:1.5600677201796114}],&quot;genId&quot;:&quot;344358664&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/lineChart'], \n function(playground, _magiclineChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... }\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magiclineChart,\n \"o\": {\"x\":\"_1\",\"y\":\"_2\",\"width\":600,\"height\":400}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anonb2b92dfd907ca12d64ce52a7d9a77a71&quot;,&quot;initialValue&quot;:&quot;3&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anoneecbd61c21c99f3de8385cfb194f0fcb&quot;,&quot;initialValue&quot;:&quot;&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>\n </div><div class=\"tab-pane\" id=\"tab346563833-3\">\n <div>\n <script data-this=\"{&quot;dataId&quot;:&quot;anonfa5f5e6ba6d856d048aacef14b383b13&quot;,&quot;dataInit&quot;:[{&quot;_1&quot;:0,&quot;_2&quot;:-0.2536850909336559},{&quot;_1&quot;:1,&quot;_2&quot;:-1.5834348101256361},{&quot;_1&quot;:2,&quot;_2&quot;:1.5600677201796114}],&quot;genId&quot;:&quot;170632493&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/barChart'], \n function(playground, _magicbarChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... }\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magicbarChart,\n \"o\": {\"x\":\"_1\",\"y\":\"_2\",\"width\":600,\"height\":400}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anon938ceb3055c953e726c22aba218658d5&quot;,&quot;initialValue&quot;:&quot;3&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anon51047b898f76a66b2817ce0fa3bd4acb&quot;,&quot;initialValue&quot;:&quot;&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>\n </div><div class=\"tab-pane\" id=\"tab346563833-4\">\n <div>\n <script data-this=\"{&quot;dataId&quot;:&quot;anon0c44f3bad9fec8cd2c8457f506903b62&quot;,&quot;dataInit&quot;:[{&quot;_1&quot;:0,&quot;_2&quot;:-0.2536850909336559},{&quot;_1&quot;:1,&quot;_2&quot;:-1.5834348101256361},{&quot;_1&quot;:2,&quot;_2&quot;:1.5600677201796114}],&quot;genId&quot;:&quot;1267425173&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/pivotChart'], \n function(playground, _magicpivotChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... }\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magicpivotChart,\n \"o\": {\"width\":600,\"height\":400,\"derivedAttributes\":{},\"extraOptions\":{}}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anon3d7f48c1302d252dc1a5d4adcf9c87c7&quot;,&quot;initialValue&quot;:&quot;3&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anona1224e1793c8037996df33094c7a646c&quot;,&quot;initialValue&quot;:&quot;&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>\n </div></div>\n </div>\n </div></div>"
},
"output_type" : "execute_result",
"execution_count" : 3,
"time" : "Took: 1 second 993 milliseconds, at 2016-4-28 20:13"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "622702ED1D5D451280F515203008D8E3"
},
"cell_type" : "code",
"source" : "import org.apache.spark.rdd.RDD\n@transient val lines = scala.collection.mutable.Queue[RDD[Double]]()\n@transient val d = ssc.queueStream(lines)",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "import org.apache.spark.rdd.RDD\nlines: scala.collection.mutable.Queue[org.apache.spark.rdd.RDD[Double]] = Queue()\nd: org.apache.spark.streaming.dstream.InputDStream[Double] = org.apache.spark.streaming.dstream.QueueInputDStream@15272aa6\n"
}, {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 4,
"time" : "Took: 760 milliseconds, at 2016-4-28 20:13"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : true,
"collapsed" : false,
"id" : "5D664DEA2E254EE7A7316E0030529583"
},
"cell_type" : "code",
"source" : "import notebook.front.widgets._\nimport notebook.front.DataConnectedWidget\nimport notebook.{JsonCodec, Codec}\nimport play.api.libs.json._\n\n\nclass A extends DataConnectedWidget[String] with Serializable {\n @transient implicit val singleCodec:Codec[JsValue, String] = JsonCodec.strings\n\n @transient val capacity = 10\n \n @transient val initData = Seq.empty[String]\n @transient val prefill:Option[String]=None\n \n @transient var data = (initData.size, prefill) match {\n case (0, None) => Seq.empty[String]\n case (x, None) => initData\n case (0, Some(i)) => Seq.fill(capacity)(i)\n case (x, Some(i)) => initData.padTo(capacity, i)\n }\n\n apply(data)\n\n @transient lazy val toHtml = <ul data-bind=\"foreach: value\">\n <li data-bind=\"html: $data\"></li>{\n scopedScript(\n \"\"\"\n |req(\n |['observable', 'knockout'],\n |function (O, ko) {\n | ko.applyBindings({\n | value: O.makeObservable(valueId)\n | },\n | this\n | );\n |});\n \"\"\"stripMargin,\n Json.obj(\"valueId\" -> dataConnection.id)\n )\n }</ul>\n\n override def apply(d:Seq[String]) {\n data = if (d.size > capacity) {\n d.drop(d.size - capacity)\n } else {\n d\n }\n super.apply(data)\n }\n\n def append(s:String) {\n apply(data :+ s)\n }\n\n def appendAll(s:Seq[String]) {\n apply(data ++ s)\n }\n }\n@transient val myUL = new A()",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "<console>:73: warning: postfix operator stripMargin should be enabled\nby making the implicit value scala.language.postfixOps visible.\nThis can be achieved by adding the import clause 'import scala.language.postfixOps'\nor by setting the compiler option -language:postfixOps.\nSee the Scala docs for value scala.language.postfixOps for a discussion\nwhy the feature should be explicitly enabled.\n \"\"\"stripMargin,\n ^\nimport notebook.front.widgets._\nimport notebook.front.DataConnectedWidget\nimport notebook.{JsonCodec, Codec}\nimport play.api.libs.json._\ndefined class A\nmyUL: A = <A widget>\n"
}, {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 5,
"time" : "Took: 1 second 186 milliseconds, at 2016-4-28 20:13"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "90F77BF19F27457D8EADFBF4ADA7347F"
},
"cell_type" : "code",
"source" : "myUL",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "res9: A = <A widget>\n"
}, {
"metadata" : { },
"data" : {
"text/html" : "<ul data-bind=\"foreach: value\">\n <li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anona49e28e3217b3dead83a00ba1cbffed5&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
},
"output_type" : "execute_result",
"execution_count" : 6,
"time" : "Took: 578 milliseconds, at 2016-4-28 20:13"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "80CD84F6919D402D801765B2B6DDCFCA"
},
"cell_type" : "code",
"source" : "myUL.appendAll( Seq(\"a : 1\", \"b : 2\"))",
"outputs" : [ {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 7,
"time" : "Took: 530 milliseconds, at 2016-4-28 20:13"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "C4FDAB124BAC414E8CB8AB4E40B17B9E"
},
"cell_type" : "code",
"source" : "import org.apache.spark.streaming.{Time, Duration}\ndef delayedTime(timeThisBatch: Time, batchMillis: Long)(x: Double): Time = {\n timeThisBatch - Duration((x * batchMillis).toLong)\n}",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "import org.apache.spark.streaming.{Time, Duration}\ndelayedTime: (timeThisBatch: org.apache.spark.streaming.Time, batchMillis: Long)(x: Double)org.apache.spark.streaming.Time\n"
}, {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 8,
"time" : "Took: 408 milliseconds, at 2016-4-28 20:15"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "E939E83668E345AEAC5FDEC6FC22A2AC"
},
"cell_type" : "code",
"source" : "def delayIt(rdd: RDD[Double], time: Time): RDD[(Time, Double)] = rdd.map((x: Double) => (delayedTime(time, 5000)(x), x))\n@transient val delayedStream = d.transform(delayIt _)",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "delayIt: (rdd: org.apache.spark.rdd.RDD[Double], time: org.apache.spark.streaming.Time)org.apache.spark.rdd.RDD[(org.apache.spark.streaming.Time, Double)]\ndelayedStream: org.apache.spark.streaming.dstream.DStream[(org.apache.spark.streaming.Time, Double)] = org.apache.spark.streaming.dstream.TransformedDStream@5f4894ea\n"
}, {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 9,
"time" : "Took: 531 milliseconds, at 2016-4-28 20:16"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "10B8DCE157EE4577B3A9BF74C361A78E"
},
"cell_type" : "code",
"source" : "@transient val formattedStream = delayedStream.map{ case (t:Time, d:Double) => (f\"$d%.2f\", t) }",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "formattedStream: org.apache.spark.streaming.dstream.DStream[(String, org.apache.spark.streaming.Time)] = org.apache.spark.streaming.dstream.MappedDStream@4c13a21e\n"
}, {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 10,
"time" : "Took: 519 milliseconds, at 2016-4-28 20:16"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "BCF252B4F37C411F8635A9DF2B91C6F7"
},
"cell_type" : "code",
"source" : "import scala.collection.immutable\n\nobject CircularBuffer {\n def empty[T](): CircularBuffer[T] = immutable.Vector.empty[T]\n}\n\nimplicit class CircularBuffer[T](v: Vector[T]) extends Serializable {\n\n val maxSize = 4\n \n def get(): Vector[T] = v\n\n def addItem(item : T) : CircularBuffer[T] =\n if(maxSize > 0)\n v.drop(v.size - maxSize + 1) :+ item\n else \n this\n \n}",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "import scala.collection.immutable\ndefined module CircularBuffer\ndefined class CircularBuffer\n"
}, {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 11,
"time" : "Took: 439 milliseconds, at 2016-4-28 20:17"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "0FD29724D6394DB880B3460217FA4EE3"
},
"cell_type" : "code",
"source" : "import org.apache.spark.streaming.State\n\ndef batch(t:Time): Long = (t.milliseconds % 5000)\ntype CB = CircularBuffer[(Time, Int)]\n\ndef trackStateFunc(batchTime: Time, key: String, value: Option[Time], state: State[CB]): Option[(String, Time, Int)] = {\n value.flatMap { (t: Time) =>\n if ( batch(t) <= batch(batchTime)) {\n val newState: CB = state.getOption.fold(Vector((t, 1)): CB){ (c) =>\n val (bef, hereOrAfter) = c.get.partition{ case (timeStamp, _) => batch(timeStamp) < batch(t) }\n (hereOrAfter.toList match {\n case Nil => (t, 1) :: Nil\n case (tS, cnt) :: tl if (batch(tS) == batch(t)) => (tS, cnt + 1) ::tl\n case l@_ => (t, 1) :: l\n }).toVector.foldLeft(bef: CB){ case (cB, item) => cB.addItem(item)}\n }\n state.update(newState)\n newState.get.find{ case (tS, cnt) => batch(tS) == batch(t) }.map{ case (ts, i) => (key, ts, i) }\n }\n else None\n }\n}\n",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "import org.apache.spark.streaming.State\nbatch: (t: org.apache.spark.streaming.Time)Long\ndefined type alias CB\ntrackStateFunc: (batchTime: org.apache.spark.streaming.Time, key: String, value: Option[org.apache.spark.streaming.Time], state: org.apache.spark.streaming.State[CB])Option[(String, org.apache.spark.streaming.Time, Int)]\n"
}, {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 14,
"time" : "Took: 581 milliseconds, at 2016-4-28 20:25"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "151D01224CE94DE5B0CD819B5CBA298F"
},
"cell_type" : "code",
"source" : "import org.apache.spark.streaming.StateSpec\n\nval initialRDD: RDD[(String, CB)] = ssc.sparkContext.emptyRDD\n\n@transient val stateSpec = StateSpec.function(trackStateFunc _)\n .initialState(initialRDD)\n .numPartitions(2)\n .timeout(Seconds(40))",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "import org.apache.spark.streaming.StateSpec\ninitialRDD: org.apache.spark.rdd.RDD[(String, CB)] = EmptyRDD[2] at emptyRDD at <console>:87\nstateSpec: org.apache.spark.streaming.StateSpec[String,org.apache.spark.streaming.Time,CB,(String, org.apache.spark.streaming.Time, Int)] = StateSpecImpl(<function4>)\n"
}, {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 15,
"time" : "Took: 476 milliseconds, at 2016-4-28 20:25"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "509C60EEBD904C908FB9D0AD01561964"
},
"cell_type" : "code",
"source" : "@transient val countUpdates = formattedStream.mapWithState(stateSpec)",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "countUpdates: org.apache.spark.streaming.dstream.MapWithStateDStream[String,org.apache.spark.streaming.Time,CB,(String, org.apache.spark.streaming.Time, Int)] = org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@25179d6b\n"
}, {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 16,
"time" : "Took: 544 milliseconds, at 2016-4-28 20:25"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "EB6A7369049546868E14CA1B941FD636"
},
"cell_type" : "code",
"source" : "@transient val countSnapShots = countUpdates.stateSnapshots()",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "countSnapShots: org.apache.spark.streaming.dstream.DStream[(String, CB)] = org.apache.spark.streaming.dstream.FlatMappedDStream@5adfe37d\n"
}, {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 15,
"time" : "Took: 469 milliseconds, at 2016-4-28 18:49"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "DA774D74BE9449329B7C162C4A73A794"
},
"cell_type" : "code",
"source" : "countUpdates.foreachRDD(rdd => {\n val elems = rdd.take(5)\n\n val r = elems.map{case (key, time, count) => s\"$key: $time $count\"}\n myUL(r)\n})\n",
"outputs" : [ {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 17,
"time" : "Took: 556 milliseconds, at 2016-4-28 20:25"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "159534BEAAE04E49A0CBFA9FA4816275"
},
"cell_type" : "code",
"source" : "countSnapShots.foreachRDD(rdd => {\n val elems = rdd.filter{ case (ts, c) => c.get.size > 1}.take(5)\n\n val r = elems.map{ case (key, cB) => {\n val cBStr = cB.get.map{ case (ts, c) => s\"$ts $c\"}.mkString(\";\")\n s\"$key: $cBStr\"\n }\n }\n myUL.appendAll(r)\n})\n",
"outputs" : [ {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 17,
"time" : "Took: 627 milliseconds, at 2016-4-28 18:49"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "FA2D60D619E5404985C3E391DEE42A9D"
},
"cell_type" : "code",
"source" : "ssc.start()",
"outputs" : [ {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 18,
"time" : "Took: 713 milliseconds, at 2016-4-28 20:25"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "F260D0AC81E6425C8C6B8F8AE7A5978E"
},
"cell_type" : "code",
"source" : "// Create and push some RDDs into\nfor (i <- 1 to 50) {\n lines.synchronized {\n lines += u()\n }\n Thread.sleep(4)\n}",
"outputs" : [ {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 19,
"time" : "Took: 834 milliseconds, at 2016-4-28 20:26"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "0C4D4F76718944A98EACC21EDCBD1AB4"
},
"cell_type" : "code",
"source" : "lines.size",
"outputs" : [ {
"name" : "stdout",
"output_type" : "stream",
"text" : "res29: Int = 13\n"
}, {
"metadata" : { },
"data" : {
"text/html" : "13"
},
"output_type" : "execute_result",
"execution_count" : 20,
"time" : "Took: 719 milliseconds, at 2016-4-28 18:30"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"id" : "5066DD69C53345F4B3B6789BA392B51A"
},
"cell_type" : "code",
"source" : "ssc.stop()",
"outputs" : [ {
"metadata" : { },
"data" : {
"text/html" : ""
},
"output_type" : "execute_result",
"execution_count" : 21,
"time" : "Took: 2 seconds 932 milliseconds, at 2016-4-28 18:30"
} ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : true,
"id" : "64596F0D7FC24E6D8E48872ACAE0FC20"
},
"cell_type" : "code",
"source" : "",
"outputs" : [ ]
} ],
"nbformat" : 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment