@Daenyth
Created August 12, 2022 13:49
resetTimeout fs2
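import cats.effect.Temporal
import fs2.Pull
import scala.concurrent.duration.FiniteDuration

/** Execute `onTimeout` every time the stream goes `timeout` duration with no
  * elements. The timer restarts after every emitted chunk and after every
  * timeout, so `onTimeout` runs repeatedly while the stream stays idle.
  */
def resetTimeout[F[_]: Temporal, A](
    timeout: FiniteDuration,
    onTimeout: F[Unit]
): fs2.Pipe[F, A, A] = {
  def go(timedPull: Pull.Timed[F, A]): Pull[F, A, Unit] =
    // Start a fresh timeout (cancelling any pending one), then wait for
    // whichever arrives first: the next chunk or the timeout.
    timedPull.timeout(timeout) >>
      timedPull.uncons.flatMap {
        case Some((Left(_), rest))      => Pull.eval(onTimeout) >> go(rest)
        case Some((Right(chunk), rest)) => Pull.output(chunk) >> go(rest)
        case None                       => Pull.done
      }
  _.pull.timed(go).stream
}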
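
A possible usage sketch, not part of the original gist: it assumes fs2 3.x and cats-effect 3 are on the classpath, and repeats the pipe so the example compiles on its own. The names `ResetTimeoutDemo` and `program`, and the 100 ms / 350 ms durations, are illustrative choices. The stream emits two elements, stays silent for a while, then emits a third; a `Ref` counts how often `onTimeout` runs during the gap.

```scala
import cats.effect.{IO, IOApp, Ref, Temporal}
import fs2.{Pull, Stream}
import scala.concurrent.duration._

object ResetTimeoutDemo extends IOApp.Simple {

  // Same pipe as in the gist, repeated so this file is self-contained.
  def resetTimeout[F[_]: Temporal, A](
      timeout: FiniteDuration,
      onTimeout: F[Unit]
  ): fs2.Pipe[F, A, A] = {
    def go(timedPull: Pull.Timed[F, A]): Pull[F, A, Unit] =
      timedPull.timeout(timeout) >>
        timedPull.uncons.flatMap {
          case Some((Left(_), rest))      => Pull.eval(onTimeout) >> go(rest)
          case Some((Right(chunk), rest)) => Pull.output(chunk) >> go(rest)
          case None                       => Pull.done
        }
    _.pull.timed(go).stream
  }

  // Hypothetical demo: returns the elements seen downstream together with
  // the number of times onTimeout ran.
  val program: IO[(List[Int], Int)] =
    for {
      timeouts <- Ref.of[IO, Int](0)
      // Two elements, an idle gap longer than the timeout, then one more.
      source = Stream(1, 2) ++ Stream.sleep_[IO](350.millis) ++ Stream(3)
      elements <- source
        .through(resetTimeout[IO, Int](100.millis, timeouts.update(_ + 1)))
        .compile
        .toList
      count <- timeouts.get
    } yield (elements, count)

  val run: IO[Unit] =
    program.flatMap { case (elements, count) =>
      IO.println(s"elements=$elements, timeouts=$count")
    }
}
```

Elements pass through unchanged, so the timeout only adds the side effect. The exact timeout count depends on scheduling, which is why the sketch reports it rather than asserting a fixed number.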