Last active
September 22, 2020 00:01
-
-
Save kiambogo/8247a7bbf79f00414d1489b7e6fc90d0 to your computer and use it in GitHub Desktop.
FS2 1.0.0 groupBy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.Concurrent
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2.{Pipe, Stream}
import fs2.concurrent.Queue
/** Splits a stream into per-key sub-streams.
  *
  * Each input element is passed to `selector` to obtain its key. The first time a key is
  * seen, a fresh unbounded queue is created for it, registered in an internal `Ref`, and
  * the pair `(key, subStream)` is emitted downstream, where `subStream` dequeues from that
  * queue until it reads a `None` marker. Every element, including the first one for a key,
  * is enqueued (wrapped in `Some`) onto the queue belonging to its key.
  *
  * `None` is enqueued on every registered queue once the input is exhausted, and again when
  * the resulting stream is finalized, so that the sub-streams terminate.
  *
  * Queues are unbounded and are never removed from the internal map while the stream runs.
  *
  * @param selector effectful function computing the grouping key of an element
  * @param F        `Concurrent` instance for `F`, required to create the `Ref` and queues
  * @tparam F effect type
  * @tparam A element type
  * @tparam K key type
  * @return a pipe emitting one `(key, subStream)` pair per distinct key
  */
def groupBy[F[_], A, K](selector: A => F[K])(implicit F: Concurrent[F]): Pipe[F, A, (K, Stream[F, A])] = {
  in =>
    Stream.eval(Ref.of[F, Map[K, Queue[F, Option[A]]]](Map.empty)).flatMap { st =>
      // Tell every sub-stream to stop by enqueueing the `None` terminator on each known queue.
      val cleanup = {
        import alleycats.std.all._ // brings the Map instance used by `traverse_` into scope
        st.get.flatMap(_.traverse_(_.enqueue1(None)))
      }

      // `cleanup` is appended after `in` (input exhausted) and also registered as a finalizer.
      (in ++ Stream.eval_(cleanup))
        .evalMap { el =>
          (selector(el), st.get).mapN { (key, queues) =>
            queues.get(key).fold {
              // Unseen key: create and register its queue, enqueue the element, and emit
              // the new sub-stream downstream.
              for {
                newQ <- Queue.unbounded[F, Option[A]]
                _ <- st.update(_ + (key -> newQ))
                _ <- newQ.enqueue1(el.some)
              } yield (key -> newQ.dequeue.unNoneTerminate).some
            }(_.enqueue1(el.some) as None) // Known key: enqueue only; nothing new to emit.
          }.flatten
        }
        .unNone
        .onFinalize(cleanup)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
In our use case, we have an unbounded number of keys and the streams never terminate. Here's a method we used in conjunction with `groupBy` to handle the problem: