@lukestephenson
Last active January 29, 2024 08:12
Comparing Scala streaming library performance

Diving into Monix / Akka performance compared to fs2 and ZIO-streams

Disclaimer: Firstly, I'm not an expert in any of these libraries. I'm writing this from a position of sharing what I have learnt so far, but also hoping to be criticised and learn a lot more in the process.

Both fs2 and zio-streams document the need to make use of "Chunking". Here is what the zio-streams docs say:

Every time we are working with streams, we are always working with chunks. There are no streams with individual elements, these streams have always chunks in their underlying implementation. So every time we evaluate a stream, when we pull an element out of a stream, we are actually pulling out a chunk of elements.

So why streams are designed in this way? This is because of the efficiency and performance issues. Every I/O operation in the programming world works with batches. We never work with a single element.

While it is true that IO operations work with batches, from a programmer's perspective it is sometimes easier to reason about elements individually. I'll go into an example shortly. Sure, when reading an HTTP request I don't want a stream of individual bytes, but when processing records from a Kafka topic I feel it is perfectly reasonable to want to consider records in isolation.

Also, as I've documented before, both fs2 and zio-streams expose operations on the stream which break chunking and can massively degrade performance.
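To make that concrete, here is a minimal sketch (assuming zio-streams 2.x; the names and numbers are illustrative only) contrasting a per-element effectful operator, which loses chunking, with its chunk-preserving counterpart:

import zio._
import zio.stream._

val nums: ZStream[Any, Nothing, Int] = ZStream.range(0, 1_000_000)

// Per-element: the effect (and the channel machinery behind it) runs
// once for every single element.
val perElement: ZStream[Any, Nothing, Int] =
  nums.mapZIO(i => ZIO.succeed(i + 1))

// Chunk-preserving: the effect runs once per chunk, so the per-element
// overhead is amortised across the whole chunk.
val perChunk: ZStream[Any, Nothing, Int] =
  nums.mapChunksZIO(chunk => ZIO.succeed(chunk.map(_ + 1)))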

I loved Adam Fraser's excellent talk The Next Generation of Streaming and was really hopeful it would address some of my performance concerns, but unfortunately I didn't observe any improvements around small chunk sizes with synchronous transformations. (I'm working with Kafka, where a lot of calls to produce don't result in network calls, just an append to an internal buffer, so there isn't the overhead of lots of network calls to hide away the CPU overheads of the streaming implementation.)

Example application

For our example application, let's take a Kafka consumer. The consumer reads messages; we can imagine the Kafka topic looking like:

[DB('hello'),ForwardToKafka('world'),DB('blah'),ForwardToKafka('yeah'),.....]

Depending on the inbound message, we need to make a processing decision. If we were not concerned with chunking, we could do something approximately like this (not real code, just conveying an idea):

myKafkaStream.mapZIOPar(100) { consumerRecord => 
  val processingLogic: Task[UpdateResult] = consumerRecord.value match {
    case DB(value) => writeToDb(value)
    case ForwardToKafka(value) => zioKafkaProducer.produce(value)
  }
  processingLogic.map(updateResult => updateResult -> consumerRecord)
}

Note that we return some details about the processing along with the original consumerRecord, as downstream processing may need it (and the Kafka logic requires it for committing back offsets). The mapZIOPar(100) is there because the Kafka producer will block on each record produced, while internally batching records together to send (i.e. many calls to zioKafkaProducer.produce result in one call to the brokers). Anyway, the focus of this post is not Kafka batching; I just want to highlight that this discussion is not about Kafka batching for efficiency, but rather the lack of efficiency when a zio-stream is used without chunking.

At this point I'd say the program is very easy to understand.

But zio-kafka is implemented with zio-streams. And as already stated, this will be slow unless chunking is used. So that call to zioKafkaProducer.produce(value) is not going to perform well. For zio-kafka to perform well, we need to use the zioKafkaProducer.produceChunk... variants.

So our program needs to be modified. Here is another attempt:

myKafkaStream.mapZioChunks { consumerRecordChunk =>
  // First we need to split the Chunk into the different processing units.
  // When splitting we also need an index associated with each element so
  // that we can reconstruct the correct order again later.
  val partitionedChunks: (Chunk[(Index, DB)], Chunk[(Index, ForwardToKafka)]) = ??? // partitioning logic

  // Handle the DB updates. Writing to the DB doesn't care about the Index,
  // it just wants the data to write.
  val dbTask = writeToDBChunked(partitionedChunks._1.map(_._2))

  // Again, writing to Kafka needs the Chunk, but without the Index.
  val kafkaTask = zioKafkaProducer.produceChunks(partitionedChunks._2.map(_._2))

  for {
    dbResults <- dbTask
    kafkaResults <- kafkaTask
  } yield mergeResultsWithOriginalConsumerRecordsPreservingOrdering(....) // needs the Index to restore the original ordering
}

This is nowhere near as easy to reason about. While "Every I/O operation in the programming world works with batches." is true, this doesn't mean it is always the easiest way to reason about a program.

I work in a company where developers work in a variety of languages. They will not all be experts in ZIO and zio-streams, and I need them to be able to quickly express applications in the simplest form, not to have to reason about different performance semantics depending on how their application is structured.

Performance

So, back to the initial point: why do fs2 / zio-streams require streams to be backed by chunks, while Monix and Akka Streams are amazingly fast working with individual elements?

Now, I don't know for certain, but my best guess is this.

Monix represents the contract between stream operations with this trait:

trait Observer[-A] extends Any with Serializable {
  def onNext(elem: A): Future[Ack]

  def onError(ex: Throwable): Unit

  def onComplete(): Unit
}

And Akka streams has a similar approach.

Both fs2 and zio-streams model all the internal transformations on a stream as pure values. Here is a small snippet of some of the stream transformations from ZIO.

sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDone]

object ZChannel {

  private[stream] final case class Emit[OutElem](override val trace: Trace, elem: OutElem)
      extends ZChannel[Any, Any, Any, Any, Nothing, OutElem, Unit]

  private[stream] final case class Fail[OutErr](err: Cause[OutErr])
      extends ZChannel[Any, Any, Any, Any, OutErr, Nothing, Nothing]

  private[stream] final case class Fold[Env, InErr, InElem, InDone, OutErr, OutErr2, OutElem, OutDone, OutDone2](
    override val trace: Trace,
    channel: ZChannel[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone],
    onErr: Cause[OutErr] => ZChannel[Env, InErr, InElem, InDone, OutErr2, OutElem, OutDone2],
    onDone: OutDone => ZChannel[Env, InErr, InElem, InDone, OutErr2, OutElem, OutDone2]
  ) extends ZChannel[Env, InErr, InElem, InDone, OutErr2, OutElem, OutDone2]

  // ... many more cases elided
}

So every element that is pushed through the stream involves allocating objects to represent the transformation, and something interpreting the actions to be performed on the stream.

Here are two implementations of summing a stream of 10 million elements in Monix and zio-streams.

Monix:

import monix.eval.Task
import monix.reactive.Observable

val max = 10_000_000

val source: Observable[Long] = Observable.range(from = 0, until = max, step = 1)

val consumeWithFold: Task[Unit] = {
  source.foldLeftL(0L) { case (sum, elem) => elem + sum }.void
}

ZIO:

import zio._
import zio.stream.ZStream

val max = 10_000_000

val source: ZStream[Any, Nothing, Int] = ZStream.range(min = 0, max = max, chunkSize = 1)

val consumeWithFold: UIO[Long] = {
  source.runFold(0L) { case (sum, elem) => sum + elem }
}

On my laptop, the Monix implementation takes 85ms on average and the ZIO implementation takes 1447ms. That is a pretty big difference (17x).

When profiling the applications, the Monix one results in approximately 50MB of memory allocations, while the ZIO one allocates 6.2GB. This time the difference is even bigger (124x).

FS2 takes 3409ms and allocates 15.5GB.
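An equivalent fs2 program looks roughly like this (a sketch, not necessarily the exact code measured; fs2 3.x with cats-effect IO assumed):

import cats.effect.IO
import fs2.Stream

val max = 10_000_000

// Stream.range emits singleton chunks, mirroring the chunkSize = 1
// used in the ZIO example above.
val source: Stream[IO, Long] = Stream.range(0L, max.toLong)

val consumeWithFold: IO[Long] =
  source.fold(0L) { case (sum, elem) => sum + elem }.compile.lastOrError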

[Image: Monix memory profiling]

[Image: ZIO-streams memory profiling]

Is this fundamental difference, an API contract for stream communication versus an expressive set of values, the reason why Akka / Monix can perform well without chunking while fs2 / zio-streams cannot? I honestly don't know. The Monix implementation of map on a stream just calls the mapping function then calls onNext. In the zio-streams world, I believe this requires the upstream returning a value to express that a value should be emitted, the downstream also returning a value to express that it wants a value, and some coordinator between those two things interpreting that and passing the value between the upstream and downstream. Maybe I just need to accept that fs2 / zio-streams will never be as fast as Akka Streams / Monix because of that model.
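To illustrate the push side, a map stage is conceptually just this (a simplified sketch of the idea, not the actual Monix source; the real operator also protects against exceptions thrown by the mapping function):

import monix.execution.{Ack, Scheduler}
import monix.reactive.observers.Subscriber
import scala.concurrent.Future

// A map stage wraps the downstream subscriber: transform the element,
// forward it, and pass the downstream's Ack straight back upstream.
def mapSubscriber[A, B](out: Subscriber[B])(f: A => B): Subscriber[A] =
  new Subscriber[A] {
    implicit val scheduler: Scheduler = out.scheduler
    def onNext(elem: A): Future[Ack] = out.onNext(f(elem))
    def onError(ex: Throwable): Unit = out.onError(ex)
    def onComplete(): Unit = out.onComplete()
  }

No values are allocated to describe the transformation; each element simply flows through a method call.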

Closing remarks

I can understand why Akka is less popular than FS2 / zio-streams, because the API deals with scala.concurrent.Future which isn't as nice to work with.

But given Monix Observable had a nice public API, and given how well Monix Observable performs, I'm curious why it has lost the battle for adoption against FS2 and ZIO-streams. As a user I don't really care if the internal implementation is pure, I'd rather the exposed API was performant for all operations.

@erikvanoosten commented Sep 1, 2023

but Monix and Akka streams are amazingly fast working with individual elements?

For Akka it is because they do batching without you asking for it.
(Search for throughput = 5 on https://doc.akka.io/docs/akka/current/general/configuration-reference.html.)
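The block it refers to (abridged from Akka's reference configuration):

akka.actor.default-dispatcher {
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 5
}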

@diesalbla

The major difference between Monix Observable and FS2 or ZIO-Streams:

In a Monix Observable, in particular the implementation of a range, a subscription is a "process" that is going to start pushing the values of the range, one by one, to the subscriber. It is therefore a strict process: the observable keeps sending more values to the subscriber (the producer pushes to the consumer).

Instead, in FS2 a range is implemented as a stream of singleton chunks, and the key difference is that this stream does not have a reference to the consumer (the subscriber) to start pushing elements to it. Instead, it is the consumer that has to decide if and when it is going to read the next element from that stream. It is a pull model.
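A sketch of what that pull looks like from the consumer side (fs2 3.x; illustrative only):

import fs2.{Pull, Stream}

// The consumer decides when to read: uncons the first chunk and stop,
// so nothing beyond that chunk is ever produced.
val firstChunkOnly: Stream[fs2.Pure, Int] =
  Stream.range(0, 1_000_000)
    .pull.uncons
    .flatMap {
      case Some((head, _)) => Pull.output(head)
      case None            => Pull.done
    }
    .stream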

@He-Pin commented Sep 1, 2023

Monix is great, both in API and performance, but maintaining the library took a lot of time, personal time. I think that's the real reason. And maybe akka-stream could have something like a mapAsyncEval kind of thing, which would be better.

@lukestephenson (Author)

but Monix and Akka streams are amazingly fast working with individual elements?

For Akka it is because they do batching without you asking for it. (Search for throughput = 5 on https://doc.akka.io/docs/akka/current/general/configuration-reference.html.)

Thanks for the comment @erikvanoosten

The throughput = 5 doesn't appear to be related to Akka Streams. The configuration specifically for Akka Streams doesn't mention any batching: https://doc.akka.io/docs/akka/current/general/stream/stream-configuration.html.

Even if Akka streams does it behind the scenes, whatever they are doing appears to perform much faster for my production use cases and I've not found this to be a concern when using Akka Streams (or Monix Observable).

@lukestephenson (Author)

@diesalbla As a user, both APIs are very similar. I don't care so much if under the hood the model is push or pull based, as long as backpressure is respected. But I think you are right that this design choice leads to different performance characteristics.

The monix docs also mention this in the comparison to fs2:

Where Monix is better:

  • the model of communication between producers and consumers is push-based (with back-pressure) and this makes it inherently more efficient

And it looks like Monix's own pull-based implementation (which I've never used) also has batches: https://monix.io/api/current/monix/tail/batches/index.html

@lukestephenson (Author)

I've just watched the talk "A Tale of Two Monix Streams" by Alexandru Nedelcu and it's really helped me understand the performance differences.

From https://www.youtube.com/watch?v=y7QfAWIun2k
[Image: slide from the talk]

And when Alexandru is talking about the Iterant solution, the following is mentioned:
[Image: slide from the talk]

I've really loved revisiting this talk.
[Image: slide from the talk]

I'm not a library author, and I can appreciate that there are trade-offs in how easy a push- vs pull-based implementation is to maintain. But as a user, I've got a preference for the push-based Observable model. Around the 33 minute mark of the talk, Alexandru says something along the lines of:
Screw elegance of implementation, if it's properly encapsulated you need to achieve a good throughput such that the user doesn't have to worry about it... the user can then worry about their own business logic or whatever....

As a user of these libraries trying to migrate away from Monix Observable (because of maintenance issues) towards fs2 or zio-streams, I completely resonate with that sentiment.

@He-Pin commented Sep 4, 2023

Have you tested with Kotlin's Flow, which is pull based too? And is there any benchmark to share?

@diesalbla commented Sep 4, 2023

@lukestephenson Regarding the FS2 performance measure, could you post the code? I think that using the following in the example should show a major speedup, making it behave closer to what the Monix example does.

In essence, if the code uses the method Stream.range(from, to, step), then that should be replaced with the following:

Stream.chunk(Chunk.indexedSeq(Range(from, to, step)))

Unlike the default Stream.range, which builds one chunk and one boxed integer for every element, this one should use constant space regardless of the size of the range, just as Monix does. The trick (or the leverage of the abstraction) is that a chunk can be any sequential collection, and in Scala an indexed sequential collection is essentially just a function from index to value.

@lukestephenson (Author)

@diesalbla I'm aware of APIs on fs2 Stream which allow a Stream to be created backed by chunks, and this will help with performance. The point I'm trying to make is that not all business flows are easy to express in terms of chunking (see the original example). Plus there are APIs on fs2 Stream and zio-streams which break chunking and degrade performance. It's already a big learning curve to get more devs onto Scala without them also having to be aware of chunking behaviour, where a lack of understanding can have a large performance impact on applications. That wasn't a concern for me when introducing devs to Monix Observable.

@He-Pin I've not done any Kotlin development before.

@He-Pin commented Sep 5, 2023

@lukestephenson There are RxJava, Reactor-Core and SmallRye Mutiny too :)

@diesalbla commented Sep 5, 2023

Oh, I see. Yes, the APIs force people to deal with chunking, as a feature of the library.

So, one example I know is evalMap in fs2, which breaks the chunk structure, so no matter how input elements are arranged in the source, the output stream (of B) emits them in singleton chunks.

class Stream[F[_], A] {
  def evalMap[B](fun: A => F[B]): Stream[F, B] =
    flatMap(a => Stream.eval(fun(a)))
}

There is a reason for breaking chunks. Streams are as much about data flow, in the sense of pumping data through filters, transformations and aggregations, as they are about control flow: specifying if and when the stream computation carries out which side effects. From a data-flow point of view, chunking is just a throughput vs latency tradeoff, whereas from the control-flow point of view, chunking matters because effects only take place between chunks. In the evalMap example, having evalMap break the stream into singleton chunks means that, if the stream consumer only wants the function fun run on the first 10 valid items and no more, then all they have to do is stream.evalMap(fun).take(10), and the lazy pull-based semantics discards the rest.
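Concretely (a small illustrative sketch, fs2 3.x with cats-effect assumed):

import cats.effect.IO
import fs2.Stream

// Because evalMap emits singleton chunks, take(10) guarantees the
// effect runs exactly ten times, however the source was chunked.
val onlyTen: Stream[IO, Int] =
  Stream.range(0, 100)
    .evalMap(i => IO.println(s"processing $i").as(i))
    .take(10)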

One could discuss whether this API follows the principle of minimal surprise, but the notion of minimal surprise differs. For a data-flow use case, breaking chunks and degrading performance is an unwelcome surprise; for a control-flow one, having a stream process a hundred elements when it is only supposed to process ten is a bad surprise.

Going back to your original problem, though, it seems to me that the complexity there is that you have the splitting (between database and Kafka topics) all done per-chunk. Perhaps it would help to do the splitting at the level of streams, so that the initial stream breaks out in two streams (DB or Kafka), such that the consumer of each one can do its own aggregation and chunking and batching.
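In zio-streams that split might look something like the following sketch (zio-streams 2.x assumed; Message, DB and ForwardToKafka stand in for the message ADT from the original example, and partitionEither buffers internally and returns both halves in a scoped effect):

import zio._
import zio.stream._

sealed trait Message
final case class DB(value: String) extends Message
final case class ForwardToKafka(value: String) extends Message

// Split the inbound stream into a DB stream and a Kafka stream, so each
// side can do its own chunking and batching.
def split(records: ZStream[Any, Throwable, Message])
    : ZIO[Scope, Throwable, (ZStream[Any, Throwable, DB], ZStream[Any, Throwable, ForwardToKafka])] =
  records.partitionEither {
    case db: DB              => ZIO.succeed(Left(db))
    case fwd: ForwardToKafka => ZIO.succeed(Right(fwd))
  }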

@He-Pin commented Sep 5, 2023

@diesalbla I want to make use of CE and ZIO with Akka/Pekko, apache/pekko#628. Do you think this is a reasonable way?

Another thing I am thinking: once JDK 21 is released, different fibers can just block while waiting, so streams could be simpler to build on top.

@lukestephenson (Author)

@He-Pin After your suggestion, I gave Kotlin flow a go.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Long> = flow {
    for (i in 1..10_000_000) {
        emit(i.toLong()) // emit next value
    }
}

fun main() = runBlocking {
    Thread.sleep(10000)
    for (i in 1..10) {
        runOnce()
    }
    Thread.sleep(10000)
}

fun runOnce() = runBlocking<Unit> {
    val start = System.currentTimeMillis()
    try {
        val sum = simple().reduce { a, b -> a + b }
        val end = System.currentTimeMillis()
        println("sum is $sum")
        println("took ${end - start}ms")
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

Again, not an overly scientific test, but it averaged 120ms (and also low memory usage, like Monix). A little slower than Monix, but much faster than fs2 / zio-streams. It would be a huge shift to move from Scala to Kotlin though, so I haven't.

@diesalbla I want to make use of CE and ZIO with Akka/Pekko, apache/incubator-pekko#628. Do you think this is a reasonable way?

I think this is reasonable. While the Akka / Pekko API isn't as nice, if you can't guarantee / work with large chunks it will probably perform better.

Another thing I am thinking: once JDK 21 is released, different fibers can just block while waiting, so streams could be simpler to build on top.

If I find the time, I'll try to take a look at ox, which is based around these ideas. Still looks very experimental though.
https://github.com/softwaremill/ox#channels-basics

Going back to your original problem, though, it seems to me that the complexity there is that you have the splitting (between database and Kafka topics) all done per-chunk. Perhaps it would help to do the splitting at the level of streams, so that the initial stream breaks out in two streams (DB or Kafka), such that the consumer of each one can do its own aggregation and chunking and batching.

I agree this suggestion would help. But I'd love it if applications being migrated from Akka Streams / Monix didn't have to make such changes to continue performing well. This feels kind of like Ruby development, where the system design is affected by the lack of concurrency and threading, resulting in more complex solutions than you would have in other languages.

@He-Pin commented Sep 8, 2023

@lukestephenson Which do you plan to use? I always look at the results of https://github.com/LesnyRumcajs/grpc_bench, as gRPC always involves many stream/stage transformations. Hope that works for you.

@lukestephenson (Author)

@lukestephenson Which do you plan to use? I always look at the results of https://github.com/LesnyRumcajs/grpc_bench, as gRPC always involves many stream/stage transformations. Hope that works for you.

The gRPC benchmarks are interesting. Each gRPC call is a network call, so the CPU overhead of the streaming lib is likely to be a lot smaller compared to the gRPC network overhead. Compare this to a Kafka producer, where a lot of produce calls will not trigger any network IO because the producer is just adding to an in-memory batch. So if the overheads of a particular streaming implementation are noticeable for a gRPC benchmark, they are probably even more noticeable for Kafka publication.

@He-Pin commented Sep 12, 2023

@lukestephenson I like Monix too, would you like to help maintain Monix?

@lukestephenson (Author)

@lukestephenson I like Monix too, would you like to help maintain Monix?

I don't think it benefits the Scala community to support 4 different IO types (Scala Future, Monix Task, cats-effect IO, ZIO) and 4 different streaming libraries (Akka / Pekko Streams, Monix Observable, fs2, zio-streams). Monix was around prior to cats-effect / ZIO, but from what I can see, the later options won that battle. It isn't clear to me whether cats-effect or ZIO will emerge as the more widely used solution, but I can't see Monix competing with either of them now, so I'm not convinced about the value of uplifting Monix (and I don't need Monix Task; cats-effect IO / ZIO do the job well. I guess all I'm after is the push-based stream for better performance).

@He-Pin
Copy link

He-Pin commented Sep 13, 2023

  1. ZIO was created because cats-effect's error channel is always a Throwable, and I have to say, ZIO is much simpler to use.
  2. Akka/Pekko Streams is the most flexible, with the GraphDSL and javadsl support.
  3. Monix is great, but the creator and the community may not have enough time to take it a step further.

Even in Java there are different libs: RxJava, Reactor-Core, Mutiny ...
