@Daenyth
Created October 9, 2019 14:22
fs2 groupBy / KeyedEnqueue
import cats.effect.{Concurrent, Timer}
import fs2.{Chunk, Pipe, Stream}

import scala.concurrent.duration.FiniteDuration

package object fs2utils {

  /**
    * Grouping logic used to split a stream into sub-streams identified by a unique key.
    * Be aware that when working with an unbounded number of keys K, if streams never
    * terminate, there can potentially be unbounded memory usage.
    *
    * The emitted streams terminate when the input terminates or when the stream
    * returned by the pipe itself terminates. Termination is graceful and all input elements are emitted.
    *
    * @param selector Function to retrieve the grouping key from type A
    * @tparam A Elements in the stream
    * @tparam K A key used to group elements into substreams
    * @return Streams grouped by a unique key identifier `K`.
    */
  def groupByUnbounded[F[_], A, K](selector: A => K)(
      implicit F: Concurrent[F]
  ): Pipe[F, A, (K, Stream[F, A])] = { in =>
    Stream
      .resource(KeyedEnqueue.unbounded[F, K, A])
      .flatMap { ke =>
        in.through(KeyedEnqueue.pipe(ke)(selector))
      }
  }

  /** Like `groupByUnbounded` but back pressures the stream when `maxItems` are inside */
  def groupBy[F[_], A, K](maxItems: Long)(selector: A => K)(
      implicit F: Concurrent[F]
  ): Pipe[F, A, (K, Stream[F, A])] = { in =>
    Stream
      .resource(KeyedEnqueue.itemBounded[F, K, A](maxItems))
      .flatMap { ke =>
        in.through(KeyedEnqueue.pipe(ke)(selector))
      }
  }

  /**
    * Like `groupBy` but each substream is concurrently merged, emitting chunks when the substream has
    * `maxChunkSize` pending or when the substream has waited `maxChunkTimeout` without emitting elements,
    * similar to the standard `groupWithin` combinator.
    *
    * @param maxTotalItems Backpressure when this many items are "in flight" concurrently
    * @param maxChunkSize Output chunks satisfy: 0 < emittedChunk.size <= maxChunkSize
    * @param maxChunkTimeout Emit chunks smaller than `maxChunkSize` if `maxChunkTimeout` time has elapsed without
    *                        emitting any chunks for a given key `K` and we have elements that match that selector waiting
    * @param selector Output elements satisfy: (key, chunk) => chunk.forall(a => selector(a) == key)
    */
  def groupWithinBy[F[_], A, K](
      maxTotalItems: Long,
      maxChunkSize: Int,
      maxChunkTimeout: FiniteDuration
  )(
      selector: A => K
  )(implicit F: Concurrent[F], timer: Timer[F]): Pipe[F, A, (K, Chunk[A])] =
    _.through(groupBy(maxTotalItems)(selector)).map {
      case (key, stream) =>
        stream
          .groupWithin(maxChunkSize, maxChunkTimeout)
          .map(chunk => key -> chunk)
    }.parJoinUnbounded
}
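A minimal usage sketch (not part of the gist; it assumes a cats-effect 2 `IOApp`, which supplies the `Concurrent` and `Timer` instances, plus a hypothetical `Event` type and example name) showing how `groupWithinBy` batches a mixed input per key:

import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream

import scala.concurrent.duration._

object GroupWithinByExample extends IOApp {
  // Hypothetical input type; `user` is the grouping key
  final case class Event(user: String, payload: Int)

  def run(args: List[String]): IO[ExitCode] = {
    val events = Stream
      .emits(List(Event("a", 1), Event("b", 2), Event("a", 3), Event("b", 4)))
      .covary[IO]

    events
      .through(
        fs2utils.groupWithinBy[IO, Event, String](
          maxTotalItems = 128L,
          maxChunkSize = 10,
          maxChunkTimeout = 500.millis
        )(_.user)
      )
      // Every emitted (key, chunk) pair satisfies chunk.forall(_.user == key)
      .evalMap { case (user, chunk) => IO(println(s"$user -> ${chunk.toList}")) }
      .compile
      .drain
      .as(ExitCode.Success)
  }
}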
import cats.Monad
import cats.effect.concurrent.{Ref, Semaphore}
import cats.effect.{Concurrent, Resource}
import cats.implicits._
import fs2.{Pipe, Stream}
import fs2.concurrent.{NoneTerminatedQueue, Queue}

/** Represents the ability to enqueue keyed items into a stream of queues that emits homogeneous keyed streams.
  *
  * This allows construction of a "keyed fan-out" behavior for a stream, which may be used for
  * homogeneously batching items that arrive via a heterogeneous input.
  *
  * Somewhat analogous to [[fs2.concurrent.Enqueue]]
  */
private[fs2utils] trait KeyedEnqueue[F[_], K, A] {

  /** Enqueue a single item for a given key, possibly creating and returning a new substream for that key
    *
    * @return <ul><li> None if the item was published to an already-live substream </li>
    *         <li>Some if a new queue was created for this element. This can happen multiple times for the same
    *         key (for example, if the implementation automatically terminates old/quiet substreams).</li></ul>
    *         The returned stream may eventually terminate, but it won't be cancelled by this.
    */
  def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]]

  /** Gracefully terminate all sub-streams we have emitted so far */
  def shutdownAll: F[Unit]
}

private[fs2utils] object KeyedEnqueue {

  def unbounded[F[_]: Concurrent, K, A]: Resource[F, KeyedEnqueue[F, K, A]] =
    Resource.liftF(Ref[F].of(Map.empty[K, NoneTerminatedQueue[F, A]])).flatMap { st =>
      Resource.make(
        (new UnboundedKeyedEnqueue[F, K, A](st): KeyedEnqueue[F, K, A]).pure[F]
      )(_.shutdownAll)
    }

  def pipe[F[_]: Concurrent, K, A](
      ke: KeyedEnqueue[F, K, A]
  )(selector: A => K): Pipe[F, A, (K, Stream[F, A])] = { in =>
    // Note this *must* be `++` specifically to allow for "input termination = output termination" behavior.
    // Using `onFinalize` will allow the finalizer to be rescoped to the output of this stream later, which
    // results in it not triggering because it's waiting for itself to terminate before it terminates itself.
    in.evalMap(a => ke.enqueue1(selector(a), a)).unNone ++
      Stream.eval_(ke.shutdownAll)
  }

  def itemBounded[F[_]: Concurrent, K, A](
      maxItems: Long
  ): Resource[F, KeyedEnqueue[F, K, A]] =
    for {
      ke <- unbounded[F, K, A]
      limit <- Resource.liftF(Semaphore[F](maxItems))
    } yield new ItemBoundedKeyedEnqueue(ke, limit)
}

private class UnboundedKeyedEnqueue[F[_], K, A](
    queues: Ref[F, Map[K, NoneTerminatedQueue[F, A]]]
)(implicit F: Concurrent[F])
    extends KeyedEnqueue[F, K, A] {

  override def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]] =
    withKey(key)(_.enqueue1(item.some))

  override val shutdownAll: F[Unit] =
    queues.get.flatMap(_.values.toList.traverse_(_.enqueue1(None)))

  private[this] def withKey(key: K)(
      use: NoneTerminatedQueue[F, A] => F[Unit]
  ): F[Option[(K, Stream[F, A])]] =
    queues.get.flatMap { qm =>
      qm.get(key)
        .fold {
          for { // No queue for key - create a new one
            newQ <- Queue.noneTerminated[F, A]
            _ <- queues.update(x => x + (key -> newQ))
            _ <- use(newQ)
          } yield (key -> newQ.dequeue).some
        }(q => use(q).as(None))
    }
}

private class ItemBoundedKeyedEnqueue[F[_]: Monad, K, A](
    ke: KeyedEnqueue[F, K, A],
    limit: Semaphore[F]
) extends KeyedEnqueue[F, K, A] {

  override def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]] =
    limit.acquire >> ke
      .enqueue1(key, item)
      .map(_.map {
        case (key, stream) =>
          // We only need to attach the "release" behavior to a given stream once, because each stream is emitted once and then reused
          key -> stream.chunks
            .evalTap(c => limit.releaseN(c.size.toLong))
            .flatMap(Stream.chunk)
      })

  override val shutdownAll: F[Unit] = ke.shutdownAll
}
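A second sketch (again not part of the gist, assuming a cats-effect 2 `IOApp` and a made-up example name) illustrating the termination contract: once the finite input ends, `KeyedEnqueue.pipe` runs `shutdownAll`, every live substream receives its terminating `None`, and all grouped elements are still observed.

import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream

object GroupByTerminationExample extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    Stream
      .emits(List("apple", "avocado", "banana", "blueberry", "cherry"))
      .covary[IO]
      .through(fs2utils.groupByUnbounded[IO, String, Char](_.head))
      .map {
        case (initial, words) =>
          // Each substream terminates when the input does, so this fold completes
          words
            .fold(List.empty[String])((acc, w) => w :: acc)
            .map(ws => initial -> ws.reverse)
      }
      .parJoinUnbounded
      .evalMap { case (initial, words) => IO(println(s"$initial: $words")) }
      .compile
      .drain
      .as(ExitCode.Success)
}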
@umbreak

umbreak commented Feb 4, 2021

The method withKey does not seem thread-safe to me. You do a get on a Ref and afterwards, if the ref does not have the desired key in the map, you do a queues.update. I think this method should be wrapped in a cats.effect.concurrent.Semaphore:

semaphore.withPermit {
   // the content of withKey
}

@SystemFw

SystemFw commented Feb 4, 2021

You don't need the full power of a Semaphore, but you do want to swap that get + update with a modify

@Daenyth

Daenyth commented Feb 5, 2021

Thanks!

@chuwy

chuwy commented Feb 17, 2021

To me it looks like modify is not possible here, as the updated Map depends on the effectful Queue.noneTerminated[F, A]:

queuesMap.get(key) match {
  case Some(queue) =>
    (queuesMap, use(queue).as(None))
  case None =>
    val action = for {
      newQ <- Queue.noneTerminated[F, A]
      updated = queuesMap + (key -> newQ)      // We cannot return it
      result <- use(newQ).as((key -> newQ.dequeue).some)
    } yield result

    (???, action)
}

Would be happy to be wrong.

@Daenyth

Daenyth commented Feb 17, 2021

Yeah, it's not immediately obvious how to avoid that - it may involve refactoring. Possibly using something like the keyed ref from davenverse.

I feel like I reasoned out that the race condition was in practice not a problem, but I need to re-examine with fresh eyes. I wrote this quite a while ago.

@SystemFw

SystemFw commented Mar 5, 2021

To me it looks like modify is not possible here, as the updated Map depends on the effectful Queue.noneTerminated[F, A]

You create the Queue in advance (which has negligible overhead), just like you do when you need a Deferred. In any case, I'm adding a similar combinator to the library directly.
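A hedged sketch of that approach (not the gist's code; it assumes the fields of UnboundedKeyedEnqueue and cats.implicits._ are in scope): allocate the queue up front, then let a single Ref.modify decide atomically whether to install it for this key.

// Sketch only: the pre-built queue is cheap even when it ends up discarded
private[this] def withKey(key: K)(
    use: NoneTerminatedQueue[F, A] => F[Unit]
): F[Option[(K, Stream[F, A])]] =
  Queue.noneTerminated[F, A].flatMap { newQ =>
    queues
      .modify { qm =>
        qm.get(key) match {
          case Some(existing) =>
            // Key already live: leave the map alone, emit no new substream
            (qm, (existing, Option.empty[(K, Stream[F, A])]))
          case None =>
            // Install the pre-built queue and emit its dequeue stream
            (qm + (key -> newQ), (newQ, (key -> newQ.dequeue).some))
        }
      }
      .flatMap { case (q, emitted) => use(q).as(emitted) }
  }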

@sideeffffect

@SystemFw Hi, has such a combinator already been added to fs2? Thanks :)

@Masynchin

In any case I'm adding a similar combinator to the library directly

Have you done it? @SystemFw

@SystemFw

No, I started on it at the time, and then got sidetracked into creating what has become Channel. I also felt unsatisfied with how large the possible API surface could be, but I will admit that's probably the perfect being the enemy of the good.
