Skip to content

Instantly share code, notes, and snippets.

@sholokhov
Created January 12, 2023 16:44
Show Gist options
  • Save sholokhov/d4b5a2da866c5a2117b5a335896a22a1 to your computer and use it in GitHub Desktop.
Save sholokhov/d4b5a2da866c5a2117b5a335896a22a1 to your computer and use it in GitHub Desktop.
fs2-kafka performance benchmark
package pro.sholokhov
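// deps (sbt)
// NOTE: cats-effect-kernel and cats-effect-std are already transitive dependencies of
// cats-effect; the older 3.1.1 pins below are redundant and conflict with 3.3.13
// (sbt's default conflict resolution picks the newer version anyway).
//"org.typelevel" %% "cats-effect" % "3.3.13"
//"org.typelevel" %% "cats-effect-kernel" % "3.1.1"
//"org.typelevel" %% "cats-effect-std" % "3.1.1"
//"com.github.fd4s" %% "fs2-kafka" % "3.0.0-M9"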
import cats.effect.{Clock, IO, IOApp}
import fs2.Stream
import fs2.kafka._
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration.DurationInt
import scala.util.Random
/** fs2-kafka producer throughput benchmark.
  *
  * Publishes `totalRecords` records of `sizePerRecordBytes` bytes each to `topic`,
  * repeats that `repeat` times in sequence, and prints elapsed time and throughput
  * for every run.
  */
object Main extends IOApp.Simple {

  def run: IO[Unit] = {
    val totalRecords = 1_000_000
    val sizePerRecordBytes = 500
    val repeat = 3
    // partitions = 5, replication-factor = 1, retention.ms = 86400000, min.insync.replicas = 1
    val topic = "perf-test-topic"

    val producerSettings: ProducerSettings[IO, String, String] =
      ProducerSettings[IO, String, String]
        .withBootstrapServers("localhost:9091")
        .withLinger(2.seconds)

    // One random alphanumeric payload is generated once and reused for every record;
    // only the key (a random value in 0..999) varies per record.
    val stream: Stream[IO, ProducerRecord[String, String]] = Stream
      .emit(Random.alphanumeric.take(sizePerRecordBytes).mkString)
      .repeatN(totalRecords)
      .map(body => ProducerRecord(topic, key = Random.nextInt(1000).toString, body))
      .covary[IO]

    // A single benchmark run: group records into chunks of up to 1024 and publish them.
    val process: IO[Unit] = stream
      .chunkMin(1024, allowFewerTotal = true)
      .map(rs => ProducerRecords(rs.toList))
      .through(KafkaProducer.pipe(producerSettings))
      .compile
      .drain

    // Sequence the runs inside IO rather than calling unsafeRunSync() while building the
    // program: the runs now execute when IOApp evaluates `run`, and failures and
    // cancellation propagate through the IO runtime instead of escaping it.
    withLoggedTimingF(
      process,
      totalRecords,
      sizePerRecordBytes
    ).replicateA_(repeat)
  }

  /** Runs `process`, then prints its elapsed time and throughput.
    *
    * @param process         the effect to time
    * @param totalRecords    number of records `process` handles (used only for the rates)
    * @param recordSizeBytes payload size of each record in bytes (used only for the rates)
    * @return the result of `process`
    */
  private def withLoggedTimingF[A](
      process: IO[A],
      totalRecords: Long,
      recordSizeBytes: Long
  ): IO[A] =
    for {
      // Monotonic clock: elapsed-time measurement unaffected by wall-clock adjustments.
      start <- Clock[IO].monotonic
      result <- process
      stop <- Clock[IO].monotonic
      // Subtract the durations before converting, so the result does not depend on
      // which TimeUnit the clock reports in.
      elapsedMillis = (stop - start).toMillis
      _ <- IO.delay(println(formatStats(totalRecords, recordSizeBytes, elapsedMillis)))
    } yield result

  /** Builds the one-line summary for a single run.
    *
    * Rates are computed in floating point from milliseconds. A run shorter than one
    * second therefore no longer divides by zero, and longer runs are not truncated to
    * whole seconds. Elapsed time is clamped to at least 1 ms to keep the rates finite.
    * Throughput is reported in MiB/s (bytes / 1024 / 1024).
    */
  private def formatStats(totalRecords: Long, recordSizeBytes: Long, elapsedMillis: Long): String = {
    val seconds = math.max(elapsedMillis, 1L) / 1000.0
    val recsPerSec = totalRecords / seconds
    val mibPerSec = totalRecords.toDouble * recordSizeBytes / (1024.0 * 1024.0) / seconds
    f"complete ($totalRecords records), stats: $elapsedMillis millis ($recsPerSec%.1f recs/sec, $mibPerSec%.2f MiB/sec)"
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment