Skip to content

Instantly share code, notes, and snippets.

@kamilkloch
Created September 16, 2021 16:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save kamilkloch/49316b3cbbb08bd052d0ea6285414a07 to your computer and use it in GitHub Desktop.
Save kamilkloch/49316b3cbbb08bd052d0ea6285414a07 to your computer and use it in GitHub Desktop.
import cats.effect.std.{Dispatcher, Queue}
import cats.effect.{IO, IOApp}
import com.typesafe.scalalogging.Logger
import fs2.{Pipe, Stream}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.concurrent.duration.DurationInt
object Subscriptions {
private object lock
private val cnt = new AtomicInteger(0)
private val subscriptions = mutable.Map.empty[Int, String => Unit]
def subscribe(cb: String => Unit, token: Int): Unit = lock.synchronized {
subscriptions.update(token, cb)
}
def unsubscribe(token: Int): Unit = lock.synchronized {
subscriptions.remove(token)
}
def notifyListeners: Unit = {
val listeners = lock.synchronized(subscriptions.values.toList)
listeners.foreach(_.apply(s"from callback: ${cnt.incrementAndGet().toString}"))
}
}
object TestDispatcher extends IOApp.Simple {
private val log = Logger[this.type]
private val requestStream = Stream.iterate(1)(_ + 1).covary[IO].evalTap(_ => IO.sleep(10.millis))
private val tokenFactory = new AtomicInteger(0)
def onInput(dispatcher: Dispatcher[IO], q: Queue[IO, String]): Unit = {
val token = tokenFactory.incrementAndGet()
def cb(s: String): Unit = {
dispatcher.unsafeRunSync(q.offer(s).delayBy(1.milli))
Subscriptions.unsubscribe(token)
}
Subscriptions.subscribe(cb, token)
}
val pipe: Pipe[IO, Int, String] = { input =>
Stream.eval(Queue.unbounded[IO, String]).flatMap { output =>
Stream.resource(Dispatcher[IO].onFinalize(IO(log.info("Releasing Dispatcher")))).flatMap { dispatcher =>
val processInput = input
.map(_ => onInput(dispatcher, output))
.drain
Stream.fromQueueUnterminated(output).concurrently(processInput)
}
}
}
def run: IO[Unit] = {
val consumer = requestStream
.through(pipe)
.evalTap(x => IO(log.info(s"consumer: $x")))
.timeout(200.millis)
.compile
.drain
val producer = IO.interruptible(true) {
while (true) {
Thread.sleep(10)
Subscriptions.notifyListeners
}
}.guarantee(IO(log.info("Canceling producer")))
IO.race(consumer, producer).void
}
}
@kamilkloch
Copy link
Author

Example output with RTE:

[2021-09-16 18:38:08,919] INFO  [io-compute-6] TestDispatcher:66 - consumer: from callback: 1
[2021-09-16 18:38:08,924] INFO  [io-compute-6] TestDispatcher:66 - consumer: from callback: 2
[2021-09-16 18:38:08,924] INFO  [io-compute-6] TestDispatcher:66 - consumer: from callback: 3
[2021-09-16 18:38:08,926] INFO  [io-compute-6] TestDispatcher:66 - consumer: from callback: 4
[2021-09-16 18:38:08,927] INFO  [io-compute-6] TestDispatcher:66 - consumer: from callback: 5
[2021-09-16 18:38:08,929] INFO  [io-compute-2] TestDispatcher:66 - consumer: from callback: 6
[2021-09-16 18:38:08,941] INFO  [io-compute-0] TestDispatcher:66 - consumer: from callback: 7
[2021-09-16 18:38:08,953] INFO  [io-compute-1] TestDispatcher:66 - consumer: from callback: 8
[2021-09-16 18:38:08,964] INFO  [io-compute-4] TestDispatcher:66 - consumer: from callback: 9
[2021-09-16 18:38:08,999] INFO  [io-compute-5] TestDispatcher:53 - Releasing Dispatcher
[2021-09-16 18:38:09,014] INFO  [io-compute-6] TestDispatcher:76 - Canceling producer
Exception in thread "main" java.lang.IllegalStateException: dispatcher already shutdown
	at interruptible @ TestDispatcher$.run(TestDispatcher.scala:72)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment