Created October 9, 2019 14:22
fs2 groupBy / KeyedEnqueue
import cats.effect.{Concurrent, Timer}
import fs2.{Chunk, Pipe, Stream}
import scala.concurrent.duration.FiniteDuration
package object fs2utils {
* Grouping logic used to split a stream into sub-streams identified by a unique key.
* Be aware that when working with an unbounded number of keys K, if streams never
* terminate, there can potentially be unbounded memory usage.
* The emitted streams terminate when the input terminates or when the stream
* returned by the pipe itself terminates. Termination is graceful and all input elements are emitted
* @param selector Function to retrieve grouping key from type A
* @tparam A Elements in the stream
* @tparam K A key used to group elements into substreams
* @return Streams grouped by a unique key identifier `K`.
def groupByUnbounded[F[_], A, K](selector: A => K)(
implicit F: Concurrent[F]
): Pipe[F, A, (K, Stream[F, A])] = { in =>
.resource(KeyedEnqueue.unbounded[F, K, A])
.flatMap { ke =>
/** Like `groupByUnbounded` but back pressures the stream when `maxItems` are inside */
def groupBy[F[_], A, K](maxItems: Long)(selector: A => K)(
implicit F: Concurrent[F]
): Pipe[F, A, (K, Stream[F, A])] = { in =>
.resource(KeyedEnqueue.itemBounded[F, K, A](maxItems))
.flatMap { ke =>
* Like `groupBy` but each substream is concurrently merged, emitting chunks when the substream has
* `maxChunkSize` pending or when the substream has waited `maxChunkTimeout` without emitting elements,
* similar to the standard `groupWithin` combinator
* @param maxTotalItems Backpressure when this many items are "in flight" concurrently
* @param maxChunkSize Output chunks satisfy: 0 < emittedChunk.size <= maxChunkSize
* @param maxChunkTimeout Emit chunks smaller than `maxChunkSize` if `maxChunkTimeout` time has elapsed without
* emitting any chunks for a given key `K` and we have elements that match that selector waiting
* @param selector Output elements satisfy: (key, chunk) => chunk.forall(a => selector(a) == key)
def groupWithinBy[F[_], A, K](
maxTotalItems: Long,
maxChunkSize: Int,
maxChunkTimeout: FiniteDuration
selector: A => K
)(implicit F: Concurrent[F], timer: Timer[F]): Pipe[F, A, (K, Chunk[A])] =
_.through(groupBy(maxTotalItems)(selector)).map {
case (key, stream) =>
.groupWithin(maxChunkSize, maxChunkTimeout)
.map(chunk => key -> chunk)
import cats.Monad
import cats.effect.concurrent.{Ref, Semaphore}
import cats.effect.{Concurrent, Resource}
import cats.implicits._
import fs2.{Pipe, Stream}
import fs2.concurrent.{NoneTerminatedQueue, Queue}
/** Represents the ability to enqueue keyed items into a stream of queues that emits homogeneous keyed streams.
  *
  * This allows construction of a "keyed fan-out" behavior for a stream, which may be used for
  * homogeneously batching items that arrive via a heterogeneous input.
  *
  * Somewhat analogous to [[fs2.concurrent.Enqueue]]
  *
  * @tparam F effect type in which enqueueing and shutdown run
  * @tparam K key identifying a substream
  * @tparam A element type carried by the substreams
  */
private[fs2utils] trait KeyedEnqueue[F[_], K, A] {

  /** Enqueue a single item for a given key, possibly creating and returning a new substream for that key.
    *
    * @param key  key of the substream that should receive `item`
    * @param item element to enqueue
    * @return <ul><li> None if the item was published to an already-live substream </li>
    *         <li>Some if a new queue was created for this element. This can happen multiple times for the same
    *         key (for example, if the implementation automatically terminates old/quiet substreams).</li></ul>
    *         The returned stream may eventually terminate, but it won't be cancelled by this.
    */
  def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]]

  /** Gracefully terminate all sub-streams we have emitted so far */
  def shutdownAll: F[Unit]
}
private[fs2utils] object KeyedEnqueue {
def unbounded[F[_]: Concurrent, K, A]: Resource[F, KeyedEnqueue[F, K, A]] =
Resource.liftF(Ref[F].of(Map.empty[K, NoneTerminatedQueue[F, A]])).flatMap {
st =>
(new UnboundedKeyedEnqueue[F, K, A](st): KeyedEnqueue[F, K, A])
def pipe[F[_]: Concurrent, K, A](
ke: KeyedEnqueue[F, K, A]
)(selector: A => K): Pipe[F, A, (K, Stream[F, A])] = { in =>
// Note this *must* be `++` specifically to allow for "input termination = output termination" behavior.
// Using `onFinalize` will allow the finalizer to be rescoped to the output of this stream later, which
// results in it not triggering because it's waiting for itself to terminate before it terminates itself
in.evalMap(a => ke.enqueue1(selector(a), a)).unNone ++
/** Builds a [[KeyedEnqueue]] that wraps an unbounded one together with a semaphore
  * initialised with `maxItems` permits.
  *
  * @param maxItems number of permits the limiting semaphore starts with
  * @return a resource yielding the item-bounded enqueue; its lifecycle is that of the
  *         underlying `unbounded` resource
  */
def itemBounded[F[_]: Concurrent, K, A](
    maxItems: Long
): Resource[F, KeyedEnqueue[F, K, A]] =
  unbounded[F, K, A].flatMap { underlying =>
    Resource
      .liftF(Semaphore[F](maxItems))
      .map(permits => new ItemBoundedKeyedEnqueue(underlying, permits))
  }
private class UnboundedKeyedEnqueue[F[_], K, A](
queues: Ref[F, Map[K, NoneTerminatedQueue[F, A]]]
)(implicit F: Concurrent[F])
extends KeyedEnqueue[F, K, A] {
override def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]] =
override val shutdownAll: F[Unit] =
/** Runs `use` against the queue registered for `key`, creating and registering a queue first
  * when the key has none.
  *
  * Registration goes through a single `Ref.modify`, so two concurrent callers racing on a
  * not-yet-seen key cannot both install a queue. With a separate `get` followed by `update`
  * the later write would replace the earlier queue and orphan anything enqueued on it.
  *
  * @param key key whose queue should receive the action
  * @param use action to run against the queue for `key`
  * @return `Some(key -> stream)` when this call registered a new queue for `key`,
  *         `None` when an already-registered queue was used
  */
private[this] def withKey(key: K)(
    use: NoneTerminatedQueue[F, A] => F[Unit]
): F[Option[(K, Stream[F, A])]] = {
  // Existing queue: run the action; there is no new substream to emit.
  def reuse(q: NoneTerminatedQueue[F, A]): F[Option[(K, Stream[F, A])]] =
    use(q).as(None)

  queues.get.flatMap { seen =>
    seen.get(key) match {
      // Fast path: the key is already registered, so no queue is allocated.
      case Some(q) => reuse(q)
      case None =>
        // Allocate before `modify` so that the check-and-register step is a pure function.
        Queue.noneTerminated[F, A].flatMap { fresh =>
          queues
            .modify[F[Option[(K, Stream[F, A])]]] { current =>
              current.get(key) match {
                // Another caller registered the key in the meantime: use its queue and drop `fresh`.
                case Some(winner) => (current, reuse(winner))
                case None =>
                  (current + (key -> fresh), use(fresh).as((key -> fresh.dequeue).some))
              }
            }
            .flatten
        }
    }
  }
}
private class ItemBoundedKeyedEnqueue[F[_]: Monad, K, A](
ke: KeyedEnqueue[F, K, A],
limit: Semaphore[F]
) extends KeyedEnqueue[F, K, A] {
override def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]] =
limit.acquire >> ke
.enqueue1(key, item)
.map( {
case (key, stream) =>
// We only need to attach the "release" behavior to a given stream once because each stream is emitted once, and then reused
key -> stream.chunks
.evalTap(c => limit.releaseN(c.size.toLong))
override val shutdownAll: F[Unit] = ke.shutdownAll
SystemFw commented Feb 4, 2021

You don't need the full power of a Semaphore, but you do want to swap that get + update with a modify

Daenyth commented Feb 5, 2021


chuwy commented Feb 17, 2021

To me it looks like modify is not possible here as updated Map depends on effectful Queue.noneTerminated[F, A]:

queuesMap.get(key) match {
  case Some(queue) =>
    (queuesMap, use(queue).as(None))
  case None =>
    val action = for {
      newQ <- Queue.noneTerminated[F, A]
      updated = queuesMap + (key -> newQ)      // We cannot return it
      result <- use(newQ).as((key -> newQ.dequeue).some)
    } yield result

    (???, action)

Would be happy to be wrong.

Daenyth commented Feb 17, 2021

Yeah, it's not immediately obvious how to avoid that - it may involve refactoring. Possibly using something like the keyed ref from davenverse

I feel like I reasoned out that the race condition was in practice not a problem, but I need to re-examine with fresh eyes. I wrote this quite a while ago.

SystemFw commented Mar 5, 2021

To me it looks like modify is not possible here as updated Map depends on effectful Queue.noneTerminated[F, A]

you create the Queue in advance (which has negligible overhead), just like you do it when you need a Deferred. In any case I'm adding a similar combinator to the library directly

@SystemFw Hi, has such a combinator already been added to fs2? Thanks :)

Copy link

In any case I'm adding a similar combinator to the library directly

Have you done it? @SystemFw

Have you done it? @SystemFw

No, I started on it at the time, and then got side-tracked in creating what has become Channel. I also felt unsatisfied by how large the possible api surface could be, but I will admit that's probably the perfect being the enemy of the good.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment