Skip to content

Instantly share code, notes, and snippets.

@samuela
Last active January 2, 2016 13:18
Show Gist options
  • Save samuela/8308715 to your computer and use it in GitHub Desktop.
Save samuela/8308715 to your computer and use it in GitHub Desktop.
// Sample source stream for the demo below, built from the range 1 to 20.
// NOTE(review): presumably emits the integers 1 through 20 in order — confirm
// against the Observable factory overloads of the RxScala version in use.
val o: Observable[Int] = Observable(1 to 20)
/**
 * Groups consecutive elements of `o` that map to the same key into buffers.
 *
 * A buffer is emitted whenever an element with a different key arrives, and the
 * last pending buffer is emitted when the source completes, so no trailing
 * elements are lost. Elements inside each buffer keep their arrival order.
 * An empty source produces no buffers.
 *
 * @tparam T element type of the source
 * @tparam K key type; keys are compared with `==`
 * @param o       source observable
 * @param keyFunc computes the grouping key of an element (called once per element)
 * @return an observable of runs of consecutive same-key elements
 */
def bufferByKey[T, K](o: Observable[T], keyFunc: T => K): Observable[Seq[T]] = {
  // State: (key of the current run, current run in reverse order, finished run to emit).
  type S = (Option[K], List[T], Option[List[T]])
  val seed: S = (None, Nil, None)

  // Wrap every element in Some and append a single None once the source has
  // completed. The None acts as an end-of-stream marker so the final run can
  // be flushed instead of being silently dropped.
  val marked: Observable[Option[T]] =
    o.map(v => Some(v): Option[T]) ++ Observable[Option[T]](None)

  marked.scan[S](seed) { (state: S, next: Option[T]) =>
    val (lastKey, acc, _) = state
    next match {
      case Some(newVal) =>
        val nk = keyFunc(newVal)
        // The very first element (no previous key) always joins the current run.
        if (lastKey.forall(_ == nk)) (Some(nk), newVal :: acc, None)
        else (Some(nk), List(newVal), Some(acc))
      case None =>
        // End of stream: flush whatever is pending (nothing for an empty source).
        (None, Nil, if (acc.isEmpty) None else Some(acc))
    }
  } flatMap {
    case (_, _, None)      => Observable()
    // Runs are accumulated by prepending, so restore arrival order before emitting.
    case (_, _, Some(run)) => Observable(run.reverse)
  }
}
// Demo: group consecutive values of `o` by whether they are divisible by 3,
// then print every group that the resulting observable emits.
val x = bufferByKey(o, (n: Int) => n % 3 == 0)
x.subscribe(group => println("x= " + group))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment