@bigsnarfdude
Created September 28, 2014 07:55
Scalding Algebird CMS
import com.twitter.scalding._
import com.twitter.algebird._
/**
 * More sensible aggregation with Monoids.
 * Uses SketchMap to keep only the top words we are interested in.
 * SketchMap is a generalization of Algebird's CountMinSketch that also holds a list of the top items.
 * The sketch has a fixed size, so memory use does not grow with the number of distinct words.
 */
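class WordCount5(args: Args) extends Job(args) {
  // Serialize String keys to bytes so the sketch can hash them.
  implicit def utf8(s: String): Array[Byte] = com.twitter.bijection.Injection.utf8(s)
  implicit val cmm = new SketchMapMonoid[String, Long](128, 6, 0, 20) // top 20
  type ApproxMap = SketchMap[String, Long]

  TextLine(args("input"))
    .flatMap('line -> 'word) { tokenize }
    // Wrap each word in a single-entry sketch with count 1.
    .map('word -> 'word) { w: String => cmm.create((w, 1L)) }
    // Merge all per-word sketches into one using the implicit monoid.
    .groupAll { _.sum[ApproxMap]('word) }
    // Emit one (word, count) row per heavy hitter.
    .flatMap('word -> ('word, 'size)) { words: ApproxMap => words.heavyHitters }
    .write(Tsv(args("output")))

  // Lowercase, drop everything except letters, digits and whitespace, then split on whitespace.
  def tokenize(text: String): Array[String] = {
    text.toLowerCase.replaceAll("[^a-z0-9\\s]", "").split("\\s+").filter(_.nonEmpty)
  }
}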
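
The header comment says the sketch does not grow with the input. A minimal plain-Scala sketch of the underlying Count-Min idea may make that concrete. This is not Algebird's implementation: `TinyCMS` and `TinyCMSDemo` are hypothetical names, the per-row hashing via `MurmurHash3` is an arbitrary choice for illustration, and the values 128 and 6 are borrowed from the first two constructor arguments in the job on the assumption that they are width and depth.

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical, illustrative Count-Min Sketch: a fixed depth x width table of counters.
// Not Algebird's implementation.
final class TinyCMS(val width: Int, val depth: Int) {
  private val table = Array.ofDim[Long](depth, width)

  // One hash function per row, obtained by seeding MurmurHash3 with the row index.
  private def bucket(row: Int, key: String): Int = {
    val h = MurmurHash3.stringHash(key, row)
    ((h % width) + width) % width // map to a non-negative column index
  }

  // Increment one counter in every row.
  def add(key: String, count: Long = 1L): Unit =
    for (row <- 0 until depth) table(row)(bucket(row, key)) += count

  // Collisions can only inflate a counter, so the minimum over the rows
  // never underestimates the true count.
  def estimate(key: String): Long =
    (0 until depth).map(row => table(row)(bucket(row, key))).min

  // Number of counters; fixed at construction time.
  def cells: Int = width * depth
}

object TinyCMSDemo extends App {
  val cms = new TinyCMS(width = 128, depth = 6)
  val words = "the quick brown fox jumps over the lazy dog the end".split("\\s+")
  words.foreach(w => cms.add(w))
  println(s"estimate(the) = ${cms.estimate("the")}")
  println(s"cells = ${cms.cells}")
}
```

However many distinct words are added, the table stays at `width * depth` counters; the trade-off is that estimates can overcount when words collide in every row.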