Skip to content

Instantly share code, notes, and snippets.

@maasg
Created July 27, 2017 14:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maasg/9d51a2a42fc831e385cf744b84e80479 to your computer and use it in GitHub Desktop.
Save maasg/9d51a2a42fc831e385cf744b84e80479 to your computer and use it in GitHub Desktop.
Build path for different events and assign globalID
Display the source blob
Display the rendered blob
Raw
{
"metadata": {
"id": "295a615e-43da-4ebb-9aac-776a28141f84",
"name": "GlobalUniqueState.snb",
"user_save_timestamp": "2017-07-27T16:18:09.707Z",
"auto_save_timestamp": "1970-01-01T01:00:00.000Z",
"language_info": {
"name": "scala",
"file_extension": "scala",
"codemirror_mode": "text/x-scala"
},
"trusted": true,
"sparkNotebook": null,
"customLocalRepo": null,
"customRepos": null,
"customDeps": null,
"customImports": null,
"customArgs": null,
"customSparkConf": null,
"customVars": null
},
"cells": [
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "798981D2E2BA4F8B9EE088E3BE243145"
},
"cell_type": "code",
"source": "import org.apache.spark.streaming._\nimport org.apache.spark.streaming.dstream.QueueInputDStream\n",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "import org.apache.spark.streaming._\nimport org.apache.spark.streaming.dstream.QueueInputDStream\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 1,
"time": "Took: 1.085s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "6D72421639A845418934F7C62011627A"
},
"cell_type": "code",
"source": "val data = Seq( \"1|a,1|b,3|c\", \"2|d,2|e,2|f\", \"3|g,3|h,3|i,4|j\", \"5|k\", \"4|f,1|g\", \"6|h\")\nval inputRDDs = data.map(str => str.split(\",\")).map(arr => sparkContext.parallelize(arr))\n/** expected output\nZZ => 1a, 1b, 2d, 2e, 2f, 3g, 3h, 3i, 4f\nXX => 3c, 4j, 5k, 6h\nYY => 1g\n**/\n",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "data: Seq[String] = List(1|a,1|b,3|c, 2|d,2|e,2|f, 3|g,3|h,3|i,4|j, 5|k, 4|f,1|g, 6|h)\ninputRDDs: Seq[org.apache.spark.rdd.RDD[String]] = List(ParallelCollectionRDD[0] at parallelize at <console>:72, ParallelCollectionRDD[1] at parallelize at <console>:72, ParallelCollectionRDD[2] at parallelize at <console>:72, ParallelCollectionRDD[3] at parallelize at <console>:72, ParallelCollectionRDD[4] at parallelize at <console>:72, ParallelCollectionRDD[5] at parallelize at <console>:72)\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 2,
"time": "Took: 1.242s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "4FDAFE2A3E7B458086D5C235B3226697"
},
"cell_type": "code",
"source": "case class Event(id: Int, payload: String)",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "defined class Event\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 3,
"time": "Took: 0.894s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "3DBA2B7EB1124740A16DADA6755B69B6"
},
"cell_type": "code",
"source": "@transient val ssc = new StreamingContext(sparkContext, Seconds(5))",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@5db87859\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 4,
"time": "Took: 0.733s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "59152C091B524251A6741AECC244CCA5"
},
"cell_type": "code",
"source": "val queue = scala.collection.mutable.Queue(inputRDDs:_*)",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "queue: scala.collection.mutable.Queue[org.apache.spark.rdd.RDD[String]] = Queue(ParallelCollectionRDD[0] at parallelize at <console>:72, ParallelCollectionRDD[1] at parallelize at <console>:72, ParallelCollectionRDD[2] at parallelize at <console>:72, ParallelCollectionRDD[3] at parallelize at <console>:72, ParallelCollectionRDD[4] at parallelize at <console>:72, ParallelCollectionRDD[5] at parallelize at <console>:72)\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 5,
"time": "Took: 0.711s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "C51FDEBB5F9043D8A0D045F2B0BE2A9E"
},
"cell_type": "code",
"source": "// This is for test purposes only. Replace with actual stream source.\n@transient val queueDStream = ssc.queueStream(queue, oneAtATime = true)",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "queueDStream: org.apache.spark.streaming.dstream.InputDStream[String] = org.apache.spark.streaming.dstream.QueueInputDStream@6b7cdd97\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 6,
"time": "Took: 0.716s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "147F4ABC9B834C768197D79C303F26C0"
},
"cell_type": "code",
"source": "// This is our similarity function: two event ids are similar when they differ by exactly 1. Replace with something appropriate.\n// We're using function notation instead of a def because it's cleaner for serialization.\nval isSimilar: Int => Int => Boolean = event1 => event2 => Math.abs(event2 - event1).toInt == 1 \n\n// Global id generator. Must generate a unique id on every call; this random placeholder can collide - replace accordingly\nval genGlobalId: () => String = () => \"gen-\" + scala.util.Random.nextInt(10000)",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "isSimilar: Int => (Int => Boolean) = <function1>\ngenGlobalId: () => String = <function0>\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 7,
"time": "Took: 0.855s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "BD368F7AC4C645568923DB4AD92372F9"
},
"cell_type": "code",
"source": "// Here we have our initial Event stream\n@transient val eventStream = queueDStream.map{entry => \n val Array(id, payload) = entry.split(\"\\\\|\")\n Event(id.toInt, payload)\n}",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "eventStream: org.apache.spark.streaming.dstream.DStream[Event] = org.apache.spark.streaming.dstream.MappedDStream@56ab87b6\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 8,
"time": "Took: 0.868s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "3C96904AD563474CA8B4185F6B5B8A49"
},
"cell_type": "code",
"source": "@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "states: org.apache.spark.rdd.RDD[(String, (Int, Long))] = EmptyRDD[7] at emptyRDD at <console>:71\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 9,
"time": "Took: 0.690s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "5BF0201296CF47538F052BED61EF0509"
},
"cell_type": "code",
"source": "@transient var currentState: RDD[(String, (Int, Long))] = sparkContext.emptyRDD",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "currentState: org.apache.spark.rdd.RDD[(String, (Int, Long))] = EmptyRDD[8] at emptyRDD at <console>:71\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 10,
"time": "Took: 0.675s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "5EC3D2A2AC2D45C2831C88AA68992DA7"
},
"cell_type": "code",
"source": "@transient val eventsById = eventStream.map(event => (event.id, event))\n@transient val groupedEvents = eventsById.groupByKey()",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "eventsById: org.apache.spark.streaming.dstream.DStream[(Int, Event)] = org.apache.spark.streaming.dstream.MappedDStream@5ab031f5\ngroupedEvents: org.apache.spark.streaming.dstream.DStream[(Int, Iterable[Event])] = org.apache.spark.streaming.dstream.ShuffledDStream@2367d0e2\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 11,
"time": "Took: 0.751s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "95C4DDBD66A6419C8E6C7EFE7334B056"
},
"cell_type": "code",
"source": "@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) => \n val currentTransitions = states.reduceByKey{case (event1, event2) => Seq(event1, event2).maxBy{case (id, ts) => ts}} \n val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTx)) => (currentId, globalId)}\n \n val newEventIds = events.keys // let's extract the ids of the incoming (grouped) events\n val similarityJoinMap = newEventIds.cartesian(currentMappings)\n .collect{case (eventId, (currentId, globalId)) if (isSimilar(currentId)(eventId)) => (eventId, globalId)}\n .collectAsMap\n //val similarityBC = sparkContext.broadcast(similarityJoinMap) \n val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId())))\n newGlobalKeys.cache() // cache so the lazy RDD is not recomputed, which would call genGlobalId() again and produce different global ids\n \n val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (events, globalKey)) => \n events.map(event => (event.id,event.payload, globalKey))\n }\n val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))}\n currentState = newStates \n states.unpersist(false) \n states = newStates.union(states)\n states.cache() \n newTaggedEvents\n }",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "taggedEvents: org.apache.spark.streaming.dstream.DStream[(Int, String, String)] = org.apache.spark.streaming.dstream.TransformedDStream@6a596725\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 12,
"time": "Took: 0.971s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "EE22776EA26647BFA2DF78B80B444A10"
},
"cell_type": "code",
"source": "@transient val rawEventBox = ul(20)\nrawEventBox",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "rawEventBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres15: notebook.front.widgets.HtmlList = <HtmlList widget>\n"
},
{
"metadata": {},
"data": {
"text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anon66a44b169710c8728ef4ba67ac6a09c7&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
},
"output_type": "execute_result",
"execution_count": 14,
"time": "Took: 0.863s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "3381C07E0EAE430E90362A9733923446"
},
"cell_type": "code",
"source": "eventStream.foreachRDD(e => rawEventBox.append(e.collect.mkString(\", \")))",
"outputs": [
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 15,
"time": "Took: 0.785s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "FE764B4A05EC481C8EA40A88ED6D473B"
},
"cell_type": "code",
"source": "@transient val currentTrans = ul(20)\ncurrentTrans",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "currentTrans: notebook.front.widgets.HtmlList = <HtmlList widget>\nres19: notebook.front.widgets.HtmlList = <HtmlList widget>\n"
},
{
"metadata": {},
"data": {
"text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anonfc521a3a25c9a0a5958acd946894c1a9&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
},
"output_type": "execute_result",
"execution_count": 16,
"time": "Took: 0.725s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "80A8C316BEBB440E899157A51627AE1F"
},
"cell_type": "code",
"source": "@transient val eventBox = ul(20)\neventBox",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "eventBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres21: notebook.front.widgets.HtmlList = <HtmlList widget>\n"
},
{
"metadata": {},
"data": {
"text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anona38d37904c4f09c626ff390359d2c8af&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
},
"output_type": "execute_result",
"execution_count": 17,
"time": "Took: 0.650s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "6056967780B14BFC8BE5EBC3635362E5"
},
"cell_type": "code",
"source": "@transient val transitionChainBox = ul(20)\ntransitionChainBox",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "transitionChainBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres23: notebook.front.widgets.HtmlList = <HtmlList widget>\n"
},
{
"metadata": {},
"data": {
"text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anon1022644358033f118f633706870a441a&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>"
},
"output_type": "execute_result",
"execution_count": 18,
"time": "Took: 0.667s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "48D1F71E85F04A63B0F9026622D7E78E"
},
"cell_type": "code",
"source": "taggedEvents.foreachRDD{events => \n eventBox.append(\"---\")\n eventBox.append(events.collect.map{case (id, payload, globalKey) => s\"$id|$payload: $globalKey\"}.mkString(\",\"))\n val transitions = states.groupByKey.mapValues(eventSeq => eventSeq.toList.sortBy{case (id, ts) => ts}.map{case (id, ts) => id}.mkString (\"<-\"))\n transitionChainBox.append(\"---\")\n transitions.collect.map{case (globalId, eventSeq) => s\"$globalId: $eventSeq\"}.foreach(s => transitionChainBox.append(s))\n \n currentTrans.append(currentState.collect.map(_.toString).mkString(\",\"))\n \n }",
"outputs": [
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 19,
"time": "Took: 0.924s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "C19D7885F7B2437EB9B89D69462D35F9"
},
"cell_type": "code",
"source": "ssc.start()",
"outputs": [
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 20,
"time": "Took: 0.627s, at 2017-07-27 16:16"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "054D3411347247539EBC9AB16488F987"
},
"cell_type": "code",
"source": "ssc.stop(false)",
"outputs": [
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 21,
"time": "Took: 0.766s, at 2017-07-27 16:17"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": true,
"id": "0342E90E4BE949748B7D837F71336974"
},
"cell_type": "code",
"source": "",
"outputs": []
}
],
"nbformat": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment