Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Created November 19, 2014 16:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akarnokd/fc1f2e1946bb39e8794a to your computer and use it in GitHub Desktop.
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import rx.Observable;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.exceptions.MissingBackpressureException;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func1;
import rx.internal.util.RxRingBuffer;
import rx.internal.util.ScalarSynchronousObservable;
import rx.internal.util.SubscriptionIndexedRingBuffer;
/**
* Flattens a list of {@link Observable}s into one {@code Observable}, without any transformation.
* <p>
* <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
* <p>
* You can combine the items emitted by multiple {@code Observable}s so that they act like a single {@code Observable}, by using the merge operation.
*
* @param <T>
* the type of the items emitted by both the source and merged {@code Observable}s
*/
public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
/*
* benjchristensen => This class is complex and I'm not a fan of it despite writing it. I want to give some background
* as to why for anyone who wants to try and help improve it.
*
* One of my first implementations that added backpressure support (Producer.request) was fairly elegant and used a simple
* queue draining approach. It was simple to understand as all onNext were added to their queues, then a single winner
* would drain the queues, similar to observeOn. It killed the Netflix API when I canaried it. There were two problems:
* (1) performance and (2) object allocation overhead causing massive GC pressure. Remember that merge is one of the most
* used operators (mostly due to flatmap) and is therefore critical to and a limiter of performance in any application.
*
* All subsequent work on this class and the various fast-paths and branches within it have been to achieve the needed functionality
* while reducing or eliminating object allocation and keeping performance acceptable.
*
* This has meant adopting strategies such as:
*
* - ring buffers instead of growable queues
* - object pooling
* - skipping request logic when downstream does not need backpressure
* - ScalarValueQueue for optimizing synchronous single-value Observables
* - adopting data structures that use Unsafe (and gating them based on environment so non-Oracle JVMs still work)
*
* It has definitely increased the complexity and maintenance cost of this class, but the performance gains have been significant.
*
* The biggest cost of the increased complexity is concurrency bugs and reasoning through what's going on.
*
* I'd love to have contributions that improve this class, but keep in mind the performance and GC pressure.
* The benchmarks I use are in the JMH OperatorMergePerf class. GC memory pressure is tested using Java Flight Recorder
* to track object allocation.
*/
/** Creates a merge operator that terminates on the first error from any source. */
public OperatorMerge() {
this.delayErrors = false;
}
/**
 * Creates a merge operator.
 *
 * @param delayErrors if true, errors from sources are collected and only emitted after all
 *        sources terminate (as a single Throwable, or a CompositeException when there are
 *        several — see MergeSubscriber.drainAndComplete); if false the first error is
 *        forwarded downstream immediately.
 */
public OperatorMerge(boolean delayErrors) {
this.delayErrors = delayErrors;
}
// whether inner errors are held back until all sources have terminated
private final boolean delayErrors;
@Override
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
// a single MergeSubscriber coordinates all inner sources for this child
return new MergeSubscriber<T>(child, delayErrors);
}
private static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> {
final NotificationLite<T> on = NotificationLite.instance();
final Subscriber<? super T> actual;
private final MergeProducer<T> mergeProducer;
private int wip;
private boolean completed;
private final boolean delayErrors;
private ConcurrentLinkedQueue<Throwable> exceptions;
private volatile SubscriptionIndexedRingBuffer<InnerSubscriber<T>> childrenSubscribers;
private RxRingBuffer scalarValueQueue = null;
/* protected by lock on MergeSubscriber instance */
private int missedEmitting = 0;
private boolean emitLock = false;
/**
* Using synchronized(this) for `emitLock` instead of ReentrantLock or AtomicInteger is faster when there is no contention.
*
* <pre> {@code
* Using ReentrantLock:
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 44185.294 1295.565 ops/s
*
* Using synchronized(this):
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 79715.981 3704.486 ops/s
*
* Still slower though than allowing concurrency:
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 149331.046 4851.290 ops/s
* } </pre>
*/
/**
 * @param actual the downstream Subscriber that receives the merged stream
 * @param delayErrors whether inner errors are buffered until all sources terminate
 */
public MergeSubscriber(Subscriber<? super T> actual, boolean delayErrors) {
super(actual);
this.actual = actual;
this.mergeProducer = new MergeProducer<T>(this);
this.delayErrors = delayErrors;
// decoupled the subscription chain because we need to decouple and control backpressure
actual.add(this);
actual.setProducer(mergeProducer);
}
@Override
public void onStart() {
// we request backpressure so we can handle long-running Observables that are enqueueing, such as flatMap use cases
// we decouple the Producer chain while keeping the Subscription chain together (perf benefit) via super(actual)
// initial request: one ring-buffer's worth of source Observables from upstream
request(RxRingBuffer.SIZE);
}
/*
 * This is expected to be executed sequentially as per the Rx contract or it will not work.
 */
@Override
public void onNext(Observable<? extends T> t) {
// scalar synchronous sources (e.g. the output of Observable.just) take a fast-path
// that bypasses the InnerSubscriber machinery entirely
if (t instanceof ScalarSynchronousObservable) {
ScalarSynchronousObservable<? extends T> t2 = (ScalarSynchronousObservable<? extends T>)t;
handleScalarSynchronousObservable(t2);
} else {
if (t == null || isUnsubscribed()) {
return;
}
synchronized (this) {
// synchronized here because `wip` can be concurrently changed by children Observables
wip++;
}
handleNewSource(t);
}
}
/**
 * Subscribes an InnerSubscriber to a non-scalar source and requests a replacement
 * source from upstream.
 */
private void handleNewSource(Observable<? extends T> t) {
if (childrenSubscribers == null) {
// lazily create this only if we receive Observables we need to subscribe to
childrenSubscribers = new SubscriptionIndexedRingBuffer<InnerSubscriber<T>>();
add(childrenSubscribers);
}
MergeProducer<T> producerIfNeeded = null;
// if we have received a request then we need to respect it, otherwise we fast-path
if (mergeProducer.requested != Long.MAX_VALUE) {
/**
 * <pre> {@code
 * With this optimization:
 *
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 57100.080 4686.331 ops/s
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 60.875 1.622 ops/s
 *
 * Without this optimization:
 *
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 29863.945 1858.002 ops/s
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 30.516 1.087 ops/s
 * } </pre>
 */
producerIfNeeded = mergeProducer;
}
InnerSubscriber<T> i = new InnerSubscriber<T>(this, producerIfNeeded);
// remember the slot so the child can be removed on completion (see completeInner)
i.sindex = childrenSubscribers.add(i);
t.unsafeSubscribe(i);
// replace the source Observable we just consumed
request(1);
}
/**
 * Fast-path for scalar, synchronous sources: emits (or buffers) the single value
 * directly without creating an InnerSubscriber or touching `wip`.
 */
private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? extends T> t) {
// fast-path for scalar, synchronous values such as Observable.from(int)
/**
 * Without this optimization:
 *
 * <pre> {@code
 * Benchmark (size) Mode Samples Score Score error Units
 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 2,418,452.409 130572.665 ops/s
 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 5,690.456 94.958 ops/s
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 takes too long
 *
 * With this optimization:
 *
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5,475,300.198 156741.334 ops/s
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 68,932.278 1311.023 ops/s
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 64.405 0.611 ops/s
 * } </pre>
 *
 */
// Long.MAX_VALUE means downstream needs no backpressure accounting at all
if (mergeProducer.requested == Long.MAX_VALUE) {
handleScalarSynchronousObservableWithoutRequestLimits(t);
} else {
handleScalarSynchronousObservableWithRequestLimits(t);
}
}
/**
 * Scalar fast-path when downstream requested Long.MAX_VALUE (unbounded): emit the
 * value directly while holding the emit lock, otherwise buffer it for the current
 * lock holder to drain.
 */
private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchronousObservable<? extends T> t) {
T value = t.get();
if (getEmitLock()) {
try {
actual.onNext(value);
return;
} finally {
// drain anything other threads enqueued while we held the emit lock
if (releaseEmitLock()) {
drainQueuesIfNeeded();
}
// replace the scalar source we just consumed
request(1);
}
} else {
// another thread holds the emit lock; buffer the value for it to drain
initScalarValueQueueIfNeeded();
try {
scalarValueQueue.onNext(value);
} catch (MissingBackpressureException e) {
onError(e);
}
return;
}
}
/**
 * Scalar path when downstream backpressure is active: emit directly only if we can
 * take the emit lock AND there is outstanding demand, otherwise buffer the value.
 */
private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronousObservable<? extends T> t) {
if (getEmitLock()) {
boolean emitted = false;
try {
long r = mergeProducer.requested;
if (r > 0) {
emitted = true;
actual.onNext(t.get());
// consume one unit of downstream demand for the value just emitted
MergeProducer.REQUESTED.decrementAndGet(mergeProducer);
// we handle this Observable without ever incrementing the wip or touching other machinery so just return here
return;
}
} finally {
// drain anything other threads enqueued while we held the emit lock
if (releaseEmitLock()) {
drainQueuesIfNeeded();
}
if (emitted) {
// replace the scalar source we just consumed
request(1);
}
}
}
// if we didn't return above we need to enqueue
// enqueue the values for later delivery
initScalarValueQueueIfNeeded();
try {
scalarValueQueue.onNext(t.get());
} catch (MissingBackpressureException e) {
onError(e);
}
}
/**
 * Lazily creates the SPMC ring buffer used to hold scalar values that could not be
 * emitted immediately, and registers it for unsubscription with this Subscriber.
 */
private void initScalarValueQueueIfNeeded() {
    if (scalarValueQueue != null) {
        return;
    }
    RxRingBuffer buffer = RxRingBuffer.getSpmcInstance();
    scalarValueQueue = buffer;
    add(buffer);
}
/**
 * Releases the emit lock.
 *
 * @return true if other threads tried to emit while the lock was held
 *         (missedEmitting != 0), meaning the queues must be drained again
 */
private synchronized boolean releaseEmitLock() {
    emitLock = false;
    return missedEmitting != 0;
}
/**
 * Attempts to take the emit lock. On failure it records a missed emission so the
 * current holder knows to re-drain after releasing (see releaseEmitLock).
 *
 * @return true if this caller now holds the lock
 */
private synchronized boolean getEmitLock() {
    if (!emitLock) {
        emitLock = true;
        missedEmitting = 0;
        return true;
    }
    missedEmitting++;
    return false;
}
/**
 * Attempts to take the emit lock and drain both the scalar queue and all children
 * queues, looping for as long as releaseEmitLock reports missed emissions.
 *
 * @return true if this caller performed (or finished) the drain; false if another
 *         thread holds the emit lock and is responsible for draining
 */
private boolean drainQueuesIfNeeded() {
while (true) {
if (getEmitLock()) {
int emitted = 0;
try {
emitted = drainScalarValueQueue();
drainChildrenQueues();
} finally {
boolean moreToDrain = releaseEmitLock();
// request outside of lock
if (emitted > 0) {
// replace the scalar sources whose buffered values were just emitted
request(emitted);
}
if (!moreToDrain) {
return true;
}
// otherwise we'll loop and get whatever was added
}
} else {
return false;
}
}
}
// index into childrenSubscribers where the previous drain pass stopped; passing it
// back into forEach round-robins fairness across children
// (only touched while holding the emit lock)
int lastDrainedIndex = 0;
/**
 * ONLY call when holding the EmitLock.
 */
private void drainChildrenQueues() {
if (childrenSubscribers != null) {
lastDrainedIndex = childrenSubscribers.forEach(DRAIN_ACTION, lastDrainedIndex);
}
}
/**
 * ONLY call when holding the EmitLock.
 *
 * Drains buffered scalar values downstream, bounded by the outstanding request
 * count, and decrements `requested` by the number emitted.
 *
 * @return the number of values emitted while draining
 */
private int drainScalarValueQueue() {
if (scalarValueQueue != null) {
long r = mergeProducer.requested;
int emittedWhileDraining = 0;
if (r < 0) {
// NOTE(review): a negative `requested` is treated here as unbounded demand —
// presumably a transient state from concurrent decrements; confirm
// drain it all
Object o = null;
while ((o = scalarValueQueue.poll()) != null) {
on.accept(actual, o);
emittedWhileDraining++;
}
} else if (r > 0) {
// drain what was requested
long toEmit = r;
for (int i = 0; i < toEmit; i++) {
Object o = scalarValueQueue.poll();
if (o == null) {
break;
} else {
on.accept(actual, o);
emittedWhileDraining++;
}
}
// decrement the number we emitted from outstanding requests
MergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining);
}
return emittedWhileDraining;
}
return 0;
}
// Drains a single child's queue; invoked via childrenSubscribers.forEach while the
// caller holds the emit lock. Returning FALSE stops the forEach loop early.
final Func1<InnerSubscriber<T>, Boolean> DRAIN_ACTION = new Func1<InnerSubscriber<T>, Boolean>() {
@Override
public Boolean call(InnerSubscriber<T> s) {
if (s.q != null) {
long r = mergeProducer.requested;
int emitted = 0;
emitted += s.drainQueue();
if (emitted > 0) {
/*
 * `s.emitted` is not volatile (because of performance impact of making it so shown by JMH tests)
 * but `emitted` can ONLY be touched by the thread holding the `emitLock` which we're currently inside.
 *
 * Entering and leaving the emitLock flushes all values so this is visible to us.
 */
emitted += s.emitted;
// TODO we may want to store this in s.emitted and only request if above batch
// reset this since we have requested them all
s.emitted = 0;
// replenish this child's upstream by the number of values just emitted
s.requestMore(emitted);
}
if (emitted == r) {
// we emitted as many as were requested so stop the forEach loop
return Boolean.FALSE;
}
}
return Boolean.TRUE;
}
};
@Override
public void onError(Throwable e) {
// `completed` gates the terminal event so it is delivered at most once.
// NOTE(review): `completed` is read/written here without synchronization while
// other paths guard it with synchronized(this) — presumably relies on terminal
// events from the parent arriving sequentially per the Rx contract; confirm
if (!completed) {
completed = true;
innerError(e, true);
}
}
/**
 * Routes an error from the parent stream (parent == true) or an inner Observable.
 * When delaying errors, the Throwable is collected and terminal emission deferred
 * until all sources terminate; otherwise it is forwarded downstream immediately.
 */
private void innerError(Throwable e, boolean parent) {
if (delayErrors) {
synchronized (this) {
// lazy init guarded by the lock; the queue itself is thread-safe
if (exceptions == null) {
exceptions = new ConcurrentLinkedQueue<Throwable>();
}
}
exceptions.add(e);
boolean sendOnComplete = false;
synchronized (this) {
if (!parent) {
// an inner subscriber terminated (with this error), so one less in flight
wip--;
}
if ((wip == 0 && completed) || (wip < 0)) {
sendOnComplete = true;
}
}
if (sendOnComplete) {
drainAndComplete();
}
} else {
actual.onError(e);
}
}
@Override
public void onCompleted() {
boolean c = false;
synchronized (this) {
completed = true;
// terminate now only if no children are in flight and no scalar values remain
// buffered; otherwise completeInner / the drain paths will finish the job
if (wip == 0 && (scalarValueQueue == null || scalarValueQueue.isEmpty())) {
c = true;
}
}
if (c) {
// complete outside of lock
drainAndComplete();
}
}
/**
 * Called when an inner Observable finishes: removes it from the children buffer
 * and, if it was the last one in flight and the parent already completed, emits
 * the terminal event downstream.
 */
void completeInner(InnerSubscriber<T> s) {
boolean sendOnComplete = false;
synchronized (this) {
wip--;
if (wip == 0 && completed) {
sendOnComplete = true;
}
}
childrenSubscribers.remove(s.sindex);
if (sendOnComplete) {
drainAndComplete();
}
}
/**
 * Drains any remaining queued values and then emits the terminal event:
 * onCompleted, or — when delaying errors — the collected error(s): a single
 * Throwable as-is, or several wrapped in a CompositeException.
 */
private void drainAndComplete() {
drainQueuesIfNeeded(); // TODO need to confirm whether this is needed or not
if (delayErrors) {
Queue<Throwable> es = null;
synchronized (this) {
// read the collected errors under the same lock used to initialize them
es = exceptions;
}
if (es != null) {
if (es.isEmpty()) {
actual.onCompleted();
} else if (es.size() == 1) {
actual.onError(es.poll());
} else {
actual.onError(new CompositeException(es));
}
} else {
actual.onCompleted();
}
} else {
actual.onCompleted();
}
}
}
/**
 * Producer shared by the merge: tracks the downstream's outstanding request count
 * and triggers draining (and possibly terminal completion) when new demand arrives.
 */
private static final class MergeProducer<T> implements Producer {
private final MergeSubscriber<T> ms;
public MergeProducer(MergeSubscriber<T> ms) {
this.ms = ms;
}
// outstanding downstream requests; Long.MAX_VALUE means unbounded (fast-path, no accounting)
private volatile long requested = 0;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<MergeProducer> REQUESTED = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "requested");
@Override
public void request(long n) {
if (requested == Long.MAX_VALUE) {
// already unbounded; nothing more to track
return;
}
if (n == Long.MAX_VALUE) {
requested = Long.MAX_VALUE;
} else {
// NOTE(review): getAndAdd can overflow to negative for very large cumulative
// requests — confirm whether callers guarantee sane request values
REQUESTED.getAndAdd(this, n);
if (ms.drainQueuesIfNeeded()) {
boolean sendComplete = false;
synchronized (ms) {
// completion is sent here only when a scalar queue exists and was fully drained
if (ms.wip == 0 && ms.scalarValueQueue != null && ms.scalarValueQueue.isEmpty()) {
sendComplete = true;
}
}
if (sendComplete) {
ms.drainAndComplete();
}
}
}
}
}
private static final class InnerSubscriber<T> extends Subscriber<T> {
public int sindex;
final MergeSubscriber<T> parentSubscriber;
final MergeProducer<T> producer;
/** Make sure the inner termination events are delivered only once. */
@SuppressWarnings("unused")
volatile int terminated;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");
private final RxRingBuffer q = RxRingBuffer.getSpmcInstance();
/* protected by emitLock */
int emitted = 0;
final int THRESHOLD = (int) (q.capacity() * 0.7);
/**
 * @param parent the MergeSubscriber coordinating all sources
 * @param producer the shared request tracker, or null when downstream needs no backpressure
 */
public InnerSubscriber(MergeSubscriber<T> parent, MergeProducer<T> producer) {
this.parentSubscriber = parent;
this.producer = producer;
// prefetch one full ring buffer's worth of values from this inner source
request(q.capacity());
}
@Override
public void onNext(T t) {
// emit (or enqueue) the value; `false` = this is not a completion event
emit(t, false);
}
@Override
public void onError(Throwable e) {
// it doesn't go through queues, it immediately onErrors and tears everything down
// the CAS guarantees this inner's terminal event is forwarded at most once
if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
parentSubscriber.innerError(e, false);
}
}
@Override
public void onCompleted() {
// the CAS guarantees this inner's terminal event is delivered at most once
if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
// completion flows through emit/enqueue so buffered values are delivered first
emit(null, true);
}
}
/** Requests more items from this inner source (called by the parent's drain loop). */
public void requestMore(long n) {
request(n);
}
/**
 * Emits a value (or completion when {@code complete} is true) downstream: if the
 * emit lock can be taken and demand allows, the value is delivered directly
 * (after draining anything already queued, to preserve order); otherwise it is
 * enqueued for a later drain pass.
 */
