// GitHub Gist by @mvillafuertem, forked from kamilkloch/TestDispatcher.scala.
// Created June 3, 2023 10:26.
// Gist: mvillafuertem/ebb5f1a2aae5990a71cb1226a3eb8d08
import cats.effect.std.{Dispatcher, Queue}
import cats.effect.{IO, IOApp}
import com.typesafe.scalalogging.Logger
import fs2.{Pipe, Stream}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.concurrent.duration.DurationInt
/**
 * In-memory registry of `String => Unit` callbacks, each stored under an integer token.
 *
 * All reads and writes of the underlying mutable map happen while holding a private monitor.
 */
object Subscriptions {

  /** Monitor object used to guard every access to `registry`. */
  private object lock

  /** Running counter embedded in each message handed to a listener. */
  private val sequence = new AtomicInteger(0)

  /** Currently registered callbacks, keyed by the token supplied at subscription time. */
  private val registry = mutable.Map.empty[Int, String => Unit]

  /**
   * Registers `cb` under `token`, replacing any callback already stored for that token.
   *
   * @param cb    callback to invoke on notification
   * @param token key identifying the registration
   */
  def subscribe(cb: String => Unit, token: Int): Unit =
    lock.synchronized {
      registry(token) = cb
    }

  /**
   * Removes the callback stored under `token`; a token with no registration is ignored.
   *
   * @param token key of the registration to drop
   */
  def unsubscribe(token: Int): Unit =
    lock.synchronized {
      registry -= token
      ()
    }

  /**
   * Invokes every registered callback once with a message of the form `from callback: <n>`.
   *
   * The set of callbacks is copied while holding the monitor and the callbacks themselves are
   * invoked after it has been released. The counter is advanced once per invoked callback, so
   * each callback receives a distinct number.
   */
  def notifyListeners: Unit = {
    val snapshot = lock.synchronized {
      registry.values.toList
    }
    for (listener <- snapshot) {
      val n = sequence.incrementAndGet()
      listener(s"from callback: $n")
    }
  }
}
/**
 * Demo application that bridges the callback-based [[Subscriptions]] registry into an fs2
 * stream by way of a cats-effect `Dispatcher`.
 *
 * A consumer stream registers one one-shot callback per request element; a producer loop on a
 * blocking thread periodically fires all registered callbacks. Each callback pushes its message
 * onto a queue that feeds the consumer's output.
 */
object TestDispatcher extends IOApp.Simple {
  private val log = Logger[this.type]

  /** Infinite stream 1, 2, 3, ... with a 10 ms sleep evaluated for each element. */
  private val requestStream =
    Stream.iterate(1)(_ + 1).covary[IO].evalTap(_ => IO.sleep(10.millis))

  /** Source of unique subscription tokens. */
  private val tokenFactory = new AtomicInteger(0)

  /**
   * Registers a one-shot callback with [[Subscriptions]] that forwards the notified string to `q`.
   *
   * The callback blocks the notifying thread until the (1 ms delayed) offer has completed, and
   * then removes its own registration.
   *
   * @param dispatcher used to run the `IO` offer synchronously from the callback thread
   * @param q          queue receiving the notified message
   */
  def onInput(dispatcher: Dispatcher[IO], q: Queue[IO, String]): Unit = {
    val token = tokenFactory.incrementAndGet()
    def cb(s: String): Unit =
      // Always unsubscribe, even when unsafeRunSync throws (e.g. the dispatcher was already
      // released); otherwise the failing listener would stay registered and be re-invoked on
      // every subsequent notification.
      try dispatcher.unsafeRunSync(q.offer(s).delayBy(1.milli))
      finally Subscriptions.unsubscribe(token)
    Subscriptions.subscribe(cb, token)
  }

  /**
   * Pipe that, for every input element, registers a one-shot subscription and emits whatever
   * the subscriptions deliver.
   *
   * A fresh unbounded queue and a `Dispatcher` are allocated per use of the pipe; the message
   * "Releasing Dispatcher" is logged by the finalizer attached to the dispatcher resource.
   */
  val pipe: Pipe[IO, Int, String] = { input =>
    Stream.eval(Queue.unbounded[IO, String]).flatMap { output =>
      Stream
        .resource(Dispatcher[IO].onFinalize(IO(log.info("Releasing Dispatcher"))))
        .flatMap { dispatcher =>
          // onInput is side-effecting, so it is suspended in IO rather than run inside a pure map.
          val processInput = input
            .evalMap(_ => IO(onInput(dispatcher, output)))
            .drain
          Stream.fromQueueUnterminated(output).concurrently(processInput)
        }
    }
  }

  /**
   * Races the consumer stream against the notification producer.
   *
   * The consumer logs each received message and is bounded by a 200 ms `timeout`.
   * NOTE(review): per the fs2 documentation `timeout` fails the stream with a
   * `TimeoutException`, so this program presumably ends with that error — confirm that this is
   * the intended outcome of the demo.
   */
  def run: IO[Unit] = {
    val consumer = requestStream
      .through(pipe)
      .evalTap(x => IO(log.info(s"consumer: $x")))
      .timeout(200.millis)
      .compile
      .drain

    // Blocking loop firing all registered callbacks roughly every 10 ms; wrapped in
    // IO.interruptible so that cancelation can interrupt the sleeping thread.
    val producer = IO.interruptible(true) {
      while (true) {
        Thread.sleep(10)
        Subscriptions.notifyListeners
      }
    }.guarantee(IO(log.info("Canceling producer")))

    IO.race(consumer, producer).void
  }
}
// (GitHub page footer: sign up or sign in on GitHub to comment on this gist.)