Skip to content

Instantly share code, notes, and snippets.

@pchiusano
Created December 8, 2015 20:06
Show Gist options
  • Save pchiusano/2f60860447f2fbfa5979 to your computer and use it in GitHub Desktop.
Save pchiusano/2f60860447f2fbfa5979 to your computer and use it in GitHub Desktop.
Design of splitting combinators in FS2
package fs2
import util.Monad
import Step._
import java.util.concurrent.atomic.AtomicLong
object diamond {

  /**
   * Feeds the elements of `s` to both `f` and `g` and merges the two resulting streams
   * with `combine`.
   *
   * Every chunk pulled on the `f` side is first enqueued (wrapped in `Some`) on a queue
   * built from `qs`, and only then emitted to `f`. The `g` side reads that same queue
   * through `toFiniteStream`. An `onComplete` hook on the `f`-side source enqueues `None`,
   * which is what ends the `g`-side stream.
   *
   * WARNING: USE EXTREME CARE WITH THIS FUNCTION. A deadlock is possible whenever `combine`
   * synchronously pulls from the `g` branch before the `f` branch has put anything on the
   * queue.
   *
   * `combine` is also given an `F[Long]` that evaluates to the current size of the
   * `g`-branch's queue. It reads a counter that is incremented right after each enqueue
   * and decremented for each chunk taken off the queue.
   *
   * Whenever possible, prefer one of the safe combinators built on top of this function,
   * such as `[[observe]]`, over calling it directly.
   */
  def diamond[F[_],A,B,C,D](s: Stream[F,A])
    (f: Stream[F,A] => Stream[F,B])
    (qs: QueueStrategy[Chunk[A]], g: Stream[F,A] => Stream[F,C]) // a plain Queue would do in theory, but taking a strategy prevents sharing one queue between diamonds
    (combine: (Stream[F,B], F[Long], Stream[F,C]) => Stream[F,D])(implicit F: Async[F]): Stream[F,D] =
    Stream.suspend {
      val queue = qs.toQueue
      val pending = new AtomicLong(0L)

      // Wraps a by-name value in `F` by mapping over `F.pure(())`, so the value is computed
      // inside the `map` callback rather than at the call site.
      def suspendf[X](thunk: => X) = F.map(F.pure(())) { _ => thunk }

      // `f` branch: for each chunk, enqueue it, bump the counter, then emit it downstream.
      // The order of these three steps is significant and must not be rearranged.
      val fBranch = f(
        s.repeatPull { handle =>
          handle.receive { case chunk #: rest =>
            Pull.eval(queue.enqueue(Some(chunk))) >>
            Pull.suspend { pending.incrementAndGet; Pull.pure(()) } >>
            Pull.output(chunk).as(rest)
          }
        }.onComplete { Stream.eval_(queue.enqueue(None)) }
      )

      // Effect exposing the counter's current value to `combine`.
      val queued: F[Long] = suspendf { pending.get }

      // `g` branch: replay the queued chunks, decrementing the counter as each one is taken.
      val gBranch = g(toFiniteStream(queue).flatMap { chunk =>
        Stream.suspend { pending.decrementAndGet; Stream.chunk(chunk) }
      })

      combine(fBranch, queued, gBranch)
    }

  /** Minimal queue interface; a placeholder for Pavel's real implementation. */
  trait Queue[F[_],A] {
    def dequeue: F[A]
    def enqueue(a: A): F[Unit]
  }

  /**
   * Views a `Queue[F,Option[A]]` as a stream: each dequeued `Some(a)` emits `a` and
   * continues, and the first dequeued `None` marks end-of-stream.
   */
  def toFiniteStream[F[_],A](q: Queue[F,Option[A]]): Stream[F,A] =
    Stream.eval(q.dequeue).flatMap {
      case Some(a) => Stream.emit(a) ++ toFiniteStream(q)
      case None => Stream.empty
    }

  /** Recipe from which `diamond` builds its private `Queue[F,Option[A]]`. */
  trait QueueStrategy[A] {
    // TODO: the constraint needs to become F[_]:AsyncExt
    private[fs2] def toQueue[F[_]:Async]: Queue[F,Option[A]]
  }

  // The strategy constructors below are only meant to be suggestive; none is implemented.

  /** Not implemented. Presumably a queue holding at most `maxSize` elements. */
  def bounded[A](maxSize: Long): QueueStrategy[A] = ???

  /** Not implemented. Presumably a queue with no size limit. */
  def unbounded[A]: QueueStrategy[A] = ???

  /** Drops the oldest element once the number queued exceeds `maxSize`. Not implemented. */
  def evictOldest[A](maxSize: Long): QueueStrategy[A] = ???

  /** Drops the newest element once the number queued exceeds `maxSize`. Not implemented. */
  def evictNewest[A](maxSize: Long): QueueStrategy[A] = ???

  /**
   * Emits the elements of `s` while also passing them through `sink`.
   *
   * Built on `diamond` with `identity` as the `f` branch, `sink` as the `g` branch and a
   * `bounded(1)` queue. On each step the reported queue size is checked: when at least one
   * chunk is queued, a chunk of sink responses is awaited and discarded; otherwise the next
   * chunk is pulled from the element branch and output.
   */
  def observe[F[_]:Async,A](s: Stream[F,A])(sink: Sink[F,A]): Stream[F,A] =
    diamond(s)(identity)(bounded(1), sink) { (elements, queued, sinkResponses) =>
      elements.repeatPull2(sinkResponses) { (elemHandle, sinkHandle) =>
        Pull.eval(queued).flatMap { numberQueued =>
          if (numberQueued >= 1)
            sinkHandle.receive { case _ #: sinkRest => Pull.pure((elemHandle, sinkRest)) }
          else
            elemHandle.receive { case chunk #: elemRest => Pull.output(chunk).as((elemRest, sinkHandle)) }
        }
      }
    }

  /**
   * Variant of `observe` that does not wait on the sink step by step: the sink's output is
   * drained and the two branches are combined with `wye.merge`. The queue between them is
   * created with `bounded(maxQueued)`.
   */
  def observeAsync[F[_]:Async,A](s: Stream[F,A])(sink: Sink[F,A], maxQueued: Long): Stream[F,A] =
    diamond(s)(identity)(bounded(maxQueued), sink.andThen(_.drain)) { (elements, _, sinkResponses) =>
      wye.merge(elements, sinkResponses)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment