A REPL example of using Aggregators in Scala
/**
* To get started:
* git clone https://github.com/twitter/algebird
* cd algebird
* ./sbt algebird-core/console
*/
/**
* Let's get some data. Here is Alice in Wonderland, line by line
*/
val alice = io.Source.fromURL("http://www.gutenberg.org/files/11/11.txt").getLines.toStream
// if you have a local file:
// val alice = io.Source.fromFile("alice.txt").getLines.toStream
// flatMap over whitespace splits gives us a poor man's tokenizer (not suitable
// for real work)
val aliceWords = alice.flatMap(_.toLowerCase.split("\\s+"))
// how often does the word alice appear?
// aCount is an Aggregator: it encapsulates a special kind of
// computation, one that happens to look a lot like a Hadoop map/reduce job
val aCount = Aggregator.count { s: String => s == "alice" }
aCount(aliceWords)
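// An aside, sketching how count relates to the more general machinery below
// (this assumes Aggregator.prepareMonoid, which is introduced later in this
// file): count is just "map each element to 0 or 1, then sum with the Long
// monoid".
val aCount2 = Aggregator.prepareMonoid { s: String => if (s == "alice") 1L else 0L }
aCount2(aliceWords) // same answer as aCount(aliceWords)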
// How long are the words in Alice in Wonderland?
// Moments.numericAggregator is an Aggregator that gives us the 0th to 4th moments.
// composePrepare is a method on all Aggregators that transforms the data BEFORE
// it is prepared. The name matches scala.Function1.compose, and for the same
// reason: we are composing a function in front of prepare.
// Note that this returns a new Aggregator. This is one way Aggregators combine with functions.
val stringLengthMoments = Moments.numericAggregator[Int].composePrepare { s: String => s.length }
val moments = stringLengthMoments(aliceWords)
(moments.count, moments.mean, moments.stddev, moments.skewness, moments.kurtosis)
// let's make that prettier by returning the standard moments directly, not the raw Moments object:
// andThenPresent is a method on all Aggregators that applies a function at the end.
// Note that this returns a new Aggregator. This is another way Aggregators combine with functions.
val niceMoments = stringLengthMoments.andThenPresent { moments =>
(moments.count, moments.mean, moments.stddev, moments.skewness, moments.kurtosis) }
niceMoments(aliceWords)
// much better
// Exact unique count of all words:
// builds an in-memory Set of all the words, then takes its size
Aggregator.uniqueCount(aliceWords)
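// A sketch of what uniqueCount is doing under the hood, written with the
// combinators from this file (prepareMonoid appears a bit further down):
// build up a Set with the Set monoid, then present its size.
val uniqueCount2 = Aggregator.prepareMonoid { s: String => Set(s) }.andThenPresent(_.size)
uniqueCount2(aliceWords)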
// Let's get both the unique count and the moments:
// join builds a new Aggregator that still makes only one pass over the data.
// We can join as many times as we like to create an Aggregator that
// tells us everything we want to know in a single pass.
// This is the main way Aggregators combine with other Aggregators
val myAgg = Aggregator.uniqueCount.join(niceMoments)
myAgg(aliceWords)
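// Since join returns a nested tuple, the result can be destructured
// directly (a small usage sketch):
val (uniqueWords, (wordLenCount, meanLen, stddevLen, skewLen, kurtLen)) = myAgg(aliceWords)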
// There is even a trivial Aggregator that returns a constant.
Aggregator.const("Hello World")(aliceWords)
// const, join, and andThenPresent together mean that Aggregator
// satisfies the laws of an Applicative Functor.
// There are many functions you can write that generalize
// over anything that has those operations with the right
// relationships between them.
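// For instance, a generic helper written purely in terms of join
// (a sketch: it assumes Aggregator.size, a count-all-inputs aggregator
// present in newer algebird versions):
def withSize[A, B](agg: Aggregator[A, _, B]) = Aggregator.size.join(agg)
// withSize(niceMoments)(aliceWords) pairs the total word count with the moments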
// BigData is still carrying some social currency,
// so let's count each word. Do this by turning each word into
// a sparse vector Map(word -> 1), and then using the standard
// Monoid on Maps to sum them
val wordCount = Aggregator.prepareMonoid { s: String => Map(s -> 1) }
wordCount(aliceWords).take(10)
// The above just showed us a few arbitrary words (Map ordering is undefined).
// What words are most frequent?
val topKfromCounts = Aggregator.sortedReverseTake[(Int, String)](20)
// We run two aggregators: one to get the counts, then
// the second to get the top values
// Note, here our pure abstraction breaks down: two Aggregators in series
// are not an Aggregator. They can still be a function, though:
val topK: TraversableOnce[String] => Seq[(Int, String)] = { in =>
val wordsToCountIter: Aggregator[String, _, Iterator[(Int, String)]] =
wordCount.andThenPresent(m => m.iterator.map(_.swap))
// now apply both
topKfromCounts(wordsToCountIter(in))
}
topK(aliceWords)
// But that required us to store all counts in memory and
// to do two aggregators in series, breaking our nice abstraction.
//
// What about an approximation technique to keep only some?
//
// Count-min sketch is an algorithm that can give us
// approximate counts and most frequent items with constant
// memory and in one pass. It's awesome.
//
// Unfortunately, for strings, we need to set up a hasher,
// but this is easy to do (and will be added to algebird soon)
implicit val hash: CMSHasher[String] = new CMSHasher[String] {
def hash(a: Int, b: Int, width: Int)(x: String) = {
// a, b are the indices of the hash. We use them as the seed
val h = MurmurHash128(a.toLong << 32 | b.toLong)(x.getBytes)._1
// Make sure it is positive and within the width
(h & Int.MaxValue).toInt % width
}
}
// Now we can create an aggregator to give us the top words,
// using constant memory (Big Data, Here we come!)
//
// eps = error for each item in terms of total count (absError ~ totalCount * eps)
// delta = probability that the error is actually higher than that for a given item
// seed = used to build the hash functions
// heavyHittersPct is the percent of the total count an item needs to be a heavy hitter
val approxTopK: Aggregator[String, _, List[(String, Long)]] =
TopPctCMS.aggregator[String](eps = 0.01, delta = 0.01, seed = 314, heavyHittersPct = 0.003)
.andThenPresent { cms =>
// Get out the heavy hitters. Note CMS can tell you MUCH MUCH more!
cms.heavyHitters
.map { k => k -> cms.frequency(k).estimate }
.toList
.sortBy { case (_, count) => -count }
}
approxTopK(aliceWords)
// Similarly, what if we want to count the number of distinct words
// without storing the set of all words in memory? HyperLogLog to the rescue
//
// size=5 means 2^5 bits (4 bytes) used for the HLL Array[Byte]
// HLL consumes Array[Byte] inputs, so we need to composePrepare a way to get to bytes.
// A simple (if sometimes slow) approach is toString.getBytes; since our inputs
// are already Strings, getBytes alone will do.
val approxUnique = HyperLogLogAggregator.sizeAggregator(5).composePrepare { s: String => s.getBytes }
approxUnique(aliceWords)
// Not bad, but that was actually with 1/sqrt(2^5) ≈ 18% std dev from the true value.
// To get about 1% we need 2^13 bits = 1 kB
val approxUnique1 = HyperLogLogAggregator.sizeAggregator(13).composePrepare { s: String => s.getBytes }
approxUnique1(aliceWords)
// That gives about 1.8% error in this case.
// Crank the size up more if you need more accuracy
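// The error rule of thumb used above, as a quick calculation (the commonly
// quoted HLL constant is closer to 1.04/sqrt(2^bits), so these numbers are
// slightly optimistic):
def hllStdError(bits: Int): Double = 1.0 / math.sqrt(math.pow(2.0, bits))
hllStdError(5)  // ~0.177
hllStdError(13) // ~0.011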
/**
 * Scalding has built-in functions to .aggregate Hadoop streams.
*
* for Spark see:
* https://github.com/twitter/algebird/pull/397
*/
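/**
 * A sketch of what this looks like in Scalding (assuming scalding-core is
 * on the classpath; TypedPipe#aggregate runs an Aggregator as a single
 * map/reduce pass):
 *
 *   import com.twitter.scalding.typed.TypedPipe
 *   def topWords(words: TypedPipe[String]) = words.aggregate(approxTopK)
 */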
@deanwampler commented Jan 10, 2015:

You can write this, although it probably isn't useful for the andThenPresent case that follows:
val Moments(count, mean, stddev, skewness, kurtosis) = stringLengthMoments(aliceWords)

@danosipov commented Jan 12, 2015:

I get an error on line 120 - removing the explicit type works.
