Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save arjanblokzijl/f10a0f5565ca6808d105c4388f277138 to your computer and use it in GitHub Desktop.
Save arjanblokzijl/f10a0f5565ca6808d105c4388f277138 to your computer and use it in GitHub Desktop.
ordered join
import fs2._
object OrderedJoin {

  /**
    * Merges two streams that are each assumed to be sorted ascending by `k`,
    * producing a single stream that is also sorted by `k`.
    *
    * For example, with `k` being the identity:
    * `join(Stream(1,1,2,4,4,5), Stream(1,1,2,3,4))(identity)` emits
    * `1,1,1,1,2,2,3,4,4,4,5`.
    *
    * When the heads of both streams have the same key, the element from
    * `right` is emitted first (the left element is only taken when its key is
    * strictly smaller).
    *
    * Each stream is stepped exactly once per element: the head that was not
    * emitted is kept and reused for the next comparison rather than being
    * pulled again, so effects in the underlying streams are not re-evaluated.
    *
    * @param left  the left stream, sorted ascending by `k`
    * @param right the right stream, sorted ascending by `k`
    * @param k     function that returns the ordering key for an element
    * @tparam F    the effect type of both streams
    * @tparam A    the element type
    * @return the merged stream
    */
  def join[F[_], A](left: Stream[F, A], right: Stream[F, A])(k: A => Long): Stream[F, A] = {
    // Result of stepping a stream once: the next element and the remainder,
    // or None when the stream is exhausted.
    type Step = Option[(A, Stream[F, A])]

    // Emits whatever is left of one side once the other side is exhausted.
    def drain(remaining: Step): Pull[F, A, Unit] =
      remaining match {
        case None               => Pull.done
        case Some((head, tail)) => Pull.output1(head) >> tail.pull.echo
      }

    // Both arguments are already-evaluated steps; only the side whose head
    // was just emitted is pulled again.
    def go(l: Step, r: Step): Pull[F, A, Unit] =
      (l, r) match {
        case (Some((h1, t1)), Some((h2, t2))) =>
          if (k(h1) < k(h2)) {
            Pull.output1(h1) >> t1.pull.uncons1.flatMap(next => go(next, r))
          } else {
            Pull.output1(h2) >> t2.pull.uncons1.flatMap(next => go(l, next))
          }
        case (None, _) => drain(r)
        case (_, None) => drain(l)
      }

    // Step the left stream first, then the right one, matching the original
    // evaluation order of the initial pulls.
    left.pull.uncons1.flatMap { l =>
      right.pull.uncons1.flatMap(r => go(l, r))
    }.stream
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment