Skip to content

Instantly share code, notes, and snippets.

@gatorcse
Last active January 14, 2023 21:09
Show Gist options
  • Save gatorcse/1f92aa7e52a04d9c91511ca79f73e911 to your computer and use it in GitHub Desktop.
Save gatorcse/1f92aa7e52a04d9c91511ca79f73e911 to your computer and use it in GitHub Desktop.
Merging two already sorted fs2 streams, with a sortBy function.
import cats._
import cats.implicits._
import cats.effect._
import cats.effect.implicits._
import fs2._
class StreamMerger[F[_]] {
def priorityOrderBy[A, B: Order](s1: Stream[F, A], s2: Stream[F, A])(f: A => B): Stream[F, A] = {
def go(p1: Stream.StepLeg[F, A], p2: Stream.StepLeg[F, A]): Pull[F, A, Unit] = {
val (elems, leftOrRight) = chunkMergeByPriority(p1.head.toList, p2.head.toList)(f)
Pull.output(Chunk.seq(elems)) >> (leftOrRight match {
case Left(remaining) =>
val p1next = p1.setHead(Chunk.seq(remaining))
p2.stepLeg.flatMap {
case None => Pull.output(p1next.head) >> p1next.stream.pull.echo
case Some(p2next) => go(p1next, p2next)
}
case Right(remaining) =>
val p2next = p2.setHead(Chunk.seq(remaining))
p1.stepLeg.flatMap {
case None => Pull.output(p2next.head) >> p2next.stream.pull.echo
case Some(p1next) => go(p1next, p2next)
}
})
}
s1.pull.stepLeg.flatMap {
case None => s2.pull.echo
case Some(leg1) => s2.pull.stepLeg.flatMap {
case None => Pull.output(leg1.head) >> leg1.stream.pull.echo
case Some(leg2) => go(leg1, leg2)
}
}.stream
}
def chunkMergeByPriority[A, B: Order](c1: List[A], c2: List[A])(f: A => B): (List[A], Either[List[A], List[A]]) = (c1, c2) match {
case (c1head :: c1tail, c2head :: _) if (Order[B].compare(f(c1head), f(c2head)) < 0) =>
val (emits, remaining) = chunkMergeByPriority(c1tail, c2)(f)
(c1head :: emits, remaining)
case (_ :: _, c2head :: c2tail) =>
val (emits, remaining) = chunkMergeByPriority(c1, c2tail)(f)
(c2head :: emits, remaining)
case (Nil, c2i) => Nil -> Either.right(c2i)
case (c1i, Nil) => Nil -> Either.left(c1i)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment