Skip to content

Instantly share code, notes, and snippets.

@BalmungSan
Created November 5, 2020 19:55
Show Gist options
  • Save BalmungSan/6997362cf9ccf394b189ca29847bb329 to your computer and use it in GitHub Desktop.
Save BalmungSan/6997362cf9ccf394b189ca29847bb329 to your computer and use it in GitHub Desktop.
Stops an fs2 Stream if it didn't produced any new elements after some timeout
import cats.syntax.all._
import cats.effetc.{Concurrent, Timer}
import cats.effect.concurrent.Ref
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.duration.FiniteDuration
def stopIfNoNewElementsAfter[F[_] : Concurrent : Timer, A](
stream: Stream[F, A],
duration: FiniteDuration
): Stream[F, A] = {
val stopSignalF = SignallingRef[F, Boolean](false)
val newElementsRefF = Ref[F].of(false)
Stream.eval((stopSignalF, newElementsRefF).tupled).flatMap {
case (stopSignal, newElementsRef) =>
val stopStream =
Stream
.fixedDelay[F](duration)
.evalTap { _ =>
newElementsRef.getAndSet(false).flatMap { thereWasNewElements =>
stopSignal.set(true).unlessA(thereWasNewElements)
}
}
stream
.evalTap(_ => newElementsRef.set(true))
.interruptWhen(stopSignal)
.concurrently(stopStream)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment