Skip to content

Instantly share code, notes, and snippets.

@hanny24
Created April 16, 2021 08:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hanny24/b6e5789f47b8d38340d0e10b001fdb6b to your computer and use it in GitHub Desktop.
Save hanny24/b6e5789f47b8d38340d0e10b001fdb6b to your computer and use it in GitHub Desktop.
import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Chunk
import scala.concurrent.duration._
import scala.util.Random
/** Small reproduction app: an endless producer is merged with a halt stream via
  * `mergeHaltBoth`, every produced value is fanned out into six elements, and each
  * element is turned into a randomly sleeping task executed through `parJoin(40)`.
  *
  * The console output lets one observe how many of those tasks are in flight at once.
  */
object FS2Test extends IOApp {
  import fs2.Stream

  override def run(args: List[String]): IO[ExitCode] = {
    // Nothing in this object ever completes this Deferred, so the halt stream below
    // only ever waits on it.
    val stopSignal = Deferred.unsafe[IO, Unit]

    // Counter of tasks currently between their "start" and "finish" steps.
    val inFlight = Ref.unsafe[IO, Int](0)

    // Emits 0, 1, 2, ... and logs each value as it is produced; values are wrapped
    // in Some so that `unNoneTerminate` can be applied after the merge.
    val producer = Stream
      .unfoldEval[IO, Int, Int](0) { n =>
        IO(println(s"received ${n}")).as(Some((n, n + 1)))
      }
      .repeat
      .map(Some(_))

    // Emits nothing; finishes only once `stopSignal` is completed.
    val halt = Stream.eval_(stopSignal.get)

    // this works as expected
    // val merged = fs2.Stream(producer, halt).parJoinUnbounded.unNoneTerminate
    val merged = producer.mergeHaltBoth(halt).unNoneTerminate

    // One unit of simulated work: bump the counter, report it, sleep for a random
    // duration below 1500 ms, then decrement the counter again.
    val simulatedTask: IO[Unit] =
      for {
        delay  <- IO(Random.nextInt(1500))
        active <- inFlight.modify { current =>
                    val next = current + 1
                    (next, next)
                  }
        _      <- IO(println(s"currently running ${active}"))
        _      <- IO.sleep(delay.millis)
        _      <- inFlight.update(_ - 1)
      } yield ()

    merged
      .flatMap { x =>
        // Six elements per received value, emitted as a single chunk.
        Stream.chunk(Chunk(s"$x:1", s"$x:2", s"$x:3", s"$x:4", s"$x:5", s"$x:6"))
      }
      .map(_ => Stream.eval(simulatedTask))
      .parJoin(40)
      .compile
      .drain
      .as(ExitCode.Success)
  }
}
// actual output (the running counter never exceeds 6):
// received 0
// currently running 2
// currently running 3
// currently running 1
// currently running 4
// currently running 6
// currently running 5
// received 1
// currently running 1
// currently running 2
// currently running 3
// currently running 4
// currently running 5
// currently running 6
// received 2
// currently running 1
// currently running 2
// currently running 3
// currently running 4
// currently running 5
// currently running 6
// received 3
// ...
// expected output (the running counter climbs up to 40):
// received 0
// received 1
// received 2
// currently running 7
// currently running 5
// currently running 1
// currently running 9
// currently running 10
// currently running 3
// currently running 8
// currently running 4
// currently running 11
// currently running 12
// currently running 6
// currently running 2
// received 3
// currently running 13
// currently running 14
// currently running 15
// currently running 16
// currently running 17
// currently running 18
// received 4
// currently running 19
// currently running 20
// currently running 21
// currently running 22
// currently running 23
// currently running 24
// received 5
// currently running 25
// currently running 26
// currently running 27
// currently running 28
// currently running 29
// currently running 30
// received 6
// currently running 31
// currently running 32
// currently running 33
// currently running 34
// currently running 35
// currently running 36
// received 7
// currently running 37
// currently running 38
// currently running 39
// currently running 40
// currently running 40
// currently running 40
// received 8
// currently running 40
// currently running 40
// currently running 40
// currently running 40
// currently running 40
// received 9
// currently running 40
// currently running 40
// currently running 40
// currently running 40
// currently running 40
// currently running 40
// currently running 40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment