Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Last active July 19, 2018 07:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akarnokd/62d058f71546f482b1aa to your computer and use it in GitHub Desktop.
Save akarnokd/62d058f71546f482b1aa to your computer and use it in GitHub Desktop.
package io.reactivex;
import java.util.concurrent.*;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import io.reactivex.internal.util.Pow2;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import java8.util.concurrent.*;
import java8.util.concurrent.Flow.Subscription;
/**
 * JMH throughput benchmark that pushes {@link #times} integers through an RxJava
 * pipeline and through a {@code SubmissionPublisher}, waiting in each benchmark
 * invocation until the consumer has signalled termination.
 *
 * <p>The "firehose" variants use a buffer sized from {@link #times} (see
 * {@link #setup()}); the "range" variants use {@code Observable.range} on the Rx side
 * and a publisher sized with {@code Observable.bufferSize()} on the Flow side.
 */
@BenchmarkMode(Mode.Throughput)
@Warmup(iterations = 5)
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@State(Scope.Thread)
public class FlowComparison {

    /** Number of items emitted per benchmark invocation. */
    @Param({ "1", "1000", "1000000" })
    public int times;

    /** Pre-built {@code range(1, times)} source observed on {@code Schedulers.single()}. */
    Observable<Integer> rxRange;

    /** Single-threaded executor handed to every {@code SubmissionPublisher}. */
    ExecutorService exec;

    /** Buffer capacity used by the firehose benchmarks. */
    int capacity;

    /**
     * Prepares the shared state: the firehose buffer capacity ({@link #times} passed
     * through {@code Pow2.roundToPowerOfTwo}, but never below 256), the Rx range source
     * and the executor.
     */
    @Setup
    public void setup() {
        capacity = Math.max(Pow2.roundToPowerOfTwo(times), 256);
        rxRange = Observable.range(1, times).observeOn(Schedulers.single());
        exec = Executors.newSingleThreadExecutor();
    }

    /** Stops the executor created in {@link #setup()}. */
    @TearDown
    public void teardown() {
        exec.shutdownNow();
    }

    /**
     * Emits {@link #times} items through a {@code PublishSubject} observed on
     * {@code Schedulers.single()} with a buffer of {@link #capacity}, then blocks until
     * the observer's latch is released.
     *
     * @param bh sink that consumes every received item
     * @throws Exception if the wait for termination is interrupted
     */
    @Benchmark
    public void rxFirehose(Blackhole bh) throws Exception {
        int s = times;
        LatchedObserver<Integer> lo = new LatchedObserver<>(bh);
        PublishSubject<Integer> rxSubject = PublishSubject.create();
        rxSubject.observeOn(Schedulers.single(), false, capacity).subscribe(lo);
        for (int i = 0; i < s; i++) {
            rxSubject.onNext(i);
        }
        rxSubject.onComplete();
        lo.latch.await();
    }

    /**
     * Submits {@link #times} items to a {@code SubmissionPublisher} whose buffer
     * capacity is {@link #capacity}, then blocks until the subscriber terminates.
     *
     * @param bh sink that consumes every received item
     * @throws Exception if the wait is interrupted or the subscriber received an error
     */
    @Benchmark
    public void spFirehose(Blackhole bh) throws Exception {
        submitAndAwait(bh, capacity);
    }

    /**
     * Subscribes to the pre-built {@link #rxRange} source and blocks until the
     * observer's latch is released.
     *
     * @param bh sink that consumes every received item
     * @throws Exception if the wait for termination is interrupted
     */
    @Benchmark
    public void rxRange(Blackhole bh) throws Exception {
        LatchedObserver<Integer> lo = new LatchedObserver<>(bh);
        rxRange.subscribe(lo);
        lo.latch.await();
    }

    /**
     * Submits {@link #times} items to a {@code SubmissionPublisher} whose buffer
     * capacity is {@code Observable.bufferSize()}, then blocks until the subscriber
     * terminates.
     *
     * @param bh sink that consumes every received item
     * @throws Exception if the wait is interrupted or the subscriber received an error
     */
    @Benchmark
    public void spRange(Blackhole bh) throws Exception {
        submitAndAwait(bh, Observable.bufferSize());
    }

    /**
     * Shared body of the {@code SubmissionPublisher} benchmarks: creates a publisher on
     * {@link #exec} with the given buffer capacity, submits the integers
     * {@code 0 .. times-1}, closes the publisher and waits for the subscriber.
     *
     * @param bh sink that consumes every received item
     * @param bufferCapacity buffer capacity passed to the publisher's constructor
     * @throws Exception if the wait is interrupted or the subscriber received an error
     */
    private void submitAndAwait(Blackhole bh, int bufferCapacity) throws Exception {
        int s = times;
        FlowLatchedObserver<Integer> lo = new FlowLatchedObserver<>(bh);
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(exec, bufferCapacity);
        try {
            publisher.subscribe(lo);
            for (int i = 0; i < s; i++) {
                publisher.submit(i);
            }
        } finally {
            // Close even when subscribe/submit throws, so a failed invocation does not
            // leave an open publisher attached to the shared executor.
            publisher.close();
        }
        lo.latch.await();
        // Surface a failed run instead of reporting it as a valid throughput sample.
        lo.rethrowIfFailed();
    }

    /**
     * Flow subscriber that requests {@code Long.MAX_VALUE} items up front, feeds every
     * item to a {@link Blackhole} and releases {@link #latch} on either terminal signal.
     *
     * @param <T> the item type
     */
    static final class FlowLatchedObserver<T> implements Flow.Subscriber<T> {

        /** Released once {@code onComplete} or {@code onError} has been received. */
        public final CountDownLatch latch = new CountDownLatch(1);

        private final Blackhole bh;

        /**
         * Error passed to {@link #onError(Throwable)}, or {@code null}. Written before
         * {@code latch.countDown()} and meant to be read only after {@code latch.await()}
         * returns, which is why it is not volatile.
         */
        private Throwable error;

        /**
         * @param bh sink that consumes every received item
         */
        public FlowLatchedObserver(Blackhole bh) {
            this.bh = bh;
        }

        @Override
        public void onComplete() {
            latch.countDown();
        }

        @Override
        public void onError(Throwable e) {
            // Keep the failure so the benchmark can report it rather than swallow it.
            error = e;
            latch.countDown();
        }

        @Override
        public void onNext(T t) {
            bh.consume(t);
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        /**
         * Throws if this subscriber was terminated through {@link #onError(Throwable)}.
         * Call only after {@code latch.await()} has returned.
         *
         * @throws IllegalStateException wrapping the received error, if there was one
         */
        void rethrowIfFailed() {
            Throwable e = error;
            if (e != null) {
                throw new IllegalStateException("Flow subscriber terminated with an error", e);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment