Skip to content

Instantly share code, notes, and snippets.

@mpilquist
Last active May 17, 2021 13:17
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mpilquist/cfce8ebc1e1bcd8cd2278cf908dce36d to your computer and use it in GitHub Desktop.
Save mpilquist/cfce8ebc1e1bcd8cd2278cf908dce36d to your computer and use it in GitHub Desktop.
Properly scheduling effect evaluation in FS2

TL;DR - Use fs2.time.sleep_[Task](delay) ++ Stream.eval(effect) instead of Stream.eval(effect.schedule(delay)).

FS2 never interrupts evaluation of an effect. This can lead to surprising behavior when using the schedule method on Task. Consider this test driver:

def testInterruption[A](effect: Stream[Task, A]): Stream[Task, A] = {
  val logStart = Stream.eval_(Task.delay(println("Started: " + System.currentTimeMillis)))
  val logFinished = Stream.eval_(Task.delay(println("Finished: " + System.currentTimeMillis)))
  val interruptSoonAfterStart =
    Stream.eval(async.signalOf[Task,Boolean](false)).flatMap { cancellationSignal =>
      effect.interruptWhen(cancellationSignal).merge(time.sleep_[Task](1.second) ++ Stream.eval_(cancellationSignal.set(true)))
    }
  logStart ++ interruptSoonAfterStart ++ logFinished
}
› amm
Loading...
Welcome to the Ammonite Repl 0.8.2
(Scala 2.12.1 Java 1.8.0_112)
@ repl.compiler.settings.Ydelambdafy.value = "inline"
@ import $ivy.`co.fs2::fs2-core:0.9.4`, fs2._, fs2.util._, scala.concurrent.duration._
@ implicit val strategy: Strategy = Strategy.fromFixedDaemonPool(4)
@ implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(4)
@ def testInterruption[A](effect: Stream[Task, A]): Stream[Task, A] = ...
@ val t: Task[Unit] = Task.delay(println("Evaluated at " + System.currentTimeMillis))

First, let's baseline the behavior by evaluating an effect with no scheduled delay:

@ testInterruption(Stream.eval(t)).run.unsafeRun
Started: 1489155749951
Evaluated at 1489155749998
Finished: 1489155751012

The task was evaluated just after the stream was started. The stream finished 1 second later due to the scheduled cancellation stream.

Now let's try using Task#schedule to delay evaluation of the effect:

@ testInterruption(Stream.eval(t.schedule(10.seconds))).run.unsafeRun // Unsafe!!
Started: 1489155765869
Finished: 1489155766878

@ Evaluated at 1489155775873

EEK! The println in the effect was evaluated 10 seconds after stream start -- after the stream completed. In reality, the effect was evaluated just after stream startup, which resulted in a 10 second timer starting. When that timer fired, the rest of the effect was evaluated. By design, FS2 will never interrupt the evaluation of an effect.

Here's the proper way of scheduling evaluation:

@ testInterruption(time.sleep_[Task](10.seconds) ++ Stream.eval(t)).run.unsafeRun
Started: 1489155809571
Finished: 1489155810587

@

This has the expected semantics -- the "timer" has been separated from the effect evaluation, which allows FS2 to interrupt the stream after the timer has been set and before t has been evaluated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment