@avibryant
Created March 20, 2015 22:03
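
import com.twitter.algebird.{QTree, QTreeSemigroup}
import com.twitter.scalding.typed.TypedPipe

object TakeBy {
  // For each key, keep roughly the `max` values with the smallest fn(v).
  def takeBy[K: Ordering, V](pipe: TypedPipe[(K, V)], max: Int)(fn: V => Double): TypedPipe[(K, V)] = {
    implicit val qtreeSemi: QTreeSemigroup[Unit] = new QTreeSemigroup[Unit](4) // magic number, determines how much RAM the trees take
    // One QTree per key, summarizing the distribution of fn(v)
    val qtrees = pipe.map { case (k, v) => k -> QTree(fn(v) -> (())) }.sumByKey
    // Cutoff score, computed only for keys that have more than `max` values
    val maxV = qtrees.toTypedPipe.flatMap { case (k, q) =>
      if (q.count > max) { // count = number of values inserted into the tree
        val targetQuantile = max.toDouble / q.count
        val (lower, upper) = q.quantileBounds(targetQuantile)
        Some(k -> upper) // this will give us at least max values; use lower to get at most max values
      } else {
        None
      }
    }
    pipe
      .hashLeftJoin(maxV.group) // hash join: assumes the cutoffs (one row per key) fit in memory
      .flatMap { case (k, (v, optMaxV)) =>
        // Keys without a cutoff keep everything; otherwise compare the score fn(v), not v itself
        if (optMaxV.forall(fn(v) <= _)) Some(k -> v) else None
      }
  }
}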
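To check what the Scalding version should return on a small input, here is a minimal in-memory sketch of the same two-pass idea on plain Scala collections. It swaps the QTree approximation for an exact per-key cutoff (a full sort of each key's scores), so it only suits data that fits in memory; the object name `TakeByExact` is hypothetical. Like filtering on `upper`, keeping everything at or below the cutoff can return more than `max` values for a key when scores tie.

```scala
// Hypothetical exact, in-memory counterpart of takeBy (illustration only).
object TakeByExact {
  // Pass 1: for each key with more than `max` values, take the score of its
  // max-th smallest value as the cutoff. Pass 2: keep values scoring at or
  // below their key's cutoff; keys with at most `max` values keep everything.
  def takeBy[K, V](pairs: Seq[(K, V)], max: Int)(fn: V => Double): Seq[(K, V)] = {
    require(max > 0, "max must be positive")
    val cutoffs: Map[K, Double] = pairs
      .groupBy(_._1)
      .collect { case (k, kvs) if kvs.size > max =>
        // Scores sorted ascending; index max - 1 holds the max-th smallest
        k -> kvs.map { case (_, v) => fn(v) }.sortWith(_ < _).apply(max - 1)
      }
    // Mirrors the left join: a missing cutoff means the value passes through
    pairs.filter { case (k, v) => cutoffs.get(k).forall(fn(v) <= _) }
  }

  def main(args: Array[String]): Unit = {
    val data = Seq("a" -> 5.0, "a" -> 1.0, "a" -> 3.0, "a" -> 4.0, "b" -> 9.0)
    println(takeBy(data, 2)(identity))
  }
}
```

Here key `a` has four values, so its cutoff is the second-smallest score (3.0) and only 1.0 and 3.0 survive, while key `b` has a single value and keeps it.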