@knutwalker
Last active October 15, 2015 09:19
Benchmark several stream like implementations
Benchmark Mode Cnt Score Error Units
StreamPressureBench.bench_01_transducers avgt 5 89.049 ± 5.785 ms/op
StreamPressureBench.bench_01_transducers:·compiler.time.profiled avgt 5 59.000 ms
StreamPressureBench.bench_01_transducers:·compiler.time.total avgt 5 938.000 ms
StreamPressureBench.bench_01_transducers:·gc.alloc.rate avgt 5 3426.787 ± 224.769 MB/sec
StreamPressureBench.bench_01_transducers:·gc.churn.PS_Eden_Space avgt 5 3422.665 ± 523.829 MB/sec
StreamPressureBench.bench_01_transducers:·gc.churn.PS_Survivor_Space avgt 5 0.262 ± 0.485 MB/sec
StreamPressureBench.bench_01_transducers:·gc.count avgt 5 58.000 counts
StreamPressureBench.bench_01_transducers:·gc.time avgt 5 36.000 ms
StreamPressureBench.bench_02_scalaIterator avgt 5 102.397 ± 2.912 ms/op
StreamPressureBench.bench_02_scalaIterator:·compiler.time.profiled avgt 5 86.000 ms
StreamPressureBench.bench_02_scalaIterator:·compiler.time.total avgt 5 1020.000 ms
StreamPressureBench.bench_02_scalaIterator:·gc.alloc.rate avgt 5 2979.438 ± 85.628 MB/sec
StreamPressureBench.bench_02_scalaIterator:·gc.churn.PS_Eden_Space avgt 5 2962.232 ± 360.919 MB/sec
StreamPressureBench.bench_02_scalaIterator:·gc.churn.PS_Survivor_Space avgt 5 0.293 ± 0.225 MB/sec
StreamPressureBench.bench_02_scalaIterator:·gc.count avgt 5 71.000 counts
StreamPressureBench.bench_02_scalaIterator:·gc.time avgt 5 44.000 ms
StreamPressureBench.bench_03_rxScala avgt 5 171.112 ± 6.612 ms/op
StreamPressureBench.bench_03_rxScala:·compiler.time.profiled avgt 5 61.000 ms
StreamPressureBench.bench_03_rxScala:·compiler.time.total avgt 5 944.000 ms
StreamPressureBench.bench_03_rxScala:·gc.alloc.rate avgt 5 1783.007 ± 70.094 MB/sec
StreamPressureBench.bench_03_rxScala:·gc.churn.PS_Eden_Space avgt 5 1790.576 ± 105.628 MB/sec
StreamPressureBench.bench_03_rxScala:·gc.churn.PS_Survivor_Space avgt 5 0.286 ± 0.396 MB/sec
StreamPressureBench.bench_03_rxScala:·gc.count avgt 5 62.000 counts
StreamPressureBench.bench_03_rxScala:·gc.time avgt 5 39.000 ms
StreamPressureBench.bench_04_scalazStream avgt 5 32068.039 ± 830.288 ms/op
StreamPressureBench.bench_04_scalazStream:·compiler.time.profiled avgt 5 146.000 ms
StreamPressureBench.bench_04_scalazStream:·compiler.time.total avgt 5 2739.000 ms
StreamPressureBench.bench_04_scalazStream:·gc.alloc.rate avgt 5 2802.357 ± 72.273 MB/sec
StreamPressureBench.bench_04_scalazStream:·gc.churn.PS_Eden_Space avgt 5 2805.987 ± 94.225 MB/sec
StreamPressureBench.bench_04_scalazStream:·gc.churn.PS_Survivor_Space avgt 5 0.022 ± 0.029 MB/sec
StreamPressureBench.bench_04_scalazStream:·gc.count avgt 5 359.000 counts
StreamPressureBench.bench_04_scalazStream:·gc.time avgt 5 567.000 ms
StreamPressureBench.bench_05_akkaStreams avgt 5 39763.860 ± 2752.353 ms/op
StreamPressureBench.bench_05_akkaStreams:·compiler.time.profiled avgt 5 196.000 ms
StreamPressureBench.bench_05_akkaStreams:·compiler.time.total avgt 5 6743.000 ms
StreamPressureBench.bench_05_akkaStreams:·gc.alloc.rate avgt 5 77.600 ± 5.393 MB/sec
StreamPressureBench.bench_05_akkaStreams:·gc.churn.PS_Eden_Space avgt 5 78.413 ± 6.406 MB/sec
StreamPressureBench.bench_05_akkaStreams:·gc.churn.PS_Survivor_Space avgt 5 0.059 ± 0.028 MB/sec
StreamPressureBench.bench_05_akkaStreams:·gc.count avgt 5 511.000 counts
StreamPressureBench.bench_05_akkaStreams:·gc.time avgt 5 346.000 ms
/*
* Copyright 2014 – 2015 Paul Horn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package scalax
package transducers.benchmark
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Keep, Source, Sink }
import org.openjdk.jmh.infra.Blackhole
import org.openjdk.jmh.annotations._
import rx.lang.scala.Observable
import scalaz.concurrent.Task
import scalaz.stream.Process
import scalax.transducers._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit
@Threads(value = 1)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
class StreamPressureBench {

  private[this] val source = 1 to 10000000
  private[this] val tx = map((_: Int) + 1).filter(_ % 2 == 0).foreach(_ ⇒ ())

  private[this] implicit val system = ActorSystem()
  private[this] implicit val mat = ActorMaterializer()

  private[this] val akkaSource = Source(() ⇒ source.toIterator).map(_ + 1).filter(_ % 2 == 0).toMat(Sink.ignore)(Keep.right)
  private[this] val rxSource = Observable.from(source).map(_ + 1).filter(_ % 2 == 0)
  private[this] val zSource = Process.emitAll(source).map(_ + 1).filter(_ % 2 == 0).drain.run[Task]

  @Benchmark
  def bench_01_transducers(bh: Blackhole): Unit =
    bh.consume(run(tx)(source.toIterator))

  @Benchmark
  def bench_02_scalaIterator(bh: Blackhole): Unit =
    bh.consume(source.toIterator.map(_ + 1).filter(_ % 2 == 0).foreach(_ ⇒ ()))

  @Benchmark
  def bench_03_rxScala(bh: Blackhole): Unit =
    bh.consume(rxSource.foreach(_ ⇒ ()))

  @Benchmark
  def bench_04_scalazStream(bh: Blackhole): Unit =
    bh.consume(zSource.run)

  @Benchmark
  def bench_05_akkaStreams(bh: Blackhole): Unit =
    bh.consume(Await.result(akkaSource.run(), Duration.Inf))

  @TearDown
  def shutdown(): Unit =
    system.shutdown()
}
@arBmind
arBmind commented Oct 15, 2015

Are you sure the measurements perform any relevant operations?
rxScala: I fear that without a subscription no events are propagated.
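
One way to check that a pipeline really delivers events is to count what reaches the sink and compare it with a reference value. Below is a minimal sketch in plain C++, assuming the same input range and the same map/filter stages as the gist. `SinkStats` and `run_pipeline` are hypothetical names for illustration and not part of any benchmarked library.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper (not part of the gist): records what reached the sink.
struct SinkStats {
    std::int64_t count; // number of elements delivered to the sink
    std::int64_t sum;   // sum of the delivered elements
};

// Reference implementation of the benchmarked pipeline over [first, last]:
// add 1 to each element, keep the even results, and feed them to the sink.
SinkStats run_pipeline(std::int64_t first, std::int64_t last) {
    assert(first <= last);
    SinkStats stats{0, 0};
    for (std::int64_t i = first; i <= last; ++i) {
        const std::int64_t j = i + 1; // map(_ + 1)
        if (j % 2 == 0) {             // filter(_ % 2 == 0)
            ++stats.count;            // sink: count the element
            stats.sum += j;           // sink: accumulate the element
        }
    }
    return stats;
}
```

For the range 1 to 10000000, `run_pipeline` reports a count of 5000000. A benchmarked implementation whose sink sees a different count is not doing the same work.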

My results for rxCpp (on a 2.4 GHz MacBook Pro Retina, Late 2013):

  • MSVC2015 ~400 ms/op
  • MinGW 4.9.2 ~80 ms/op

The benchmark in C++, using the Google Benchmark library and rxCpp:

#include "benchmark/benchmark.h"
#include "rxcpp/rx.hpp"

void RxCppTest1(benchmark::State& state) {
    auto os = rxcpp::observable<>::range(1, 10000000);
    while (state.KeepRunning()) {
        os.map([](auto x) {
            return x + 1;
        }).filter([](auto x) {
            return x % 2 == 0;
        }).subscribe([](auto x) {
            benchmark::DoNotOptimize(x);
        });
    }
}
BENCHMARK(RxCppTest1);
BENCHMARK_MAIN()

@arBmind
arBmind commented Oct 15, 2015

For reference, I ran the benchmark again with plain C++ code:

void PlainLoop(benchmark::State& state) {
    while (state.KeepRunning()) {
        for(auto i = 1; i <= 10000000; i++) {
            auto j = i + 1;
            if (j % 2 == 0) {
                benchmark::DoNotOptimize(j);
            }
        }
    }
}
BENCHMARK(PlainLoop);

My results:

  • MSVC2015 ~20 ms/op (rxCpp is slower by a factor of 20)
  • MinGW 4.9.2 ~5 ms/op (rxCpp is slower by a factor of 16)
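
The factors are the rxCpp timings divided by the plain-loop timings for the same compiler. A small sketch of that arithmetic follows; `slowdown_factor` is a hypothetical helper name, and the inputs are the approximate ms/op figures quoted in these comments.

```cpp
#include <cassert>

// Hypothetical helper: how many times slower the library version is
// compared with the hand-written baseline, given both timings in ms/op.
double slowdown_factor(double library_ms_per_op, double baseline_ms_per_op) {
    assert(baseline_ms_per_op > 0.0);
    return library_ms_per_op / baseline_ms_per_op;
}
```

With the numbers above, `slowdown_factor(400.0, 20.0)` gives 20 for MSVC2015 and `slowdown_factor(80.0, 5.0)` gives 16 for MinGW 4.9.2.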

From my investigation, rxCpp is slow because it makes heavy use of exception handling and forwarding.
