Skip to content

Instantly share code, notes, and snippets.

@kamilkloch
Created July 20, 2023 12:10
Show Gist options
  • Save kamilkloch/1e39010624f3e5ed4c5d38699da9c753 to your computer and use it in GitHub Desktop.
Save kamilkloch/1e39010624f3e5ed4c5d38699da9c753 to your computer and use it in GitHub Desktop.
import cats.effect.std.Supervisor
import cats.effect.{IO, IOApp}
import fs2.Stream
import fs2.concurrent.Topic
import cats.syntax.all._
import scala.concurrent.duration.DurationInt
object Topics extends IOApp.Simple {
/** Feeds the provided topic with a timestamp every 500ms */
def tsService(topic: Topic[IO, Long]): IO[Nothing] = {
IO.realTime.flatMap { ts =>
val ts1Millis = ts.toMillis
topic.publish1(ts1Millis).map(_.fold(_ => throw new Exception("Topic closed"), _ => ts1Millis))
}.flatMap { ts1Millis =>
IO.realTime.map(ts2 =>
if (math.abs(ts2.toMillis - ts1Millis) > 1) println(s"Delay: ${math.abs(ts2.toMillis - ts1Millis)}ms")
)
}.delayBy(500.millis).foreverM
}
val n = 10_000
val consumerQueueSize = 1024
def responseStreamFromTopic(topic: Topic[IO, Long]): Stream[IO, Long] = topic.subscribe(consumerQueueSize)
def run: IO[Unit] = {
Supervisor[IO].use { sup =>
for {
tsTopic <- Topic[IO, Long]
_ <- tsService(tsTopic).supervise(sup)
_ <- List.fill(n)(0).parTraverse_(_ => responseStreamFromTopic(tsTopic).compile.drain)
} yield ()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment