Skip to content

Instantly share code, notes, and snippets.

@pchlupacek
Created February 3, 2019 15:15
Show Gist options
  • Save pchlupacek/f033993302ee2a741a6473286306c9b3 to your computer and use it in GitHub Desktop.
Save pchlupacek/f033993302ee2a741a6473286306c9b3 to your computer and use it in GitHub Desktop.
package fs2.benchmark
import java.util.concurrent.TimeUnit
import cats.implicits._
import cats.effect.{Concurrent, ContextShift, IO}
import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State}
import fs2._
import fs2.concurrent.{SignallingRef, Topic}
/**
  * JMH benchmark that pushes a fixed run of integers through an fs2 `Topic`
  * while `N` subscribers consume it concurrently.
  */
@State(Scope.Thread)
class TopicBenchmark {

  /** Implicit `ContextShift[IO]` backed by the global execution context. */
  implicit val cs: ContextShift[IO] =
    IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)

  /** Number of integers the publisher sends through the topic per invocation. */
  val COUNT: Int = 500

  /**
    * Runs one broadcast round and blocks until it has finished.
    *
    * A topic (initial value `-1`) and two counters are created: one starting
    * at `0` that subscribers increment when they begin, and one starting at
    * `N` that subscribers decrement when they are done. `N` subscriber tasks
    * are started as fibers; the publisher then waits for the first counter to
    * reach `N`, publishes `Stream.range(0, COUNT)` and finally waits for the
    * second counter to reach `0`.
    *
    * NOTE(review): each subscriber bumps the start counter before its
    * subscription stream is run, so the publisher may presumably begin
    * before every subscriber is attached - confirm this is acceptable for
    * the measurement.
    *
    * NOTE(review): `@GenerateN` is defined outside this file; presumably it
    * supplies the listed values for `N` - confirm.
    *
    * @param N number of concurrent subscribers
    * @return always `0`
    */
  @GenerateN(1, 4, 8, 16, 32, 64, 128)
  @BenchmarkMode(Array(Mode.AverageTime))
  @OutputTimeUnit(TimeUnit.MILLISECONDS)
  @Benchmark
  def topicBroadcast(N: Int): Int = {

    // Lifecycle of a single subscriber: announce the start, consume the topic
    // until the first element `e` with `e + 1 == COUNT` shows up, then report
    // completion.
    def subscriberTask(
        topic: Topic[IO, Int],
        started: SignallingRef[IO, Int],
        pending: SignallingRef[IO, Int]
    ): IO[Unit] =
      for {
        _ <- started.update(_ + 1)
        _ <- topic.subscribe(Int.MaxValue).filter(_ + 1 == COUNT).take(1).compile.drain
        _ <- pending.update(_ - 1)
      } yield ()

    // Starts one fiber per subscriber; the fiber handles are discarded.
    def spawnSubscribers(
        topic: Topic[IO, Int],
        started: SignallingRef[IO, Int],
        pending: SignallingRef[IO, Int]
    ): Stream[IO, Unit] =
      Stream
        .range(0, N)
        .covary[IO]
        .evalMap(_ => Concurrent[IO].start(subscriberTask(topic, started, pending)).void)

    // Three phases run back to back, none of which emits any element.
    def publishAll(
        topic: Topic[IO, Int],
        started: SignallingRef[IO, Int],
        pending: SignallingRef[IO, Int]
    ) = {
      // Phase 1: hold until the start counter equals N.
      val awaitSubscribers = started.discrete.filter(_ == N).take(1).drain
      // Phase 2: publish the whole range into the topic.
      val emitValues = Stream.range(0, COUNT).through(topic.publish).drain
      // Phase 3: hold until the pending counter has dropped to zero.
      val awaitCompletion = pending.discrete.filter(_ == 0).take(1).drain
      awaitSubscribers ++ emitValues ++ awaitCompletion
    }

    val scenario =
      for {
        topic   <- Stream.eval(Topic[IO, Int](-1))
        started <- Stream.eval(SignallingRef[IO, Int](0))
        pending <- Stream.eval(SignallingRef[IO, Int](N))
        done    <- spawnSubscribers(topic, started, pending) ++ publishAll(topic, started, pending)
      } yield done

    scenario.compile.drain.unsafeRunSync()

    // The result value carries no information; the benchmark only measures time.
    0
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment