TL;DR - Use fs2.time.sleep_[Task](delay) ++ Stream.eval(effect) instead of Stream.eval(effect.schedule(delay)).
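To make the two forms concrete, here is a minimal sketch that builds both streams side by side. It assumes fs2 0.9.x, where Task, Strategy, and Scheduler live in the fs2 package. The object name DelayedEffect, the pool sizes, the 100 ms delay, and the trivial effect are hypothetical placeholders chosen for illustration, not something taken from a real application.

```scala
import fs2.{Scheduler, Strategy, Stream, Task}
import scala.concurrent.duration._

object DelayedEffect {
  // Assumed setup for fs2 0.9.x: Task#schedule and time.sleep_ both need
  // an implicit Strategy and an implicit Scheduler in scope.
  implicit val strategy: Strategy = Strategy.fromFixedDaemonPool(4)
  implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(2)

  val delay: FiniteDuration = 100.millis  // hypothetical delay
  val effect: Task[Int] = Task.delay(42)  // hypothetical effect

  // Discouraged form: the delay is folded into the Task itself, so the
  // stream sees the wait and the effect as one evaluation.
  val delayedInsideEffect: Stream[Task, Int] =
    Stream.eval(effect.schedule(delay))

  // Recommended form: the delay is its own stream step that emits nothing,
  // and the effect is evaluated only once that step has completed.
  val delayedInStream: Stream[Task, Int] =
    fs2.time.sleep_[Task](delay) ++ Stream.eval(effect)
}
```

Run to completion (for example with runLog.unsafeRun()), both streams should emit the same single value after the delay. The difference only shows up when the stream is interrupted while the delay is still pending.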
FS2 never interrupts evaluation of an effect. This can lead to surprising behavior when using the schedule method on Task. Consider this test driver:
def testInterruption[A](effect: Stream[Task, A]): Stream[Task, A] = {
  val logStart = Stream.eval_(Task.delay(println("Started: " + System.currentTimeMillis)))
  val logFinished = Stream.eval_(Task.delay(println("Finished: " + System.currentTimeMillis)))
  val interruptSoonAfterStart =
    Stream.eval(async.signalOf[Task,Boolean](false)).flatMap { cancellationSignal =>