@johnynek
Created January 6, 2014 21:47
Example of a LAG-type function in the Scalding Fields API (the typed API is similar)
groupBy('source) {
  _.sortBy('links)
    .reverse
    .mapStream[(String, Int), (String, Int, Int, Int)](
      ('destination, 'links) -> ('destination, 'links, 'rank, 'gap)) { destLinks =>
      // destLinks iterates over the rows of one 'source group, ordered by 'links descending
      destLinks.scanLeft(None: Option[(String, Int, Int, Int)]) {
        (prevRowOut: Option[(String, Int, Int, Int)], thisRow: (String, Int)) =>
          val (dest, links) = thisRow
          prevRowOut match {
            case None => Some((dest, links, 1, 0)) // rank 1, gap 0 -- not exactly what you wanted...
            case Some((_, prevLinks, prevRank, _)) =>
              // LAG step: rank and gap are derived from the previous output row
              Some((dest, links, prevRank + 1, prevLinks - links))
          }
      }
      .collect { case Some(x) => x } // drop the initial None, and unwrap
    }
}
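
The scanLeft step can be tried without a Scalding job. The following is a minimal sketch using only the Scala standard library, assuming the rows of a single 'source group are already in memory. The names LagSketch and rankWithGap, and the sample data, are hypothetical and not part of the gist.

```scala
object LagSketch {
  // Hypothetical helper: takes the (destination, links) rows of one group and
  // returns (destination, links, rank, gap), ordered by links descending.
  def rankWithGap(rows: Seq[(String, Int)]): List[(String, Int, Int, Int)] =
    rows
      .sortBy(_._2)(Ordering[Int].reverse) // stands in for sortBy('links).reverse
      .iterator
      .scanLeft(Option.empty[(String, Int, Int, Int)]) {
        // first row of the group: rank 1, and no previous row to compare with
        case (None, (dest, links)) =>
          Some((dest, links, 1, 0))
        // later rows: look back at the previous output row (the LAG)
        case (Some((_, prevLinks, prevRank, _)), (dest, links)) =>
          Some((dest, links, prevRank + 1, prevLinks - links))
      }
      .collect { case Some(row) => row } // drop the initial None, and unwrap
      .toList

  def main(args: Array[String]): Unit = {
    // made-up sample rows for a single source
    val sample = Seq(("a.example", 10), ("b.example", 25), ("c.example", 18))
    rankWithGap(sample).foreach(println)
  }
}
```

As in the Scalding version, the accumulator holds the previous output row, so each row reads both its rank and its gap from the row before it.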