"cell_type": "code",
"source": "import org.apache.spark.streaming._\nimport org.apache.spark.streaming.dstream.QueueInputDStream\n// RDD is needed for the `states` and `currentState` type annotations in later cells.\nimport org.apache.spark.rdd.RDD\n",
"cell_type": "code",
"source": "val data = Seq( \"1|a,1|b,3|c\", \"2|d,2|e,2|f\", \"3|g,3|h,3|i,4|j\", \"5|k\", \"4|f,1|g\", \"6|h\")\n// One RDD per micro-batch; every comma-separated \"id|payload\" entry becomes one RDD element.\nval inputRDDs = data.map(str => str.split(\",\")).map(arr => sparkContext.parallelize(arr))\n/** expected output\nZZ => 1a, 1b, 2d, 2e, 2f, 3g, 3h, 3i, 4f\nXX => 3c, 4j, 5k, 6h\nYY => 1g\nZZ/XX/YY stand in for generated global ids. Id 2 is similar to both 1 and 3, and the id 4 of\n\"4|f\" to both 3 and 5; such ties go to whichever pair collectAsMap keeps, so the placement of\nthose entries is not guaranteed.\n**/\n",
"cell_type": "code",
"source": "// One input entry: `id` is what isSimilar compares, `payload` is carried through unchanged.\nfinal case class Event(id: Int, payload: String)",
"cell_type": "code",
"source": "// Streaming context on the notebook's SparkContext, producing one micro-batch every 5 seconds.\n@transient val ssc = {\n  val batchInterval = Seconds(5)\n  new StreamingContext(sparkContext, batchInterval)\n}",
"cell_type": "code",
"source": "// Mutable queue of the pre-built input RDDs; queueStream consumes it batch by batch.\nval queue = {\n  import scala.collection.mutable\n  mutable.Queue(inputRDDs: _*)\n}",
"cell_type": "code",
"source": "// Test-only input: swap in a real stream source for actual use.\n// oneAtATime = true makes each batch interval take exactly one RDD from `queue`.\n@transient val queueDStream = ssc.queueStream[String](queue, oneAtATime = true)",
"cell_type": "code",
"source": "// Similarity function: two event ids are similar when they differ by exactly 1. Replace with something appropriate.\n// Written as a function value instead of a def because it is cleaner for closure serialization.\n// The difference is taken on Long so extreme Int ids cannot overflow into a false match.\nval isSimilar: Int => Int => Boolean = event1 => event2 => math.abs(event2.toLong - event1) == 1L\n\n// Global id generator: must return a unique id on every call.\n// Random.nextInt(10000) had only 10000 possible values, so ids collided quickly and silently\n// merged unrelated chains; a random UUID makes collisions practically impossible.\nval genGlobalId: () => String = () => \"gen-\" + java.util.UUID.randomUUID().toString",
"cell_type": "code",
"source": "// Parse each raw \"id|payload\" string of the input stream into an Event.\n// NOTE(review): an entry that does not split into exactly two parts, or whose id is not numeric,\n// throws and fails the batch.\n@transient val eventStream = queueDStream.map { entry =>\n  val Array(id, payload) = entry.split(\"\\\\|\")\n  Event(id.toInt, payload)\n}",
"cell_type": "code",
"source": "// All tag assignments so far as (globalId, (eventId, batchTimeMs)) pairs;\n// reassigned by the taggedEvents transform on every batch.\n@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD",
"cell_type": "code",
"source": "// Only the assignments made by the most recent batch, same (globalId, (eventId, batchTimeMs)) shape.\n@transient var currentState: RDD[(String, (Int, Long))] = sparkContext.emptyRDD",
"cell_type": "code",
"source": "// Key each event by its id and group, so every distinct id in a batch is resolved to a global id once.\n@transient val eventsById = eventStream.map(event => (event.id, event))\n@transient val groupedEvents = eventsById.groupByKey()",
"cell_type": "code",
"source": "// Tags every event of a batch with a global id:\n//  1. find the latest (eventId, ts) of each existing global id in `states`,\n//  2. reuse the global id of a similar latest id, otherwise generate a fresh one,\n//  3. append the new (globalId, (eventId, batchTimeMs)) assignments to `states`.\n@transient val taggedEvents = groupedEvents.transform { (events, currentTime) =>\n  // Latest (eventId, ts) per global id.\n  val currentTransitions = states.reduceByKey { case (event1, event2) => Seq(event1, event2).maxBy { case (_, ts) => ts } }\n  // eventId -> globalId for the newest event of every chain.\n  val currentMappings = currentTransitions.map { case (globalId, (currentId, _)) => (currentId, globalId) }\n\n  val newEventIds = events.keys // ids of the incoming (grouped) events\n  // If an id is similar to several chains, collectAsMap keeps an arbitrary one of them.\n  val similarityJoinMap = newEventIds.cartesian(currentMappings)\n    .collect { case (eventId, (currentId, globalId)) if isSimilar(currentId)(eventId) => (eventId, globalId) }\n    .collectAsMap()\n  // The map is captured by the closure below; broadcast it instead if it becomes large.\n  val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId()))).cache()\n  // cache() is lazy: materialize now so the ids from the random genGlobalId are computed once and\n  // reused by both consumers below. NOTE: evicted cache blocks would still be recomputed with new\n  // ids; a deterministic id scheme would remove that risk entirely.\n  newGlobalKeys.count()\n\n  val newTaggedEvents = events.join(newGlobalKeys).flatMap { case (_, (eventsForId, globalKey)) =>\n    eventsForId.map(event => (event.id, event.payload, globalKey))\n  }\n  val newStates = newGlobalKeys.map { case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds)) }\n  currentState = newStates\n\n  // Cache and materialize the new state before releasing the old one; unpersisting the old state\n  // first would force the union to recompute the whole accumulated lineage.\n  // NOTE: the lineage still grows by one union per batch; checkpoint it in long-running jobs.\n  val previousStates = states\n  states = newStates.union(previousStates).cache()\n  states.count()\n  previousStates.unpersist(false)\n\n  newTaggedEvents\n}",
"cell_type": "code",
"source": "// List widget that receives one line of raw events per batch (20 presumably caps the visible entries — confirm).\n// The bare name on the last line is the cell's result, presumably so the notebook renders the widget here.\n@transient val rawEventBox = ul(20)\nrawEventBox",
"cell_type": "code",
"source": "// Append every batch's raw Events to rawEventBox as a single comma-separated line.\neventStream.foreachRDD { batch =>\n  val rendered = batch.collect().mkString(\", \")\n  rawEventBox.append(rendered)\n}",
"cell_type": "code",
"source": "// Widget for the latest batch's tag assignments, appended from the taggedEvents output cell\n// (presumably currentState joined with \",\" — confirm).\n@transient val currentTrans = ul(20)\ncurrentTrans",
"cell_type": "code",
"source": "// Widget receiving, per batch, a \"---\" separator followed by the tagged events as id|payload: globalKey.\n@transient val eventBox = ul(20)\neventBox",
"cell_type": "code",
"source": "// Widget receiving, per batch, a \"---\" separator followed by one \"globalId: id<-id<-...\" chain per global id.\n@transient val transitionChainBox = ul(20)\ntransitionChainBox",
"cell_type": "code",
"source": "// After each batch, refresh the three widgets: tagged events, transition chains, latest assignments.\ntaggedEvents.foreachRDD { events =>\n  eventBox.append(\"---\")\n  eventBox.append(events.collect().map { case (id, payload, globalKey) => s\"$id|$payload: $globalKey\" }.mkString(\",\"))\n\n  // Event-id chain per global id, ordered by batch time (oldest first) and joined with \"<-\".\n  val transitions = states.groupByKey().mapValues { eventSeq =>\n    eventSeq.toList.sortBy { case (_, ts) => ts }.map { case (id, _) => id }.mkString(\"<-\")\n  }\n  transitionChainBox.append(\"---\")\n  transitions.collect().foreach { case (globalId, chain) => transitionChainBox.append(s\"$globalId: $chain\") }\n\n  currentTrans.append(currentState.collect().mkString(\",\"))\n}",
"cell_type": "code",
"source": "// Start the streaming computation: from here on every batch interval runs the pipeline and the foreachRDD outputs defined above.\nssc.start()",
"cell_type": "code",
"source": "// Stop the streaming computation but keep the notebook's SparkContext running.\nssc.stop(stopSparkContext = false)",
