Skip to content

Instantly share code, notes, and snippets.

@jilen
Last active December 10, 2018 09:45
Show Gist options
  • Save jilen/215018f92deb0f856b1342529c2f94ca to your computer and use it in GitHub Desktop.
Save jilen/215018f92deb0f856b1342529c2f94ca to your computer and use it in GitHub Desktop.
Fs2 Stream buffer with timeout
package libs
import _root_.fs2._
import cats.effect._
import cats.effect.concurrent._
import cats.effect.syntax.all._
import cats.syntax.all._
import scala.concurrent.duration._
import scala.language.higherKinds
package object fs2 {
private case class ChunkState[F[_], A](
chunk: Chunk[A],
timeout: Ref[F, Boolean],
cancel: CancelToken[F]
)
implicit class StreamSyntax[F[_], A](stream: Stream[F, A]) {
def timedBuffer(
minBufferSize: Int,
timeout: FiniteDuration
)(implicit T: Timer[F], F: ConcurrentEffect[F]) = {
def newState = Ref.of[F, Boolean](false).flatMap { outRef =>
(T.sleep(timeout) *> outRef.set(true)).start.map { fib =>
ChunkState[F, A](Chunk.empty, outRef, fib.cancel)
}
}
def go(state: ChunkState[F, A], s: Stream[F, A]): Pull[F, A, Unit] = {
Pull.eval(state.timeout.get).flatMap {
case true if state.chunk.nonEmpty =>
Pull.output(state.chunk) >> Pull
.eval(newState)
.flatMap(ns => go(ns, s))
case true =>
Pull.eval(newState).flatMap(ns => go(ns, s))
case false if state.chunk.size >= minBufferSize =>
Pull.eval(state.cancel) >> Pull.output(state.chunk) >> Pull
.eval(newState)
.flatMap(ns => go(ns, s))
case false =>
s.pull.uncons.flatMap {
case Some((hd, tl)) if hd.isEmpty =>
go(state, tl)
case Some((hd, tl)) =>
val ns = state.copy(chunk = Chunk.concat(Seq(state.chunk, hd)))
go(ns, tl)
case None =>
if (state.chunk.isEmpty) {
Pull.done
} else {
Pull.output(state.chunk) >> Pull.done
}
}
}
}
Pull.eval(newState).flatMap(i => go(i, stream)).stream
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment