Last active
March 14, 2016 02:00
-
-
Save DavidRdgz/64d5d470d5965ee36830 to your computer and use it in GitHub Desktop.
Mining Alice in Wonderland using Spark's GraphX, treating the text as if it were chat messages between Alice, Rabbit, Magpie, and Hatter.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.dvidr

import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random
/**
 * Vertex payload of the base chat graph: a single chat message.
 *
 * @param id   unique id of the chat message
 * @param name author of the chat message
 * @param talk text of the chat message
 */
case class Chat(id: Int, name: String, talk: String)
/**
 * One row of the chat table, holding everything needed to build the graph.
 *
 * @param id      unique id of the chat message
 * @param dst     id of the chat message this one is a response to
 * @param replyIn how long it took the author to respond
 * @param name    author of the chat message
 * @param talk    text of the chat message
 */
case class ChatGraph(id: Int, dst: Int, replyIn: Int, name: String, talk: String)
/**
 * Vertex payload of the degree-annotated chat graph.
 *
 * @param id     unique id of the chat message
 * @param name   author of the chat message
 * @param talk   text of the chat message
 * @param inDeg  number of edges pointing at this message (responses it received)
 * @param outDeg number of edges leaving this message
 */
case class TopChat(id: Int, name: String, talk: String, inDeg: Int, outDeg: Int)
/**
 * Mines "Alice in Wonderland" with Spark GraphX by pretending every non-empty
 * line of the book is a chat message.
 *
 * The text is set up so that it appears like this:
 *
 * {{{
 *   alice  -> hatter : ALICE'S ADVENTURES IN WONDERLAND
 *   hatter -> alice  : Lewis Carroll
 *   magpie -> rabbit : THE MILLENNIUM FULCRUM EDITION 3.0
 * }}}
 *
 * i.e. a chat transcript where `x -> y` means "x is responding to y".
 * "alice", "rabbit", "magpie" and "hatter" are arbitrarily chosen interlocutors.
 */
object AliceInGraphXLand {

  /** Input used when no path is passed on the command line. */
  private val DefaultInputPath = "data/alice.txt"

  /** Regex separating the words of a chat message. */
  private val WordSeparator = "\\s+"

  /** Edges with a `replyIn` strictly below this value count as "quick" replies. */
  private val QuickReplyThreshold = 2

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the input text file
   *             (defaults to `data/alice.txt`)
   */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val conf = new SparkConf().setAppName("AliceInSparkLand").setMaster("local")
    val sc = new SparkContext(conf)
    // Always release the context, even when one of the queries fails.
    try run(sc, inputPath)
    finally sc.stop()
  }

  /**
   * Splits a chat message into its words.
   *
   * The text is trimmed first and empty tokens are dropped, so indented or
   * whitespace-only lines do not contribute a spurious empty "word".
   */
  private def words(talk: String): Array[String] =
    talk.trim.split(WordSeparator).filter(_.nonEmpty)

  /**
   * Reads the text and turns every non-empty line into a [[ChatGraph]] row.
   *
   * For each line:
   *  - id:      the line's index among the non-empty lines
   *  - dst:     a random earlier line this chat responds to (line 0 points at itself)
   *  - replyIn: a random value in [0, 100)
   *  - name:    a random interlocutor
   *  - talk:    the lower-cased line
   *
   * The random values are drawn from a generator seeded once per run and per
   * partition. Spark evaluates lazily and may recompute this pipeline for every
   * action; with an unseeded global generator each recomputation would produce
   * different authors and edges, so the printed table, the vertices and the edges
   * could disagree. The result is additionally cached to avoid the recomputation.
   */
  private def loadChats(sc: SparkContext, sqlContext: SQLContext, inputPath: String): DataFrame = {
    import sqlContext.implicits._

    val chatters = Vector("alice", "rabbit", "magpie", "hatter")
    // Chosen once on the driver: runs still differ from each other, but every
    // recomputation within a run sees the same values.
    val runSeed = Random.nextLong()

    sc.textFile(inputPath)
      .filter(_.nonEmpty)
      .zipWithIndex
      .mapPartitionsWithIndex { (partition, lines) =>
        // Spread the per-partition seeds apart; consecutive seeds yield
        // correlated first draws.
        val rng = new Random(runSeed ^ (partition.toLong * 0x9E3779B97F4A7C15L))
        lines.map { case (talk, idx) =>
          val id = idx.toInt
          val dst = if (id > 0) rng.nextInt(id) else 0
          ChatGraph(id, dst, rng.nextInt(100), chatters(rng.nextInt(chatters.size)), talk.toLowerCase())
        }
      }
      .toDF()
      .cache()
  }

  /**
   * Builds the chat graph from the chat table.
   *
   * Vertices are keyed by `id` and carry a [[Chat]]; each row also yields one
   * edge `id -> dst` weighted by `replyIn`.
   */
  private def buildGraph(df: DataFrame): Graph[Chat, Int] = {
    val vertices: RDD[(VertexId, Chat)] = df.rdd.map { row =>
      val id = row.getAs[Int]("id")
      (id.toLong, Chat(id, row.getAs[String]("name"), row.getAs[String]("talk")))
    }
    val edges: RDD[Edge[Int]] = df.rdd.map { row =>
      Edge(row.getAs[Int]("id").toLong, row.getAs[Int]("dst").toLong, row.getAs[Int]("replyIn"))
    }
    Graph(vertices, edges)
  }

  /** Re-labels every vertex as a [[TopChat]] carrying its in- and out-degree. */
  private def withDegrees(graph: Graph[Chat, Int]): Graph[TopChat, Int] = {
    val initGraph: Graph[TopChat, Int] =
      graph.mapVertices((id, c) => TopChat(id.toInt, c.name, c.talk, 0, 0))
    // A vertex without edges in a direction is absent from the degree RDD,
    // hence the outer join with a default of 0.
    initGraph
      .outerJoinVertices(initGraph.outDegrees) { (id, c, deg) => c.copy(outDeg = deg.getOrElse(0)) }
      .outerJoinVertices(initGraph.inDegrees) { (id, c, deg) => c.copy(inDeg = deg.getOrElse(0)) }
  }

  /** Loads the text, builds the graphs and prints the result of every query. */
  private def run(sc: SparkContext, inputPath: String): Unit = {
    val sqlContext = new SQLContext(sc)

    // Register the chat rows as a table and read them back with Spark SQL.
    val graphAlice = loadChats(sc, sqlContext, inputPath)
    graphAlice.registerTempTable("graphAlice")
    val df = sqlContext.sql("SELECT id, dst, replyIn, name, talk FROM graphAlice")
    df.show(100)

    val graph = buildGraph(df)

    // Query 1: speakers who used more than 15 words in their (talk).
    graph.vertices
      .filter { case (_, chat) => words(chat.talk).length > 15 }
      .collect
      .foreach { case (id, chat) => println(s"${chat.name} spoke alot at $id") }

    // Query 2: speakers who responded to a message quickly.
    graph.triplets
      .filter(_.attr < QuickReplyThreshold)
      .collect
      .foreach { t => println(s"${t.srcAttr.name} replied quickly to ${t.dstAttr.name}") }

    // New graph: same structure, vertices additionally record (in) and (out) degree.
    val topChatGraph = withDegrees(graph)

    // Query 3: speakers who had more than 5 responses to a chat.
    topChatGraph.vertices
      .filter { case (_, c) => c.inDeg > 5 }
      .collect
      .foreach { case (_, c) => println(s"${c.name} had ${c.inDeg} responses") }

    // MapReduce 1: average number of words in the responses to each post.
    // Every response sends (1, wordCount) to the post it answers.
    val aggResponse: VertexRDD[Double] = topChatGraph
      .aggregateMessages[(Int, Double)](
        triplet => triplet.sendToDst((1, words(triplet.srcAttr.talk).length.toDouble)),
        (a, b) => (a._1 + b._1, a._2 + b._2)
      )
      .mapValues((id, nums) => nums._2 / nums._1)

    // Print MapReduce 1. Filtering happens before collect so only the matching
    // posts travel to the driver; the (id, message) pair is printed as before.
    topChatGraph.vertices
      .innerJoin(aggResponse) { (id, c, avg) => (c.name, avg) }
      .filter { case (_, (_, avg)) => avg > 13 }
      .collect
      .foreach { case (id, (name, avg)) => println((id, s"$name has a large average response $avg")) }

    // Subgraph 1: keep only the quick-reply edges, then find the connected
    // components at this "layer" of the graph.
    val quickGraph = topChatGraph.subgraph(epred = t => t.attr < QuickReplyThreshold)
    val cc = quickGraph.connectedComponents.vertices

    // Print the names and components of the posts in the above subgraph.
    // connectedComponents labels every vertex of quickGraph, so an inner join
    // loses nothing and avoids unwrapping an Option with .get.
    quickGraph.vertices
      .innerJoin(cc) { (id, c, comp) => s"${c.name} is in component $comp" }
      .collect
      .foreach { case (_, rez) => println(rez) }

    // MapReduce 2: Jaccard similarity of the responses to each post, i.e.
    // |intersection of the responses' vocabularies| / |union of the vocabularies|.
    val jaccardResponse: VertexRDD[Double] = topChatGraph
      .aggregateMessages[(Set[String], Set[String])](
        triplet => {
          val vocab = words(triplet.srcAttr.talk).toSet
          triplet.sendToDst((vocab, vocab))
        },
        (a, b) => (a._1.intersect(b._1), a._2.union(b._2))
      )
      // Divide in Double: a Float ratio widened afterwards carries noise digits.
      .mapValues((id, s) => s._1.size.toDouble / s._2.size)

    // Print MapReduce 2. An empty union gives NaN, which fails both comparisons
    // and is therefore skipped.
    topChatGraph.vertices
      .innerJoin(jaccardResponse) { (id, c, sim) => (c.name, sim) }
      .filter { case (_, (_, sim)) => sim > 0 && sim < 1 }
      .collect
      .foreach { case (_, (name, sim)) => println(s"$name has replies that are $sim similar") }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment