Forked from johnynek/AliceInAggregatorLand.scala
Last active
August 29, 2015 14:14
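/**
 * To get started:
 * git clone https://github.com/twitter/algebird
 * cd algebird
 * ./sbt algebird-core/console
 */

// Everything below uses names from this package. The import is harmless if the
// console has already brought them into scope.
import com.twitter.algebird._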

/**
 * Let's get some data. Here is Alice in Wonderland, line by line
 */
val alice = io.Source.fromURL("http://www.gutenberg.org/files/11/11.txt").getLines.toStream
// if you have a local file:
// val alice = io.Source.fromFile("alice.txt").getLines.toStream

// flatMap on whitespace splits gives us a poor-folk's tokenizer (not suitable for
// real work)
val aliceWords = alice.flatMap(_.toLowerCase.split("\\s+"))

// How often does the word alice appear?
// aCount is an Aggregator, which encompasses a special kind of
// computation that happens to look a lot like Hadoop
val aCount = Aggregator.count { s: String => s == "alice" }
aCount(aliceWords)
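
// A minimal sketch, in plain Scala with no Algebird calls, of the three steps an
// Aggregator bundles together: prepare (map each input), reduce (combine the
// prepared values associatively), and present (turn the result into the answer).
// The names sampleWords, prepared, reduced and presented are made up for illustration.
val sampleWords = Seq("alice", "was", "beginning", "to", "get", "very", "tired", "alice")
val prepared = sampleWords.map { s => if (s == "alice") 1L else 0L } // prepare
val reduced = prepared.reduce(_ + _)                                 // reduce
val presented = reduced                                              // present (identity here)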

// How long are the words in Alice in Wonderland?
// Moments.numericAggregator is an Aggregator that gives us the 0th to 4th moments.
// composePrepare is a method on all aggregators to transform the data BEFORE you
// prepare it. The name compose is used by scala.Function1 in the same way. We are
// calling compose on prepare.
// Note, this returns a new Aggregator. This is one way Aggregators combine with Functions
val stringLengthMoments = Moments.numericAggregator[Int].composePrepare { s: String => s.length }
val moments = stringLengthMoments(aliceWords)
(moments.count, moments.mean, moments.stddev, moments.skewness, moments.kurtosis)
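
// For comparison, a small sketch of compose and andThen on ordinary Scala functions,
// which composePrepare mirrors on the input side of an Aggregator. wordLength and
// isLong are hypothetical helpers, not part of Algebird.
val wordLength: String => Int = _.length
val isLong: Int => Boolean = _ > 5
val isLongWord: String => Boolean = isLong.compose(wordLength) // wordLength runs first
val sameThing: String => Boolean = wordLength.andThen(isLong)  // same pipeline, written left to right
isLongWord("wonderland") // true
sameThing("alice")       // false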

// Let's make that prettier by returning the standard moments directly, not the raw moments:
// andThenPresent is a method on all aggregators to apply a function at the end.
// Note, this returns a new Aggregator. This is another way Aggregators combine with Functions
val niceMoments = stringLengthMoments.andThenPresent { moments =>
  (moments.count, moments.mean, moments.stddev, moments.skewness, moments.kurtosis)
}
niceMoments(aliceWords)
// much better
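
// As a sanity check on what the first moments mean, here is a hand-rolled sketch in
// plain Scala for a tiny sample. It assumes the population (divide by n) convention
// for the standard deviation; whether Moments uses the same convention is an
// assumption worth checking against the Algebird source. All names are illustrative.
val tinySample = Seq("to", "down", "very", "well", "alice", "white", "curious", "wonderful")
val tinyLengths = tinySample.map(_.length.toDouble)
val tinyCount = tinyLengths.size
val tinyMean = tinyLengths.sum / tinyCount
val tinyStddev = math.sqrt(tinyLengths.map { x => (x - tinyMean) * (x - tinyMean) }.sum / tinyCount)
(tinyCount, tinyMean, tinyStddev) // (8, 5.0, 2.0)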

// Exact unique count of all words:
// builds an in-memory set of all the words, then gets the size
Aggregator.uniqueCount(aliceWords)

// Let's get both the unique count and the moments:
// join builds a new Aggregator that makes only one pass over the data.
// We can join many times to create an Aggregator that
// tells us everything we want to know with a single pass.
// This is the main way Aggregators combine with Aggregators
val myAgg = Aggregator.uniqueCount.join(niceMoments)
myAgg(aliceWords)
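
// What "a single pass" buys us can be sketched in plain Scala: one foldLeft that
// carries both partial results at once, instead of traversing the data twice.
// This only illustrates the idea and is not how join is implemented.
// joinSample, distinctSeen and totalLength are made-up names.
val joinSample = Seq("alice", "rabbit", "alice", "hatter")
val (distinctSeen, totalLength) =
  joinSample.foldLeft((Set.empty[String], 0)) { case ((seen, len), w) =>
    (seen + w, len + w.length)
  }
(distinctSeen.size, totalLength) // (3, 22)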

// There is even a trivial Aggregator that returns a constant.
Aggregator.const("Hello World")(aliceWords)

// Having const, join, and andThenPresent means Aggregator[T]
// satisfies the laws to be called an Applicative Functor.
// There are many functions you can write that generalize
// over anything that has those functions with the right
// relationships between them.

// BigData is still carrying some social currency,
// so let's count each word. Do this by making each word into
// a sparse vector Map(word -> 1), and then using the standard
// monoid on Maps to sum them
val wordCount = Aggregator.prepareMonoid { s: String => Map(s -> 1) }
wordCount(aliceWords).take(10)
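
// A sketch of what summing Maps means here, in plain Scala: keys are unioned and
// the values of shared keys are added. mergeCounts is a hypothetical helper written
// for illustration, not Algebird's implementation.
def mergeCounts(l: Map[String, Int], r: Map[String, Int]): Map[String, Int] =
  r.foldLeft(l) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }
val mergedCounts = Seq("the", "cat", "the").map { s => Map(s -> 1) }.reduce(mergeCounts)
// mergedCounts == Map("the" -> 2, "cat" -> 1)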

// The above just showed us a few random words.
// What words are most frequent?
val topKfromCounts = Aggregator.sortedReverseTake[(Int, String)](20)

// We run two aggregators: one to get the counts, then
// the second to get the top values
// Note, here our pure abstraction breaks down: Two Aggregators in series
// is not an Aggregator. It can be a function though:
val topK: TraversableOnce[String] => Seq[(Int, String)] = { in =>
  val wordsToCountIter: Aggregator[String, _, Iterator[(Int, String)]] =
    wordCount.andThenPresent(m => m.iterator.map(_.swap))
  // now apply both
  topKfromCounts(wordsToCountIter(in))
}
topK(aliceWords)
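
// On a small input, the same kind of result can be sketched with the standard
// library alone: count, swap to (count, word), sort descending, take the first k.
// Like topK above, this holds every count in memory. naiveTopK is an illustrative name.
def naiveTopK(words: Seq[String], k: Int): Seq[(Int, String)] =
  words.groupBy(identity).toSeq
    .map { case (w, ws) => (ws.size, w) }
    .sorted(Ordering[(Int, String)].reverse)
    .take(k)
naiveTopK(Seq("the", "cat", "the", "hat", "the", "cat"), 2) // Seq((3, "the"), (2, "cat"))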

// But that required us to store all counts in memory and
// to do two aggregators in series, breaking our nice abstraction.
//
// What about an approximation technique to keep only some?
//
// Count-min sketch is an algorithm that can give us
// approximate counts and most frequent items with constant
// memory and in one pass. It's awesome.
//
// Unfortunately, for strings, we need to set up a hasher,
// but this is easy to do (and will be added to algebird soon)
implicit val hash: CMSHasher[String] = new CMSHasher[String] {
  def hash(a: Int, b: Int, width: Int)(x: String) = {
    // a, b are the indices of the hash. We use them as the seed
    val h = MurmurHash128(a.toLong << 32 | b.toLong)(x.getBytes)._1
    // Make sure it is positive and within the width
    (h & Int.MaxValue).toInt % width
  }
}
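
// The last line of the hasher is worth a closer look. Masking with Int.MaxValue
// keeps only the low 31 bits, so the value fits in a non-negative Int before the
// modulo. A sketch with a made-up helper, toBucket:
def toBucket(h: Long, width: Int): Int = (h & Int.MaxValue).toInt % width
toBucket(-1L, 100)    // 47, since -1L & Int.MaxValue == 2147483647
toBucket(12345L, 100) // 45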

// Now we can create an aggregator to give us the top words,
// using constant memory (Big Data, Here we come!)
//
// eps = error for each item in terms of total count (absError ~ totalCount * eps)
// delta = probability that the error is actually higher than that for a given item
// seed = used to build the hash functions
// heavyHittersPct = fraction of the total count an item needs to be a heavy hitter
//                   (0.003 means 0.3%)
val approxTopK: Aggregator[String, _, List[(String, Long)]] =
  TopPctCMS.aggregator[String](eps = 0.01, delta = 0.01, seed = 314, heavyHittersPct = 0.003)
    .andThenPresent { cms =>
      // Get out the heavy hitters. Note CMS can tell you MUCH MUCH more!
      cms.heavyHitters
        .map { k => k -> cms.frequency(k).estimate }
        .toList
        .sortBy { case (_, count) => -count }
    }
approxTopK(aliceWords)
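
// To get a feel for eps and delta: the textbook count-min sketch uses
// width = ceil(e / eps) counters per row and depth = ceil(ln(1 / delta)) rows.
// The sketch below just evaluates those formulas; that Algebird sizes its tables
// exactly this way is an assumption. cmsWidth and cmsDepth are hypothetical names.
def cmsWidth(eps: Double): Int = math.ceil(math.E / eps).toInt
def cmsDepth(delta: Double): Int = math.ceil(math.log(1.0 / delta)).toInt
(cmsWidth(0.01), cmsDepth(0.01)) // (272, 5)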

// Similarly, what if we want to count the number of distinct words
// without storing the set of all words in memory? HyperLogLog to the rescue
//
// size=5 means 2^5 bits (4 bytes) used for the HLL Array[Byte]
// HLL deals with Array[Byte] inputs, so we need to composePrepare a way to get to bytes.
// A simple but sometimes slow way is to just call toString.getBytes.
val approxUnique = HyperLogLogAggregator.sizeAggregator(5).composePrepare { s: String => s.getBytes }
approxUnique(aliceWords)

// Not bad, but that was actually with 1/sqrt(2^5) = about 17.7% std dev from true value.
// To get about 1% we need 2^13 bits = 1 kB
val approxUnique1 = HyperLogLogAggregator.sizeAggregator(13).composePrepare { s: String => s.getBytes }
approxUnique1(aliceWords)
// That gives about 1.8% error in this case.
// Crank the size up more if you need more accuracy
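
// The error figures above come from the 1/sqrt(2^size) rule of thumb used in
// these comments. A sketch that tabulates it for a few sizes (hllStdDev is an
// illustrative helper, not an Algebird function):
def hllStdDev(size: Int): Double = 1.0 / math.sqrt(math.pow(2.0, size))
Seq(5, 10, 13, 16).map { s => s -> hllStdDev(s) }
// roughly 5 -> 0.177, 10 -> 0.031, 13 -> 0.011, 16 -> 0.0039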

/**
 * Scalding has built-in functions to .aggregate Hadoop streams.
 *
 * for Spark see:
 * https://github.com/twitter/algebird/pull/397
 */