@DavidRdgz
Last active March 14, 2016 02:00
Mining Alice in Wonderland with Spark's GraphX, treating the text as if it were a chat transcript between Alice, Rabbit, Magpie, and Hatter.
package com.dvidr
import org.apache.spark.graphx.{VertexRDD, Edge, Graph}
import org.apache.spark.sql._
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random
case class Chat(id: Int, name: String, talk: String)
case class ChatGraph(id: Int, dst: Int, replyIn: Int, name: String, talk: String)
case class TopChat(id: Int, name: String, talk: String, inDeg: Int, outDeg: Int)
object AliceInGraphXLand {
  def main(args: Array[String]): Unit = {
    /*
    Set things up so that the Alice text appears as a chat transcript, for example:
      alice -> hatter : ALICE'S ADVENTURES IN WONDERLAND
      hatter -> alice : Lewis Carroll
      magpie -> rabbit : THE MILLENNIUM FULCRUM EDITION 3.0
    where "x -> y" means that speaker x is responding to speaker y.
    Blank lines are dropped, and the interlocutors "alice", "rabbit", "magpie" and "hatter"
    are assigned to the lines at random.
    */
    val conf = new SparkConf().setAppName("AliceInSparkLand").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val chatters = List("alice", "rabbit", "magpie", "hatter")
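    /*
    Read "Alice in Wonderland" (the Project Gutenberg text, saved as data/alice.txt). The
    ChatGraph class holds the information needed to construct the graph:
      * id: unique id of the chat (its index after blank lines are dropped)
      * dst: id of the earlier chat that this chat responds to
      * replyIn: simulated response time, a random integer in [0, 100), standing in for how
        long this chatter took to respond
      * name: author of the chat
      * talk: the chat text
    One edge case needs care: the first chat (id 0) has no earlier chat to respond to, and
    Random.nextInt(0) throws, so it refers to itself.
    Because dst, replyIn and name are random, the DataFrame is cached so that the table shown
    below and the graph built from it see the same values (uncached, each action would
    recompute the RDD and redraw them).
    */
    val graphAlice = sc.textFile("data/alice.txt").filter(_.trim.nonEmpty).zipWithIndex.map { case (talk, idx) =>
      val refer = if (idx > 0) Random.nextInt(idx.toInt) else 0
      ChatGraph(idx.toInt, refer, Random.nextInt(100), chatters(Random.nextInt(4)), talk.toLowerCase())
    }.toDF().cache()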
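    // Sketch (illustration only, not called by the job): the reply links drawn above, modelled
    // on a plain Scala collection instead of an RDD. Assumption: the hypothetical `seed`
    // parameter makes the synthetic transcript reproducible; the job itself uses the unseeded
    // scala.util.Random. Every chat idx > 0 replies to a uniformly random earlier chat in
    // [0, idx), so all edges point backwards in time; chat 0 points at itself.
    object ReplyLinkSketch {
      def replyLinks(n: Int, seed: Long): Seq[(Int, Int)] = {
        val rng = new scala.util.Random(seed)
        (0 until n).map { idx =>
          // nextInt(bound) requires bound > 0, hence the special case for idx 0.
          val dst = if (idx > 0) rng.nextInt(idx) else 0
          (idx, dst)
        }
      }
    }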
    /*
    Register the DataFrame as a temporary table so it can be queried with Spark SQL; its
    columns are the fields of ChatGraph.
    */
    graphAlice.registerTempTable("graphAlice")
    /*
    Read the data back with Spark SQL and show the first 100 rows.
    */
    val df = sqlContext.sql("SELECT id, dst, replyIn, name, talk FROM graphAlice")
    df.show(100)
    /*
    Create vertices keyed by the chat id, with properties held in the Chat class.
    Typed getAs[T] reads each column directly; getValuesMap without an explicit type
    parameter infers Map[String, Nothing] and can fail with a ClassCastException.
    */
    val vertices = df.rdd.map { row =>
      val id = row.getAs[Int]("id")
      (id.toLong, Chat(id, row.getAs[String]("name"), row.getAs[String]("talk")))
    }
    /*
    Create an edge from each chat (id) to the chat it responds to (dst), with edge weight
    replyIn representing the simulated time between the two chats.
    */
    val edgesRDD = df.rdd.map { row =>
      Edge(row.getAs[Int]("id").toLong, row.getAs[Int]("dst").toLong, row.getAs[Int]("replyIn"))
    }
    /*
    Create the Spark GraphX graph (phew).
    */
    val graph = Graph(vertices, edgesRDD)
    /*
    Query 1: Get the names of the speakers who used more than 15 words in a single chat (talk).
    */
    graph.vertices.filter { case (id, chat) => chat.talk.trim.split("\\s+").size > 15 }.collect.foreach {
      case (id, chat) => println(s"${chat.name} spoke a lot at $id")
    }
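    // Sketch (illustration only, not called by the job): why trimming before splitting matters
    // for the word counts in Query 1 and MapReduce 1. Java's String.split drops trailing empty
    // strings but keeps a leading one, so an indented line such as "  the rabbit ran" splits
    // into ["", "the", "rabbit", "ran"] and is over-counted by one. The helper names are
    // hypothetical.
    object WordCountSketch {
      // Count as produced by an untrimmed split.
      def naiveCount(s: String): Int = s.split("\\s+").length
      // Trimmed count; dropping empty tokens also makes a blank string count as 0 words.
      def wordCount(s: String): Int = s.trim.split("\\s+").count(_.nonEmpty)
    }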
    /*
    Query 2: Get the names of the speakers who replied to a message in under 2 time units
    (replyIn < 2; replyIn is synthetic and has no real unit).
    */
    graph.triplets.filter { x => x.attr < 2 }.collect.foreach { x =>
      println(s"${x.srcAttr.name} replied quickly to ${x.dstAttr.name}")
    }
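    // Sketch (illustration only, not called by the job): what graph.triplets provides, modelled
    // with plain Scala collections rather than the GraphX API. A triplet is an edge joined with
    // the attributes of its source and destination vertices; Query 2 keeps those whose edge
    // attribute is below 2. SimpleEdge and quickReplies are hypothetical names for this model.
    object TripletSketch {
      case class SimpleEdge(src: Long, dst: Long, attr: Int)

      // Returns (replier, replied-to) name pairs for every quick edge whose endpoints exist.
      def quickReplies(names: Map[Long, String], edges: Seq[SimpleEdge]): Seq[(String, String)] =
        for {
          e <- edges if e.attr < 2
          srcName <- names.get(e.src)
          dstName <- names.get(e.dst)
        } yield (srcName, dstName)
    }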
    /*
    New graph: replace the Chat vertex properties with the TopChat class, which also records
    each vertex's in-degree (responses received) and out-degree (always 1 here, since every
    chat responds to exactly one earlier chat).
    */
    val initGraph: Graph[TopChat, Int] = graph.mapVertices((id, c) => TopChat(id.toInt, c.name, c.talk, 0, 0))
    val topChatGraph = initGraph.outerJoinVertices(initGraph.outDegrees) { case (id, c, deg) =>
      TopChat(id.toInt, c.name, c.talk, c.inDeg, deg.getOrElse(0))
    }.outerJoinVertices(initGraph.inDegrees) { case (id, c, deg) =>
      TopChat(id.toInt, c.name, c.talk, deg.getOrElse(0), c.outDeg)
    }
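    // Sketch (illustration only, not called by the job): the in/out degrees stored in TopChat,
    // computed from a plain (src, dst) edge list instead of GraphX's inDegrees/outDegrees.
    // Vertices with no edges in a direction get 0, mirroring deg.getOrElse(0) above. The
    // degrees helper is hypothetical.
    object DegreeSketch {
      def degrees(vertexIds: Seq[Long], edges: Seq[(Long, Long)]): Map[Long, (Int, Int)] = {
        val outDeg = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
        val inDeg = edges.groupBy(_._2).map { case (v, es) => v -> es.size }
        // (inDeg, outDeg), matching TopChat's field order.
        vertexIds.map(v => v -> (inDeg.getOrElse(v, 0), outDeg.getOrElse(v, 0))).toMap
      }
    }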
    /*
    Query 3: Get the names of the speakers whose chat received more than 5 responses.
    */
    topChatGraph.vertices.filter { case (id, c) => c.inDeg > 5 }.collect.foreach {
      case (id, c) => println(s"${c.name} had ${c.inDeg} responses")
    }
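    /*
    MapReduce 1: For each post, find the average length (in words) of the responses to it.
    Each response sends (1, wordCount) to the post it answers; the pairs are summed, and the
    total word count is divided by the number of responses.
    */
    val aggResponse: VertexRDD[Double] = topChatGraph.aggregateMessages[(Int, Double)](
      triplet => triplet.sendToDst((1, triplet.srcAttr.talk.trim.split("\\s+").size.toDouble)),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    ).mapValues((id, nums) => nums._2 / nums._1)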
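    // Sketch (illustration only, not called by the job): the send/merge/finalize pattern of the
    // aggregateMessages call above, on plain collections. Each reply sends (1, wordCount) to the
    // post it answers, the merge step adds the pairs component-wise, and the final step divides
    // the word total by the count. Posts with no reply get no entry, just as they are absent
    // from aggResponse. The averageReplyLength helper is hypothetical.
    object AverageReplySketch {
      // replies: (id of the post being answered, text of the reply)
      def averageReplyLength(replies: Seq[(Long, String)]): Map[Long, Double] = {
        val messages = replies.map { case (dst, talk) => dst -> (1, talk.trim.split("\\s+").length.toDouble) }
        messages.groupBy(_._1).map { case (dst, msgs) =>
          val (count, total) = msgs.map(_._2).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
          dst -> total / count
        }
      }
    }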
    /*
    Print MapReduce 1: speakers whose posts drew responses averaging more than 13 words.
    */
    topChatGraph.vertices.leftJoin(aggResponse) { (id, c, o) =>
      o match {
        case Some(avg) if avg > 13 => s"${c.name} has a large average response $avg"
        case _ => ""
      }
    }.collect.filter(_._2.nonEmpty).foreach { case (id, rez) => println(rez) }
    /*
    Subgraph 1: Keep only the edges with weight less than 2 (the quick replies); subgraph keeps
    every vertex. Then find the connected components of this "layer" of the graph.
    */
    val quickGraph = topChatGraph.subgraph(epred = x => x.attr < 2)
    val cc = quickGraph.connectedComponents.vertices
    /*
    Print the name and component of every post in the subgraph. Posts without any quick-reply
    edge form single-vertex components labelled with their own id.
    */
    quickGraph.vertices.leftJoin(cc) {
      case (id, c, comp) => s"${c.name} is in component ${comp.get}"
    }.collect.foreach { case (id, rez) => println(rez) }
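    // Sketch (illustration only, not called by the job): what connectedComponents computes on
    // quickGraph, written as a small union-find over plain collections rather than GraphX's
    // Pregel-based implementation. Edge direction is ignored, and each vertex is labelled with
    // the lowest vertex id in its component, the labelling GraphX also uses. The components
    // helper is hypothetical.
    object ComponentsSketch {
      def components(vertexIds: Seq[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
        val parent = scala.collection.mutable.Map(vertexIds.map(v => v -> v): _*)
        // Find the root of v, compressing the path along the way.
        def find(v: Long): Long = {
          val p = parent(v)
          if (p == v) v else { val root = find(p); parent(v) = root; root }
        }
        for ((a, b) <- edges) {
          val (ra, rb) = (find(a), find(b))
          // Keep the smaller id as the root so it becomes the component label.
          if (ra < rb) parent(rb) = ra else if (rb < ra) parent(ra) = rb
        }
        vertexIds.map(v => v -> find(v)).toMap
      }
    }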
    /*
    MapReduce 2: For each post, find the Jaccard similarity of the vocabularies of its
    responses: the size of the intersection of all the responses' word sets divided by the
    size of their union.
    */
    val jaccardResponse: VertexRDD[Double] = topChatGraph.aggregateMessages[(Set[String], Set[String])](
      triplet => {
        val vocab = triplet.srcAttr.talk.trim.split("\\s+").toSet
        triplet.sendToDst((vocab, vocab))
      },
      (a, b) => (a._1.intersect(b._1), a._2.union(b._2))
    ).mapValues((id, s) => s._1.size.toDouble / s._2.size)
    /*
    Print MapReduce 2: posts whose responses share some, but not all, of their words
    (a post with a single response always scores 1).
    */
    topChatGraph.vertices.leftJoin(jaccardResponse) { (id, c, o) =>
      o match {
        case Some(sim) if sim > 0 && sim < 1 => s"${c.name} has replies that are $sim similar"
        case _ => ""
      }
    }.collect.filter(_._2.nonEmpty).foreach { case (id, rez) => println(rez) }
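    // Sketch (illustration only, not called by the job): the multi-set Jaccard similarity that
    // MapReduce 2 aggregates, on plain collections. For the reply vocabularies V1..Vn of one post
    // it is |V1 intersect ... intersect Vn| / |V1 union ... union Vn|. A post with a single reply
    // always scores 1.0, which is why the printout keeps only scores strictly between 0 and 1.
    // The jaccard helper is hypothetical and expects at least one reply.
    object JaccardSketch {
      def jaccard(replies: Seq[String]): Double = {
        val vocabs = replies.map(_.trim.split("\\s+").toSet)
        val shared = vocabs.reduce(_ intersect _)
        val all = vocabs.reduce(_ union _)
        shared.size.toDouble / all.size
      }
    }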
  }
}