// A REPL example of using Aggregators in Scala
/**
* To get started:
* git clone https://github.com/twitter/algebird
* cd algebird
* ./sbt algebird-core/console
*/
// The algebird console should already have com.twitter.algebird._ in scope.
// In any other REPL with algebird-core on the classpath, import it yourself:
// import com.twitter.algebird._
/**
* Let's get some data. Here is Alice in Wonderland, line by line
*/
val alice = io.Source.fromURL("http://www.gutenberg.org/files/11/11.txt").getLines.toStream
// if you have a local file:
// val alice = io.Source.fromFile("alice.txt").getLines.toStream
// flatMap on whitespace splits gives us a poor-folk's tokenizer (not suitable for
// real work)
val aliceWords = alice.flatMap(_.toLowerCase.split("\\s+"))
// How often does the word "alice" appear?
// aCount is an Aggregator, which encapsulates a special kind of
// computation that happens to look a lot like a Hadoop map/reduce job
val aCount = Aggregator.count { s: String => s == "alice" }
aCount(aliceWords)
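// A minimal sketch of what aCount(...) does, spelled out on a tiny sample.
// The names sample and byHand are ours, for illustration only.
// An Aggregator has three parts: prepare (the map step), a semigroup
// (the reduce step) and present (a final transformation of the result).
val sample = Seq("alice", "was", "beginning", "alice")
val byHand = aCount.present(
  sample
    .map { w => aCount.prepare(w) }                       // map each word to a partial count
    .reduce { (l, r) => aCount.semigroup.plus(l, r) })    // add the partial counts together
// byHand and aCount(sample) should both be 2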
// How long are the words in Alice in Wonderland?
// Moments.numericAggregator is an Aggregator that gives us the 0th to 4th moments.
// composePrepare is a method on all Aggregators that transforms the data BEFORE
// prepare sees it. scala.Function1 uses the name compose in the same way: we are
// calling compose on prepare.
// Note that this returns a new Aggregator. This is one way Aggregators combine with functions.
val stringLengthMoments = Moments.numericAggregator[Int].composePrepare { s: String => s.length }
val moments = stringLengthMoments(aliceWords)
(moments.count, moments.mean, moments.stddev, moments.skewness, moments.kurtosis)
// Let's make that prettier by returning the standard moments directly, not the raw moments.
// andThenPresent is a method on all Aggregators that applies a function at the end.
// Note that this returns a new Aggregator. This is another way Aggregators combine with functions.
val niceMoments = stringLengthMoments.andThenPresent { moments =>
  (moments.count, moments.mean, moments.stddev, moments.skewness, moments.kurtosis)
}
niceMoments(aliceWords)
// much better
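// For comparison, a small sketch of the same two ideas on plain Scala functions:
// composePrepare and andThenPresent mirror compose and andThen on Function1.
// The names wordLength, isLong, composed and chained are ours, for illustration only.
val wordLength: String => Int = _.length
val isLong: Int => Boolean = _ > 5
val composed = isLong.compose(wordLength) // runs wordLength first, then isLong
val chained = wordLength.andThen(isLong)  // the same pipeline, written left to right
composed("wonderland") // true, since the length is 10
chained("cat")         // false, since the length is 3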
// Exact unique count of all words:
// this builds an in-memory set of all the words, then takes its size
Aggregator.uniqueCount(aliceWords)
// Let's get both the unique count and the moments:
// join builds a new Aggregator that still makes only one pass over the data.
// We can join as many times as we like to create an Aggregator that
// tells us everything we want to know in a single pass.
// This is the main way Aggregators combine with other Aggregators.
val myAgg = Aggregator.uniqueCount.join(niceMoments)
myAgg(aliceWords)
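// A sketch of joining a third Aggregator, reusing aCount from earlier.
// Each join nests the results in another pair, so a pattern match is a handy
// way to unpack them. The names below are ours, for illustration only.
val threeInOne = myAgg.join(aCount)
val ((uniqueWords, lengthStats), aliceCount) = threeInOne(aliceWords)
// aliceCount should match aCount(aliceWords), computed in the same single pass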
// There is even a trivial Aggregator that returns a constant.
Aggregator.const("Hello World")(aliceWords)
// Having const, join and andThenPresent means that Aggregator[T]
// satisfies the laws needed to be called an Applicative Functor.
// There are many functions you can write that generalize
// over anything that has those functions with the right
// relationships between them.
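// One small sketch of that style: join pairs up two results and andThenPresent
// maps over the pair, which is enough to build a mean from a sum and a count.
// The names totalLength, wordTotal and meanLength are ours, for illustration only.
val totalLength = Aggregator.prepareMonoid { s: String => s.length.toLong }
val wordTotal = Aggregator.count { s: String => true }
val meanLength = totalLength.join(wordTotal).andThenPresent { case (total, n) => total.toDouble / n }
meanLength(aliceWords) // should be close to the mean reported by niceMoments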
// BigData is still carrying some social currency,
// so let's count each word. Do this by making each word into
// a sparse vector Map(word -> 1), and then using the standard
// monoid on Maps to sum them
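// A quick sketch of that Map monoid on two hand-made maps (the names mapMonoid
// and summed are ours): values under the same key are added together, and keys
// that appear on only one side are kept as they are.
val mapMonoid = implicitly[Monoid[Map[String, Int]]]
val summed = mapMonoid.plus(Map("alice" -> 1, "rabbit" -> 1), Map("alice" -> 1))
// summed holds alice -> 2 and rabbit -> 1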
val wordCount = Aggregator.prepareMonoid { s: String => Map(s -> 1) }
wordCount(aliceWords).take(10)
// The above just showed us a few random words.
// What words are most frequent?
val topKfromCounts = Aggregator.sortedReverseTake[(Int, String)](20)
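// To see what sortedReverseTake does by itself, here is a small sketch on
// hand-made (count, word) pairs. The names top2 and pairs are ours.
// Tuples are ordered by count first, then by word.
val top2 = Aggregator.sortedReverseTake[(Int, String)](2)
val pairs = Seq((3, "the"), (1, "cat"), (2, "alice"))
top2(pairs) // the two largest pairs, largest first: (3,the) then (2,alice)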
// We run two aggregators: one to get the counts, then
// the second to get the top values
// Note, here our pure abstraction breaks down: Two Aggregators in series
// is not an Aggregator. It can be a function though:
val topK: TraversableOnce[String] => Seq[(Int, String)] = { in =>
  val wordsToCountIter: Aggregator[String, _, Iterator[(Int, String)]] =
    wordCount.andThenPresent(m => m.iterator.map(_.swap))
  // now apply both
  topKfromCounts(wordsToCountIter(in))
}
topK(aliceWords)
// But that required us to store all counts in memory and
// to do two aggregators in series, breaking our nice abstraction.
//
// What about an approximation technique to keep only some?
//
// Count-min sketch is an algorithm that can give us
// approximate counts and most frequent items with constant
// memory and in one pass. It's awesome.
//
// Unfortunately, for strings, we need to set up a hasher,
// but this is easy to do (and will be added to algebird soon)
implicit val hash: CMSHasher[String] = new CMSHasher[String] {
  def hash(a: Int, b: Int, width: Int)(x: String) = {
    // a, b are the indices of the hash. We use them as the seed
    val h = MurmurHash128(a.toLong << 32 | b.toLong)(x.getBytes)._1
    // Make sure it is positive and within the width
    (h & Int.MaxValue).toInt % width
  }
}
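// A quick sanity check of the hasher, as a sketch: for any pair of indices the
// bucket should fall in the range [0, width), and hashing the same word twice
// should give the same bucket. The values 1, 2 and 100 are arbitrary picks of ours.
val bucket = hash.hash(1, 2, 100)("alice")
require(bucket >= 0 && bucket < 100)
require(hash.hash(1, 2, 100)("alice") == bucket)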
// Now we can create an aggregator to give us the top words,
// using constant memory (Big Data, Here we come!)
//
// eps = error for each item in terms of total count (absError ~ totalCount * eps)
// delta = probability that the error is actually higher than that for a given item
// seed = used to build the hash functions
// heavyHittersPct is the fraction of the total count an item needs to be a heavy hitter (0.003 = 0.3%)
val approxTopK: Aggregator[String, _, List[(String, Long)]] =
  TopPctCMS.aggregator[String](eps = 0.01, delta = 0.01, seed = 314, heavyHittersPct = 0.003)
    .andThenPresent { cms =>
      // Get out the heavy hitters. Note CMS can tell you MUCH MUCH more!
      cms.heavyHitters
        .map { k => k -> cms.frequency(k).estimate }
        .toList
        .sortBy { case (_, count) => -count }
    }
approxTopK(aliceWords)
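// A rough sketch of what eps and delta cost in memory, assuming the textbook
// count-min sketch sizing (algebird's own rounding may differ slightly):
//   width = ceil(e / eps) counters per row, depth = ceil(ln(1 / delta)) rows.
// The helper names cmsWidth and cmsDepth are ours, not part of algebird.
def cmsWidth(eps: Double): Int = math.ceil(math.E / eps).toInt
def cmsDepth(delta: Double): Int = math.ceil(math.log(1.0 / delta)).toInt
(cmsWidth(0.01), cmsDepth(0.01)) // (272, 5): 272 * 5 = 1360 counters in total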
// Similarly, what if we want to count the number of distinct words
// without storing the set of all words in memory? HyperLogLog to the rescue
//
// The argument 5 is the number of index bits: the HLL keeps 2^5 = 32 registers in
// its Array[Byte] (roughly 32 bytes when stored densely, one byte per register).
// HLL deals with Array[Byte] inputs, so we need to composePrepare a way to get to bytes.
// A simple but sometimes slow way is to call toString.getBytes.
val approxUnique = HyperLogLogAggregator.sizeAggregator(5).composePrepare { s: String => s.getBytes }
approxUnique(aliceWords)
// Not bad, but that was with a standard deviation of about 1/sqrt(2^5) = 18% from the true value.
// To get about 1% we need 2^13 = 8192 registers (roughly 8 kB).
val approxUnique1 = HyperLogLogAggregator.sizeAggregator(13).composePrepare { s: String => s.getBytes }
approxUnique1(aliceWords)
// That gives about 1.8% error in this case.
// Crank the size up more if you need more accuracy
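// The rule of thumb above as a small sketch (the helper names are ours, not
// part of algebird): the standard error is roughly 1 / sqrt(2^bits), so every
// 2 extra bits halve it.
def hllStdError(bits: Int): Double = 1.0 / math.sqrt(math.pow(2, bits))
def bitsFor(targetError: Double): Int =
  Iterator.from(1).find { b => hllStdError(b) <= targetError }.get
hllStdError(5)  // about 0.177
hllStdError(13) // about 0.011
bitsFor(0.01)   // 14, the first size with an error of 1% or less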
/**
* Scalding has built-in functions to .aggregate Hadoop streams.
*
* For Spark, see:
* https://github.com/twitter/algebird/pull/397
*/