private void emit(T t, boolean complete) {
boolean drain = false;
boolean enqueue = true;
/**
 * This optimization to skip the queue is messy ... but it makes a big difference in performance when merging a single stream
 * with many values, or many intermittent streams without contention. It doesn't make much of a difference if there is contention.
 *
 * Below are some of the relevant benchmarks to show the difference.
 *
 * <pre> {@code
 * With this fast-path:
 *
 * Benchmark (size) Mode Samples Score Score error Units
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5344143.680 393484.592 ops/s
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 83582.662 4293.755 ops/s +++
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 73.889 4.477 ops/s +++
 *
 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5799265.333 199205.296 ops/s +
 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 62.655 2.521 ops/s +++
 *
 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 76925.616 4909.174 ops/s
 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 3634.977 242.469 ops/s
 *
 * Without:
 *
 * Benchmark (size) Mode Samples Score Score error Units
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5099295.678 159539.842 ops/s
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 18196.671 10053.298 ops/s
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 19.184 1.028 ops/s
 *
 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5591612.719 591821.763 ops/s
 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 21.018 3.251 ops/s
 *
 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 72692.073 18395.031 ops/s
 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 4379.093 386.368 ops/s
 * } </pre>
 *
 * It looks like it may cause a slowdown in highly contended cases (like 'mergeTwoAsyncStreamsOfN' above) as instead of just
 * putting in the queue, it attempts to get the lock. We are optimizing for the non-contended case.
 */
if (parentSubscriber.getEmitLock()) {
enqueue = false;
try {
// drain the queue if there is anything in it before emitting the current value
emitted += drainQueue();
if (producer == null) {
// no backpressure requested
if (complete) {
parentSubscriber.completeInner(this);
} else {
try {
parentSubscriber.actual.onNext(t);
} catch (Throwable e) {
// special error handling due to complexity of merge
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
emitted++;
}
} else {
// this needs to check q.count() as draining above may not have drained the full queue
// perf tests show this to be okay, though different queue implementations could perform poorly with this
if (producer.requested > 0 && q.count() == 0) {
if (complete) {
parentSubscriber.completeInner(this);
} else {
try {
parentSubscriber.actual.onNext(t);
} catch (Throwable e) {
// special error handling due to complexity of merge
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
emitted++;
// consume one unit of downstream demand for the value just emitted
MergeProducer.REQUESTED.decrementAndGet(producer);
}
} else {
// no requests available, so enqueue it
enqueue = true;
}
}
} finally {
drain = parentSubscriber.releaseEmitLock();
}
// batch upstream requests: only replenish once we've emitted past THRESHOLD
if (emitted > THRESHOLD) {
// this is for batching requests when we're in a use case that isn't queueing, always fast-pathing the onNext
/**
 * <pre> {@code
 * Without this batching:
 *
 * Benchmark (size) Mode Samples Score Score error Units
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5060743.715 100445.513 ops/s
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 36606.582 1610.582 ops/s
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 38.476 0.973 ops/s
 *
 * With this batching:
 *
 * Benchmark (size) Mode Samples Score Score error Units
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5367945.738 262740.137 ops/s
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 62703.930 8496.036 ops/s
 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 72.711 3.746 ops/s
 *} </pre>
 */
request(emitted);
// we are modifying this outside of the emit lock ... but this can be considered a "lazySet"
// and it will be flushed before anything else touches it because the emitLock will be obtained
// before any other usage of it
emitted = 0;
}
}
if (enqueue) {
enqueue(t, complete);
drain = true;
}
if (drain) {
/**
 * This extra check for whether to call drain is ugly, but it helps:
 * <pre> {@code
 * Without:
 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 61.812 1.455 ops/s
 *
 * With:
 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 78.795 1.766 ops/s
 * } </pre>
 */
parentSubscriber.drainQueuesIfNeeded();
}
}
/**
 * Puts a value (or the completion marker) into this inner subscriber's queue.
 * A queue overflow surfaces as a MissingBackpressureException via onError.
 */
private void enqueue(T t, boolean complete) {
    try {
        if (!complete) {
            q.onNext(t);
        } else {
            q.onCompleted();
        }
    } catch (MissingBackpressureException e) {
        onError(e);
    }
}
/**
 * Drains up to `producer.requested` notifications from this child's queue to the
 * downstream subscriber, then decrements the outstanding request count by the
 * number of values actually emitted. Call only while holding the emit lock.
 *
 * @return the number of onNext values emitted
 */
private int drainRequested() {
int emitted = 0;
// drain what was requested
long toEmit = producer.requested;
Object o;
for (int i = 0; i < toEmit; i++) {
o = q.poll();
if (o == null) {
// no more items
break;
} else if (q.isCompleted(o)) {
parentSubscriber.completeInner(this);
q.unsubscribe();
} else {
try {
// NOTE(review): accept appears to return true for terminal notifications
// (hence the unsubscribe) and false for plain values — confirm
if (!q.accept(o, parentSubscriber.actual)) {
emitted++;
} else {
q.unsubscribe();
}
} catch (Throwable e) {
q.unsubscribe();
// special error handling due to complexity of merge
onError(OnErrorThrowable.addValueAsLastCause(e, o));
}
}
}
// decrement the number we emitted from outstanding requests
MergeProducer.REQUESTED.getAndAdd(producer, -emitted);
return emitted;
}
/**
 * Drains every notification from this child's queue to the downstream subscriber
 * (used when there is no backpressure accounting). Call only while holding the
 * emit lock.
 *
 * @return the number of onNext values emitted
 */
private int drainAll() {
int emitted = 0;
// drain it all
Object o;
while ((o = q.poll()) != null) {
if (q.isCompleted(o)) {
parentSubscriber.completeInner(this);
q.unsubscribe();
} else {
try {
// NOTE(review): accept appears to return true for terminal notifications
// (hence the unsubscribe) and false for plain values — confirm
if (!q.accept(o, parentSubscriber.actual)) {
emitted++;
} else {
q.unsubscribe();
}
} catch (Throwable e) {
q.unsubscribe();
// special error handling due to complexity of merge
onError(OnErrorThrowable.addValueAsLastCause(e, o));
}
}
}
return emitted;
}
/**
 * Drains this inner subscriber's queue: bounded by outstanding downstream requests
 * when a producer is present, otherwise unbounded.
 *
 * @return the number of values emitted downstream
 */
private int drainQueue() {
    return producer == null ? drainAll() : drainRequested();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment