Skip to content

Instantly share code, notes, and snippets.

@lJoublanc
Created August 9, 2017 15:29
Show Gist options
  • Save lJoublanc/cbd10d947eba2da76b32678edd03ea5d to your computer and use it in GitHub Desktop.
Save lJoublanc/cbd10d947eba2da76b32678edd03ea5d to your computer and use it in GitHub Desktop.
Example of race condition fs2 branch 0.10.0: Signal completes before we can get any values out.
import fs2._
import cats.effect._
import fs2.async.mutable.Signal
import scala.concurrent.ExecutionContext.Implicits.global
implicit val ioInstance = Effect[IO]
val input = Stream.range(1,20).covary[IO] // <-- try changing this to range(1,2), result is empty!
val output : Stream[IO,Signal[IO,Option[Int]]] =
Stream eval async.signalOf[IO,Option[Int]](Some(0)) flatMap { sig =>
Stream(sig) concurrently input.noneTerminate.evalMap(sig.set).drain
}
val apis = output flatMap { o =>
val api1 = o.discrete.unNoneTerminate.fold(Nil : List[Int])((l,i) => i :: l)
val api2 = o.discrete.unNoneTerminate.fold(Nil : List[Int])((l,i) => i :: l)
api1 merge api2
}
apis.runLog.unsafeRunSync()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment