Created
March 9, 2016 04:59
-
-
Save DavidRdgz/65faf81a2d5bbd83e017 to your computer and use it in GitHub Desktop.
An adaptation of the awesome AliceInAggregatorLand gist: https://gist.github.com/johnynek/814fc1e77aad1d295bb7
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.dvidr | |
import com.twitter.algebird.{Moments, Aggregator} | |
import scala.util.Random | |
/*
  Please read AliceInAggregatorLand first:
  https://gist.github.com/johnynek/814fc1e77aad1d295bb7
  This is an adaptation in which "Alice in Wonderland" is turned
  into a chat transcript.
*/
/** One line of the transcript.
  *
  * @param name the interlocutor credited with the line
  * @param talk the text of the line
  */
final case class Chat(name: String, talk: String)
object AliceInTwitterLand {

  /** The interlocutors; every line of the source text is credited to one of them at random. */
  private val chatters: List[String] = List("alice", "rabbit", "magpie", "hatter")

  /** Splits a line into whitespace-separated words.
    *
    * Empty tokens are dropped: `"".split("\\s+")` yields `Array("")`, so without the filter
    * every blank line would count as one word and `""` would show up in the vocabulary.
    */
  private def words(talk: String): List[String] =
    talk.split("\\s+").filter(_.nonEmpty).toList

  /** Counts how often each distinct item occurs; repeated items are counted, not collapsed. */
  private def tally(items: Seq[String]): Map[String, Int] =
    items.groupBy(identity).map { case (item, hits) => item -> hits.size }

  /** Every occurrence of another interlocutor's name in `c.talk`, in `chatters` order.
    *
    * The speaker's own name is skipped. Matching uses the name as a regex with no word
    * boundaries, so e.g. "alice" is also found inside "malice".
    */
  private def mentions(c: Chat): List[String] =
    chatters.filter(_ != c.name).flatMap(name => name.r.findAllIn(c.talk).toList)

  /** Turns the Alice text into a random chat transcript, runs several Algebird aggregators
    * over it and prints the results.
    *
    * Each line of the text is credited to a randomly chosen interlocutor, e.g.
    * {{{
    * alice: ALICE'S ADVENTURES IN WONDERLAND
    * rabbit:
    * hatter: Lewis Carroll
    * hatter:
    * magpie: THE MILLENNIUM FULCRUM EDITION 3.0
    * }}}
    * The speaker order is random, so the printed results vary between runs.
    *
    * @param args optional; `args(0)` is the path of the text file (default `data/alice.txt`)
    */
  def main(args: Array[String]): Unit = {
    val path = args.headOption.getOrElse("data/alice.txt")

    // Read the whole file eagerly so the handle can be closed immediately.
    val source = scala.io.Source.fromFile(path)
    val lines = try source.getLines().toVector finally source.close()

    // The transcript: Vector(Chat("alice", "blah"), Chat("rabbit", "bleh"), ...).
    // Text is lower-cased so it can be matched against the lower-case names.
    val chatAlice: Vector[Chat] =
      lines.map(line => Chat(chatters(Random.nextInt(chatters.size)), line.toLowerCase))

    // Number of lines whose text contains "alice" (lines, not individual occurrences).
    val aCount = Aggregator.count { (c: Chat) => c.talk contains "alice" }
    println(aCount(chatAlice))

    // The same count per interlocutor, keyed by name. (Scalding-style
    // `groupBy(...).aggregate(aCount)` is not available on plain Scala collections.)
    val perSpeaker =
      chatAlice.groupBy(_.name).map { case (name, chats) => name -> aCount(chats) }
    println(perSpeaker)

    // Total number of words each interlocutor has said.
    val gCount = Aggregator.prepareMonoid { (c: Chat) => Map(c.name -> words(c.talk).size) }
    println(gCount(chatAlice))

    // Count, mean, stddev, skewness and kurtosis of the number of mentions per chat line.
    val referChat =
      Moments.numericAggregator[Int].composePrepare { (c: Chat) => mentions(c).size }
    val niceMoments = referChat.andThenPresent { m =>
      (m.count, m.mean, m.stddev, m.skewness, m.kurtosis)
    }
    println(niceMoments(chatAlice))

    // How many times each interlocutor is mentioned by someone else, computed in the same
    // pass as the moments via `join`.
    val gAlice = Aggregator.prepareMonoid { (c: Chat) => tally(mentions(c)) }
    val myAgg = gAlice.join(niceMoments)
    println(myAgg(chatAlice))

    // A constant aggregator ignores its input and always presents the same value.
    println(Aggregator.const("Alice is silent.")(chatAlice))

    // Vocabulary with word counts.
    val gSplit = Aggregator.prepareMonoid { (c: Chat) => tally(words(c.talk)) }

    // Two entries from each interlocutor's vocabulary. A Map has no defined order, so which
    // two words are shown is arbitrary.
    val g = chatAlice.groupBy(_.name).map { case (name, chats) => name -> gSplit(chats).take(2) }
    println(g)

    // Aggregators compose: present the vocabulary as (count, word) pairs and feed them to a
    // top-5 aggregator to get the most frequent words.
    val topKfromCounts = Aggregator.sortedReverseTake[(Int, String)](5)
    val wordsToCountIter = gSplit.andThenPresent(m => m.iterator.map(_.swap))
    val topK: TraversableOnce[Chat] => Seq[(Int, String)] =
      in => topKfromCounts(wordsToCountIter(in))

    // Top words for each interlocutor.
    val gg = chatAlice.groupBy(_.name).map { case (name, chats) => name -> topK(chats) }
    println(gg)

    // TODO: TopPctCMS
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment