A REPL example of using Aggregators in Scala
* To get started:
* git clone
* cd algebird
* ./sbt algebird-core/console
* Let's get some data. Here is Alice in Wonderland, line by line
val alice = io.Source.fromURL("").getLines.toStream
// if you have a local file:
// val alice = io.Source.fromFile("alice.txt").getLines.toStream
// flatMap on whitespace splits gives us a poor-man's tokenizer (not suitable for
// real work)
val aliceWords = alice.flatMap(_.toLowerCase.split("\\s+"))
// how often does the word alice appear?
// aCount is an Aggregator, which encapsulates a special kind of
// computation that happens to look a lot like Hadoop
val aCount = Aggregator.count { s: String => s == "alice" }
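To see what that Aggregator computes, here is the same answer in plain Scala on a toy list (no algebird needed; `toyWords` is made up). The Aggregator just packages this filtered count so it can be split across machines:

```scala
// Semantics sketch only: Aggregator.count(pred) over a collection
// produces the same answer as a plain filtered count.
val toyWords = List("alice", "said", "alice", "why")
val aliceOccurrences = toyWords.count(_ == "alice")
// aliceOccurrences == 2
```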
// How long are the words in Alice in Wonderland?
// Moments.numericAggregator is an Aggregator that gives us the 0th to 4th moments.
// composePrepare is a method on all aggregators to transform the data BEFORE you
// prepare it. The name compose is used by scala.Functions in the same way. We are
// calling compose on prepare.
// Note, this returns a new Aggregator. This is one way Aggregators combine with Functions
val stringLengthMoments = Moments.numericAggregator[Int].composePrepare { s: String => s.length }
val moments = stringLengthMoments(aliceWords)
(moments.count, moments.mean, moments.stddev, moments.skewness, moments.kurtosis)
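As a cross-check of what those numbers mean, here are the first two moments computed by hand in plain Scala on a toy list (illustrative only; algebird's Moments does this in one pass and also tracks skewness and kurtosis):

```scala
// Hand-rolled count/mean/stddev on toy data
val lengths = List("alice", "in", "wonderland").map(_.length) // 5, 2, 10
val n = lengths.size
val mean = lengths.sum.toDouble / n
val variance = lengths.map(x => math.pow(x - mean, 2)).sum / n
val stddev = math.sqrt(variance)
// mean == 17.0/3, variance == 98.0/9
```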
// let's make that prettier by returning the standard moments directly, rather than the raw moments:
// andThenPresent is a method on all aggregators to apply a function at the end.
// Note, this returns a new Aggregator. This is another way Aggregators combine with Functions
val niceMoments = stringLengthMoments.andThenPresent { moments =>
(moments.count, moments.mean, moments.stddev, moments.skewness, moments.kurtosis) }
// much better
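A rough mental model for why composePrepare and andThenPresent exist, in plain Scala with hypothetical names (runAgg, prepare, reduce, present are not algebird API; the real Aggregator also carries a semigroup so the reduce step can run in parallel):

```scala
// Semantics sketch: an aggregator is roughly
// prepare (per element), then reduce (associative combine), then present (final map).
def runAgg[A, B, C](prepare: A => B, reduce: (B, B) => B, present: B => C)(in: Seq[A]): C =
  present(in.map(prepare).reduce(reduce))

// composePrepare pre-composes a function onto prepare;
// andThenPresent post-composes a function onto present.
val totalLength = runAgg[String, Int, String](_.length, _ + _, n => s"total=$n")(Seq("a", "bb"))
// totalLength == "total=3"
```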
// Exact unique count of all words:
// builds an in-memory set of all the words, then gets the size
Aggregator.uniqueCount[String](aliceWords)
// Let's get both the unique count and the moments:
// Join builds a new aggregator that only passes once
// we can join many times to create an Aggregator that
// tells us everything we want to know with a single pass.
// This is the main way Aggregators combine with Aggregators
val myAgg = Aggregator.uniqueCount.join(niceMoments)
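What "a single pass" means concretely, sketched in plain Scala on a toy list (toyWords is made up): a joined aggregator is one fold carrying a tuple of accumulators, not two traversals of the data.

```scala
// One fold, two accumulators: a set for the unique count, a sum for total length
val toyWords = List("alice", "in", "wonderland", "alice")
val (uniques, totalLen) = toyWords.foldLeft((Set.empty[String], 0)) {
  case ((seen, len), w) => (seen + w, len + w.length)
}
// uniques.size == 3, totalLen == 22
```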
// There is even a trivial Aggregator that returns a constant.
Aggregator.const("Hello World")(aliceWords)
// Having const, join, and andThenPresent means Aggregator[T]
// satisfies the laws to be called an Applicative Functor.
// There are many functions you can write that generalize
// over anything that has those functions with the right
// relationships between them.
// BigData is still carrying some social currency,
// so let's count each word. Do this by making each word into
// a sparse vector Map(word -> 1), and then using the standard
// monoid on Maps to sum them
val wordCount = Aggregator.prepareMonoid { s: String => Map(s -> 1) }
wordCount(aliceWords)
// The above just showed us a few random words.
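The "monoid on Maps" that does the summing is sketched here in plain Scala on a toy list (illustrative only; algebird supplies this Monoid[Map[K, V]] instance for you):

```scala
// Sum of sparse vectors Map(word -> 1) under the Map monoid:
// union of keys, pointwise sum of values.
val toyWords = List("alice", "said", "alice", "why")
val counts = toyWords
  .map(w => Map(w -> 1))
  .foldLeft(Map.empty[String, Int]) { (acc, m) =>
    m.foldLeft(acc) { case (a, (k, v)) => a.updated(k, a.getOrElse(k, 0) + v) }
  }
// counts == Map("alice" -> 2, "said" -> 1, "why" -> 1)
```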
// What words are most frequent?
val topKfromCounts = Aggregator.sortedReverseTake[(Int, String)](20)
// We run two aggregators: one to get the counts, then
// the second to get the top values
// Note, here our pure abstraction breaks down: Two Aggregators in series
// is not an Aggregator. It can be a function though:
val topK: TraversableOnce[String] => Seq[(Int, String)] = { in =>
  val wordsToCountIter: Aggregator[String, _, Iterator[(Int, String)]] =
    wordCount.andThenPresent(m => m.iterator.map(_.swap))
  topKfromCounts(wordsToCountIter(in))
}
// now apply both
topK(aliceWords)
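The same two-step top-K, sketched in plain Scala on a toy list so the shape is obvious (count, then sort descending on the (count, word) pairs and take):

```scala
// Semantics sketch of count-then-top-K
val toyWords = List("b", "a", "b", "c", "b", "a")
val top2 = toyWords.groupBy(identity).toSeq
  .map { case (w, ws) => (ws.size, w) }
  .sorted
  .reverse
  .take(2)
// top2 == Seq((3, "b"), (2, "a"))
```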
// But that required us to store all counts in memory and
// to do two aggregators in series, breaking our nice abstraction.
// What about an approximation technique to keep only some?
// Count-min sketch is an algorithm that can give us
// approximate counts and most frequent items with constant
// memory and in one pass. It's awesome.
// Unfortunately, for strings, we need to set up a hasher,
// but this is easy to do (and will be added to algebird soon)
implicit val hash: CMSHasher[String] = new CMSHasher[String] {
  def hash(a: Int, b: Int, width: Int)(x: String) = {
    // a, b are the indices of the hash. We use them as the seed
    val h = MurmurHash128(a.toLong << 32 | b.toLong)(x.getBytes)._1
    // Make sure it is positive and within the width
    (h & Int.MaxValue).toInt % width
  }
}
// Now we can create an aggregator to give us the top words,
// using constant memory (Big Data, Here we come!)
// eps = error for each item in terms of total count (absError ~ totalCount * eps)
// delta = probability that the error is actually higher than that for a given item
// seed = used to build the hash functions
// heavyHittersPct is the percent of the total count an item needs to be a heavy hitter
val approxTopK: Aggregator[String, _, List[(String, Long)]] =
  TopPctCMS.aggregator[String](eps = 0.01, delta = 0.01, seed = 314, heavyHittersPct = 0.003)
    .andThenPresent { cms =>
      // Get out the heavy hitters. Note CMS can tell you MUCH MUCH more!
      cms.heavyHitters.toList
        .map { k => k -> cms.frequency(k).estimate }
        .sortBy { case (_, count) => -count }
    }
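To make the "constant memory" claim concrete, here is a toy count-min sketch in plain Scala (illustrative only; depth/width/seeds are made-up values, and algebird's CMS is the real, tuned implementation): a fixed depth x width table of counters, one differently-seeded hash per row, and a query that takes the minimum over rows.

```scala
import scala.util.hashing.MurmurHash3

// Toy count-min sketch: depth rows of width counters
val depth = 4
val width = 64
val table = Array.fill(depth, width)(0L)
def bucket(row: Int, s: String): Int =
  (MurmurHash3.stringHash(s, row) & Int.MaxValue) % width
def add(s: String): Unit =
  (0 until depth).foreach { r => table(r)(bucket(r, s)) += 1L }
def estimate(s: String): Long =
  (0 until depth).map(r => table(r)(bucket(r, s))).min // min over rows bounds the over-count

(List.fill(100)("alice") ++ List("rabbit", "queen")).foreach(add)
// CMS never under-counts, so estimate("alice") >= 100
```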
// Similarly, what if we want to count the number of distinct words
// without storing the set of all words in memory? HyperLogLog to the rescue
// size=5 means 2^5 bits (4 bytes) used for the HLL Array[Byte]
// HLL deals with Array[Byte] inputs, so we need to composePrepare a way to get to bytes.
// a simple but sometimes slow way is to just call toString.getBytes.
val approxUnique = HyperLogLogAggregator.sizeAggregator(5).composePrepare { s: String => s.getBytes }
// Not bad, but that was actually with 1/sqrt(2^5) = 17% std dev from true value.
// To get about 1% we need 2^13 bits = 1 kB
val approxUnique1 = HyperLogLogAggregator.sizeAggregator(13).composePrepare { s: String => s.getBytes }
// That gives about 1.8% error in this case.
// Crank the size up more if you need more accuracy
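The accuracy numbers above come from this rule of thumb, shown here as a tiny helper (hllStdErr is a made-up name; algebird's HLL error constant is closer to 1.04/sqrt(m), so real errors run slightly higher):

```scala
// relative std error ≈ 1 / sqrt(2^bits)
def hllStdErr(bits: Int): Double = 1.0 / math.sqrt(math.pow(2.0, bits))
// hllStdErr(5) ≈ 0.177 (the ~17% above); hllStdErr(13) ≈ 0.011 (the ~1%)
```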
* Scalding has built-in functions to .aggregate Hadoop streams.
* for Spark see:
deanwampler commented Jan 10, 2015

You can write this, although it probably isn't useful for the andThenPresent case that follows:
val Moments(count, mean, stddev, skewness, kurtosis) = stringLengthMoments(aliceWords)

danosipov commented Jan 12, 2015

I get an error on line 120 - removing the explicit type works.
