
@lukestephenson
Last active July 18, 2023 16:24
zio-streams performance observations

Background

I'm a long-time user of reactive streams in Scala, first with akka-streams and then with Monix Observable.

I found the transition from akka streams to Monix Observable pretty straightforward. I was motivated by wanting to use a lazy effect system rather than working with scala.concurrent.Future.

More recently I've been considering fs2 and ZIO Streams as alternatives to Monix Observable. This is largely motivated by the fact that Monix doesn't have much ongoing development and is stuck on cats-effect 2.

I've not found the transition from Monix Observable to ZIO Streams as easy as my earlier transition (from akka to Monix). While the ergonomics of the APIs are similar, the performance characteristics differ hugely.

Chunking

Both fs2 and zio-streams document that, for efficient usage, the underlying stream needs to be chunked. In my experience this is a huge difference from Monix Observable and Akka Streams, both of which perform extremely well without chunking.
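To make "the underlying stream is chunked" concrete: a ZStream moves its elements in `Chunk`s, and the `.chunks` operator exposes that grouping. The sketch below assumes ZIO 2.x with zio-streams on the classpath; the object and value names are illustrative only.

```scala
import zio._
import zio.stream._

// Sketch (assumes ZIO 2.x): making the chunk structure under a ZStream visible.
object ChunkStructure extends ZIOAppDefault {

  // Ten elements that start life in a single Chunk.
  val stream: ZStream[Any, Nothing, Int] =
    ZStream.fromChunk(Chunk.fromIterable(1 to 10))

  // `.chunks` turns ZStream[R, E, A] into ZStream[R, E, Chunk[A]],
  // so mapping to `.size` shows how the elements are grouped in flight.
  val originalSizes: UIO[Chunk[Int]] =
    stream.chunks.map(_.size).runCollect

  // `rechunk(4)` regroups the same elements into chunks of at most 4.
  val rechunkedSizes: UIO[Chunk[Int]] =
    stream.rechunk(4).chunks.map(_.size).runCollect

  def run =
    for {
      before <- originalSizes
      after  <- rechunkedSizes
      _      <- Console.printLine(s"chunk sizes before: $before, after rechunk(4): $after")
    } yield ()
}
```

Operators that work on whole chunks amortise their per-step cost across every element in the chunk, which is why chunk size matters so much for throughput.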

Example

Most of the work I do centers on using a reactive stream to process a stream of events (from Kafka or a database). For the purposes of this post, though, I've just built a simple synthetic stream with ZIO.

  import zio._
  import zio.stream._

  val source = ZStream
    .range(0, 10000)
    .groupedWithin(5000, 5.milliseconds)
    .mapZIO { messages =>
      ZIO.succeed(messages.sum)
    }
    .runSum
    .timed
    .map { case (duration, result) => println(s"took ${duration.toMillis}ms to calculate $result") }

All this does is use some ZStream operations to sum the elements. It is contrived, and there are simpler solutions.
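For comparison, one of those simpler solutions is to sum the stream directly. This sketch assumes ZIO 2.x; `SimplerSum`, `viaGrouping` and `direct` are hypothetical names, and the timing/printing from the example above is dropped so the two results can be compared.

```scala
import zio._
import zio.stream._

// Sketch (assumes ZIO 2.x): the grouped pipeline vs. a direct sum.
object SimplerSum extends ZIOAppDefault {

  // The pipeline from the example, without `.timed` and the println.
  val viaGrouping: UIO[Int] =
    ZStream
      .range(0, 10000)
      .groupedWithin(5000, 5.milliseconds)
      .mapZIO(messages => ZIO.succeed(messages.sum))
      .runSum

  // A simpler equivalent: sum the elements directly.
  val direct: UIO[Int] =
    ZStream.range(0, 10000).runSum

  def run =
    viaGrouping.zip(direct).flatMap { case (grouped, simple) =>
      Console.printLine(s"grouped: $grouped, direct: $simple")
    }
}
```

Both compute the sum of 0 until 10000, which is 9999 × 10000 / 2 = 49,995,000.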

Now let's say I have a new requirement to record a metric for each element that arrives on the stream. I look at the ZStream API and see that it offers a tap method to perform a side effect for each element of the stream.

So I change the solution above to:

  // For simplicity, let's just assume our metrics implementation records to this var
  var counter = 0

  val source = ZStream
    .range(0, 10000)
    .tap(message => ZIO.succeed {
      counter = counter + 1
    })
    .groupedWithin(5000, 5.milliseconds)
    // as before
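
The `var` keeps the example short. In ZIO code the usual place for shared mutable state is a `Ref`, which is safe to update from any fiber. The sketch below (assuming ZIO 2.x; `TapWithRef` and `program` are hypothetical names) keeps the same per-element `tap`, so it does not change the performance behaviour discussed here; it still runs one effect per element.

```scala
import zio._
import zio.stream._

// Sketch (assumes ZIO 2.x): the per-element tap, recording into a Ref instead of a var.
object TapWithRef extends ZIOAppDefault {

  // Returns (number of tap invocations, sum of the stream).
  val program: UIO[(Int, Int)] =
    for {
      counter <- Ref.make(0)
      sum <- ZStream
               .range(0, 10000)
               // One effect per element: 10,000 invocations in total.
               .tap(_ => counter.update(_ + 1))
               .groupedWithin(5000, 5.milliseconds)
               .mapZIO(messages => ZIO.succeed(messages.sum))
               .runSum
      count <- counter.get
    } yield (count, sum)

  def run =
    program.flatMap { case (count, sum) =>
      Console.printLine(s"tap ran $count times, sum = $sum")
    }
}
```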

This works, but in doing so I've added about 700ms of processing time. The timing is not scientific; it's just how long the run took on my MacBook Pro.
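Because that figure comes from a single run, a slightly more careful (though still unscientific) check is to time the same program several times, since early runs on the JVM usually include JIT warm-up. A sketch assuming ZIO 2.x; `RepeatedTiming`, `withTap` and `timeRuns` are hypothetical names, and the counter is held in a `Ref` rather than the original `var`. For real benchmarking, a dedicated harness such as JMH is the usual choice.

```scala
import zio._
import zio.stream._

// Sketch (assumes ZIO 2.x): time the tap-based pipeline over several runs.
object RepeatedTiming extends ZIOAppDefault {

  // The tap-based pipeline, with the metric held in a Ref.
  val withTap: UIO[Int] =
    Ref.make(0).flatMap { counter =>
      ZStream
        .range(0, 10000)
        .tap(_ => counter.update(_ + 1))
        .groupedWithin(5000, 5.milliseconds)
        .mapZIO(messages => ZIO.succeed(messages.sum))
        .runSum
    }

  // Runs `program` `runs` times in sequence and returns each run's duration.
  def timeRuns[A](program: UIO[A], runs: Int): UIO[List[Duration]] =
    ZIO.foreach(List.range(0, runs))(_ => program.timed.map(_._1))

  def run =
    for {
      durations <- timeRuns(withTap, runs = 5)
      _         <- Console.printLine(durations.map(_.toMillis).mkString("run times (ms): ", ", ", ""))
    } yield ()
}
```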

700ms is a huge amount of overhead. I currently have Kafka pipelines built on akka streams / Monix Observable handling 20,000 messages per second. This example hasn't even introduced network IO yet, and it already can't keep up.
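As a back-of-envelope check, assuming the roughly 700ms of added overhead is spread evenly over the 10,000 elements and everything else in the pipeline is free:

```latex
\frac{700\,\text{ms}}{10\,000\ \text{elements}} = 70\,\mu\text{s per element}
\qquad
\frac{1\,\text{s}}{70\,\mu\text{s}} \approx 14\,286\ \text{elements/s} < 20\,000\ \text{messages/s}
```

Under that assumption, the per-element overhead alone caps throughput below the 20,000 messages per second the existing pipelines handle.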

Chunking alternative

Now, instead of using tap, I can use mapChunksZIO.

That is:

  val source = ZStream
    .range(0, 10000)
    // replaced tap here with mapChunksZIO
    .mapChunksZIO { chunk =>
      ZIO.succeed {
        counter = counter + chunk.size
        chunk
      }
    }
    .groupedWithin(5000, 5.milliseconds)

If I do this, recording the "metrics" has no noticeable impact on performance. Note that ZStream.range creates a stream of 4096-element chunks, so for 10,000 elements mapChunksZIO is called only 3 times (4096 + 4096 + 1808).
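One way to check how often the function passed to mapChunksZIO actually runs is to record the size of every chunk it receives. A sketch assuming ZIO 2.x; `ChunkCount` and `observedChunkSizes` are hypothetical names. The number of chunks depends on ZStream.range's default chunk size in the ZIO version used, so the sketch reports it rather than assuming it.

```scala
import zio._
import zio.stream._

// Sketch (assumes ZIO 2.x): record the size of each chunk seen by mapChunksZIO.
object ChunkCount extends ZIOAppDefault {

  // Returns the size of every chunk that reached mapChunksZIO, in order.
  val observedChunkSizes: UIO[Chunk[Int]] =
    for {
      sizes <- Ref.make(Chunk.empty[Int])
      _ <- ZStream
             .range(0, 10000)
             .mapChunksZIO { chunk =>
               // Called once per upstream chunk, not once per element.
               sizes.update(_ :+ chunk.size).as(chunk)
             }
             .runDrain
      result <- sizes.get
    } yield result

  def run =
    observedChunkSizes.flatMap { s =>
      Console.printLine(s"mapChunksZIO called ${s.size} times with chunk sizes $s")
    }
}
```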

Also note that in the previous implementation, the stream coming out of tap is backed by chunks of size 1, so every subsequent operation also pays the per-element overhead and is slow too.
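The chunk sizes coming out of tap can be inspected directly with `.chunks`, and ZStream's `rechunk` can regroup them for downstream operators. This is a sketch assuming ZIO 2.x; `ChunksAfterTap` and its values are hypothetical names. The chunk sizes after tap may differ between ZIO versions, and rechunk only helps the operators after it; the per-element tap effect still runs 10,000 times.

```scala
import zio._
import zio.stream._

// Sketch (assumes ZIO 2.x): inspect chunk sizes after `tap`, then regroup them.
object ChunksAfterTap extends ZIOAppDefault {

  val tapped: ZStream[Any, Nothing, Int] =
    ZStream.range(0, 10000).tap(_ => ZIO.unit)

  // Chunk sizes emitted by `tap` (observed as 1 in the post; may vary by version).
  val sizesAfterTap: UIO[Chunk[Int]] =
    tapped.chunks.map(_.size).runCollect

  // `rechunk` regroups elements for downstream operators; it does not make `tap` cheaper.
  val sizesAfterRechunk: UIO[Chunk[Int]] =
    tapped.rechunk(4096).chunks.map(_.size).runCollect

  def run =
    for {
      raw       <- sizesAfterTap
      regrouped <- sizesAfterRechunk
      _ <- Console.printLine(s"after tap: ${raw.size} chunks; after rechunk(4096): ${regrouped.size} chunks")
    } yield ()
}
```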

Thoughts / summary

I question whether the ZStream API is fit for purpose when a large portion of it can severely degrade performance if used. Should all of the operations that can degrade performance be named accordingly? Should tap be renamed to slowTap? Or should all the slow methods be moved into an extension, ZStreamSlowAPI, so that it is clear when you are using a method that may impact performance?
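Approaching the naming question from the other side, the chunk-level workaround can be packaged under a name that states its granularity. The sketch below is hypothetical: `tapEachChunk`, `ChunkTapSyntax` and `ChunkTapExample` are not part of zio-streams. It assumes ZIO 2.x and is simply the mapChunksZIO pattern from above, wrapped as an extension method.

```scala
import zio._
import zio.stream._

// Hypothetical syntax: a tap variant whose name says it runs once per chunk.
object ChunkTapSyntax {
  implicit final class ChunkTapOps[R, E, A](private val self: ZStream[R, E, A]) extends AnyVal {

    // Runs `f` once per chunk (not per element) and passes the chunk through unchanged.
    def tapEachChunk[R1 <: R, E1 >: E](f: Chunk[A] => ZIO[R1, E1, Any]): ZStream[R1, E1, A] =
      self.mapChunksZIO(chunk => f(chunk).as(chunk))
  }
}

object ChunkTapExample extends ZIOAppDefault {
  import ChunkTapSyntax._

  // Counts elements by adding each chunk's size, as in the mapChunksZIO example.
  val countRecorded: UIO[Int] =
    for {
      counter <- Ref.make(0)
      _       <- ZStream.range(0, 10000).tapEachChunk(chunk => counter.update(_ + chunk.size)).runDrain
      n       <- counter.get
    } yield n

  def run = countRecorded.flatMap(n => Console.printLine(s"recorded $n elements"))
}
```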

With Monix Observable / Akka Streams, if an API is exposed, I assume I can use it safely without degrading performance. The same assumption doesn't hold for fs2 and ZIO Streams. Given all the hype around ZIO performance, I came in with high expectations. So far I still prefer working with Monix Observable.
