Skip to content

Instantly share code, notes, and snippets.

@johnynek
Last active October 14, 2022 10:32
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save johnynek/689199b4ac49364e7c94abef996ae59f to your computer and use it in GitHub Desktop.
Save johnynek/689199b4ac49364e7c94abef996ae59f to your computer and use it in GitHub Desktop.
merge sorted streams with fs2
import fs2.{Chunk, Stream, Pull}
import cats.collections.Heap
import cats.implicits._
object SortMerge {
def sortMerge[F[_], A: Ordering](streams: List[Stream[F, A]]): Stream[F, A] = {
implicit val ord: cats.Order[Stream.StepLeg[F, A]] =
new cats.Order[Stream.StepLeg[F, A]] {
val ordA = implicitly[Ordering[A]]
def compare(left: Stream.StepLeg[F, A], right: Stream.StepLeg[F, A]): Int = {
if (left.head.isEmpty) {
// prefer to step so we don't skip items
if (right.head.isEmpty) 0 else -1
}
else if (right.head.isEmpty) {
// we need to step so we don't misorder items
1
}
else {
// neither are empty just compare the head
ordA.compare(left.head(0), right.head(0))
}
}
}
def go(heap: Heap[Stream.StepLeg[F, A]]): Pull[F, A, Unit] =
heap.pop match {
case Some((sl, rest)) =>
if (sl.head.nonEmpty) {
Pull.output1(sl.head(0)) >> {
val nextSl = sl.setHead(sl.head.drop(1))
val nextHeap = rest.add(nextSl)
go(nextHeap)
}
}
else {
// this chunk is done
sl.stepLeg
.flatMap {
case Some(nextSl) =>
val nextHeap = rest.add(nextSl)
go(nextHeap)
case None =>
// this leg is exhausted
go(rest)
}
}
case None => Pull.done
}
def heapOf(ls: List[Stream.StepLeg[F, A]]): Heap[Stream.StepLeg[F, A]] =
Heap.fromIterable(ls)
val heap: Pull[F, fs2.INothing, Heap[Stream.StepLeg[F, A]]] =
streams
.traverse(_.pull.stepLeg)
.map { ls => heapOf(ls.flatten) }
heap.flatMap(go).stream
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment