Last active
September 22, 2020 00:01
-
-
Save kiambogo/8247a7bbf79f00414d1489b7e6fc90d0 to your computer and use it in GitHub Desktop.
FS2 1.0.0 groupBy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fs2.concurrent.Queue
import cats.implicits._
import cats.effect.Concurrent
import cats.effect.concurrent.Ref
/** Demultiplexes a stream into per-key sub-streams.
  *
  * Each distinct key (as computed by `selector`) gets its own unbounded queue;
  * the outer stream emits `(key, innerStream)` exactly once per key, the first
  * time that key is seen. Subsequent elements for a known key are routed into
  * that key's queue and produce no outer emission.
  *
  * @param selector effectful function extracting the grouping key from an element
  * @return a pipe from `A` to `(K, Stream[F, A])`
  */
def groupBy[F[_], A, K](selector: A => F[K])(implicit F: Concurrent[F]): Pipe[F, A, (K, Stream[F, A])] = {
  in =>
    Stream.eval(Ref.of[F, Map[K, Queue[F, Option[A]]]](Map.empty)).flatMap { st =>
      // Push a terminal None into every per-key queue so each inner stream
      // (which ends via unNoneTerminate) can complete.
      val cleanup = {
        import alleycats.std.all._ // Traverse instance for Map values
        st.get.flatMap(_.traverse_(_.enqueue1(None)))
      }
      // Append cleanup to the source so inner streams terminate when the
      // input ends; onFinalize below also covers early cancellation/error.
      // (Running cleanup twice is harmless: unNoneTerminate stops at the
      // first None.)
      (in ++ Stream.eval_(cleanup))
        .evalMap { el =>
          (selector(el), st.get).mapN { (key, queues) =>
            queues.get(key).fold {
              // First element for this key: create and register its queue,
              // enqueue the element, and emit the new (key, stream) pair.
              for {
                newQ <- Queue.unbounded[F, Option[A]]
                _    <- st.update(_ + (key -> newQ)) // register queue; previous map value is not needed
                _    <- newQ.enqueue1(el.some)
              } yield (key -> newQ.dequeue.unNoneTerminate).some
            }(_.enqueue1(el.some) as None) // known key: route element, emit nothing outward
          }.flatten
        }
        .unNone
        .onFinalize(cleanup)
    }
}
In our use case, we have an unbounded number of keys and the streams never terminate.
Here's a method we used in conjunction with `groupBy` to handle that problem:
/** Batches a stream per key while bounding the number of in-flight elements.
  *
  * Elements are fanned out by `selector` (via `groupBy`), each key's stream is
  * chunked by `groupWithin(batchSize, batchTime)`, and the per-key streams are
  * merged back with `parJoinUnbounded`. A semaphore backpressures the input so
  * at most `maxPending` elements are between intake and batch emission.
  *
  * NOTE(review): the original signature had no implicit parameter list, but
  * `Semaphore[F]`, `parJoinUnbounded`, and FS2 1.x `groupWithin` all require
  * `Concurrent[F]` (and a `Timer[F]` for the time-based batching), so it could
  * not compile as written — the evidence is added here.
  *
  * @param batchSize  maximum elements per emitted chunk
  * @param batchTime  maximum time to wait before emitting a partial chunk
  * @param maxPending maximum elements admitted but not yet emitted in a batch
  * @param selector   effectful function extracting the grouping key
  */
def clientMessages[F[_], A, K](
    batchSize: Int,
    batchTime: FiniteDuration,
    maxPending: Long
)(
    selector: A => F[K]
)(implicit F: Concurrent[F], timer: Timer[F]): Pipe[F, A, (K, Chunk[A])] = { in =>
  // Fan out per-key streams, batch each by size/time, then re-join them
  // into a single stream of (key, chunk) pairs.
  val batches: Pipe[F, A, (K, Chunk[A])] = _.through(groupBy(selector)).map {
    case (k, kas) =>
      kas.groupWithin(batchSize, batchTime).map(c => k -> c)
  }.parJoinUnbounded
  Stream.eval(Semaphore[F](maxPending)).flatMap { pending =>
    in.evalTap(_ => pending.acquire) // one permit per admitted element
      .through(batches)
      .evalTap(kc => pending.releaseN(kc._2.size.toLong)) // release all permits for the emitted batch
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Okay, you need to join the internal streams with `parJoin` or `parJoinUnbounded` to make it work as a single stream again.