Skip to content

Instantly share code, notes, and snippets.

@mpilquist
Last active May 9, 2022 02:06
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save mpilquist/f1b8de42255362a49922d6df3b6586ce to your computer and use it in GitHub Desktop.
Save mpilquist/f1b8de42255362a49922d6df3b6586ce to your computer and use it in GitHub Desktop.
Pausing & resuming an FS2 stream
/**
 * Like `interruptWhen` / `interrupt`, but allows resumption of the source stream.
 *
 * Delegates to `pause`, feeding it the signal's `discrete` stream as the control
 * stream: a `true` value pauses the source and a `false` value resumes it.
 *
 * @param controlSignal signal whose values toggle pausing (`true`) and resuming (`false`)
 * @return a pipe that passes source elements through while not paused
 */
def pauseWhen[F[_]: Async, A](controlSignal: Signal[F, Boolean]): Pipe[F, A, A] = {
  val toggles = controlSignal.discrete
  pause(toggles)
}
/**
 * Builds a pipe that forwards source chunks while "unpaused" and stops pulling
 * output from the source while "paused", driven by a stream of booleans.
 *
 * The pipe starts unpaused. While unpaused, the next control step and the next
 * source step are raced against each other:
 *  - if the source wins, its chunk is emitted and a new source step is awaited;
 *  - if the control wins, the last value of the control chunk decides the next
 *    state (`true` => paused, `false` => stay unpaused).
 *
 * While paused, only the control stream is consulted; the source step that was
 * already pending is carried along unchanged and raced again after resuming.
 *
 * NOTE(review): presumably the output ends as soon as either the source or the
 * control stream ends (the corresponding step pull would then be done) — confirm
 * against the FS2 version in use.
 *
 * @param control stream of pause flags; `true` pauses, `false` resumes
 * @return a pipe emitting the source's chunks unchanged whenever it is not paused
 */
def pause[F[_]: Async, A](control: Stream[F, Boolean]): Pipe[F, A, A] = {
  // An asynchronously awaited step of a handle that yields chunks of `X`.
  type PendingStep[X] = ScopedFuture[F, Pull[F, Nothing, (NonEmptyChunk[X], Handle[F, X])]]

  // Chooses the next state from a chunk of control flags. Only the final flag in
  // the chunk matters; earlier flags in the same chunk are ignored.
  def onControl(
    flags: NonEmptyChunk[Boolean],
    ctl: Handle[F, Boolean],
    pendingSrc: PendingStep[A]
  ): Pull[F, A, Nothing] = {
    val shouldPause = flags(flags.size - 1)
    if (shouldPause) paused(ctl, pendingSrc)
    else ctl.awaitAsync.flatMap(unpaused(_, pendingSrc))
  }

  // Unpaused state: race the control step against the source step.
  def unpaused(
    pendingCtl: PendingStep[Boolean],
    pendingSrc: PendingStep[A]
  ): Pull[F, A, Nothing] =
    (pendingCtl race pendingSrc).pull.flatMap {
      case Left(ctlStep) =>
        // Control produced first: the pending source step is kept for later use.
        ctlStep.flatMap { case (flags, ctl) => onControl(flags, ctl, pendingSrc) }
      case Right(srcStep) =>
        // Source produced first: emit its chunk, then race the same control step
        // against a freshly awaited source step.
        srcStep.flatMap { case (chunk, nextSrc) =>
          Pull.output(chunk) >> nextSrc.awaitAsync.flatMap(unpaused(pendingCtl, _))
        }
    }

  // Paused state: consult the control stream only, leaving the pending source step untouched.
  def paused(
    ctl: Handle[F, Boolean],
    pendingSrc: PendingStep[A]
  ): Pull[F, A, Nothing] =
    ctl.receive { (flags, nextCtl) => onControl(flags, nextCtl, pendingSrc) }

  // Open both streams, start one async step on each, and begin in the unpaused state.
  source =>
    control.open.flatMap { ctl =>
      source.open.flatMap { srcHandle =>
        ctl.awaitAsync.flatMap { pendingCtl =>
          srcHandle.awaitAsync.flatMap { pendingSrc =>
            unpaused(pendingCtl, pendingSrc)
          }
        }
      }
    }.close
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment