Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Last active August 1, 2016 14:36
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save akarnokd/11169939 to your computer and use it in GitHub Desktop.
Save akarnokd/11169939 to your computer and use it in GitHub Desktop.
package rx.operators;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.observers.SerializedSubscriber;
import rx.subjects.PublishSubject;
import rx.subscriptions.CompositeSubscription;
/**
 * An {@link OnSubscribe} that merges several source Observables into a single
 * sequence, always emitting the smallest currently available head item
 * according to the supplied comparator.
 *
 * <p>An item is only emitted once every source that has not yet completed has
 * at least one queued item; otherwise a smaller item could still arrive from
 * the source that is currently empty. Items are buffered per source in an
 * unbounded queue while waiting.
 *
 * <p>The per-source queue is an {@link ArrayDeque}, which rejects {@code null}
 * elements, so sources must not emit {@code null}.
 *
 * @param <T> the item type
 */
public class OrderedMerge<T> implements OnSubscribe<T> {
    private final List<? extends Observable<? extends T>> sources;
    private final Comparator<? super T> comparator;
    // Not used by the code in this class; retained so the class shape stays unchanged.
    final NotificationLite<T> nl = NotificationLite.instance();

    /**
     * @param sources the Observables to merge; each is subscribed to once per child subscriber
     * @param comparator decides which of the queued head items is emitted next
     */
    public OrderedMerge(List<? extends Observable<? extends T>> sources, Comparator<? super T> comparator) {
        this.sources = sources;
        this.comparator = comparator;
    }

    /**
     * Subscribes to every source on behalf of {@code child}.
     *
     * <p>All per-source subscribers are created and registered before any
     * source is subscribed to, because a source may emit synchronously from
     * within {@code subscribe} and the merger must already see the complete
     * list at that point.
     *
     * @param child the downstream subscriber receiving the merged sequence
     */
    @Override
    public void call(Subscriber<? super T> child) {
        CompositeSubscription csub = new CompositeSubscription();
        child.add(csub);

        int count = sources.size();
        List<SourceSubscriber> sourceSubscribers = new ArrayList<SourceSubscriber>(count);
        Merger merger = new Merger(sourceSubscribers, csub, new SerializedSubscriber<T>(child));
        for (int i = 0; i < count; i++) {
            SourceSubscriber srcSub = new SourceSubscriber(merger);
            csub.add(srcSub);
            sourceSubscribers.add(srcSub);
        }

        if (count == 0) {
            // No source will ever signal the merger, so run one pass by hand;
            // with nothing to wait for it completes the child immediately.
            merger.next();
            return;
        }

        int index = 0;
        for (Observable<? extends T> source : sources) {
            if (child.isUnsubscribed()) {
                // Nothing downstream is listening any more; skip the remaining sources.
                break;
            }
            source.subscribe(sourceSubscribers.get(index));
            index++;
        }
    }

    /**
     * Coordinates the source subscribers: picks the smallest queued head item
     * and forwards it to the child, one drain pass at a time.
     */
    final class Merger {
        final List<SourceSubscriber> sourceSubscribers;
        final Subscriber<? super T> child;
        final Subscription cancel;
        /** Number of pending drain requests; the caller that moves it from 0 to 1 does the draining. */
        final AtomicInteger wip;

        public Merger(List<SourceSubscriber> sourceSubscribers, Subscription cancel, Subscriber<? super T> child) {
            this.sourceSubscribers = sourceSubscribers;
            this.child = child;
            this.cancel = cancel;
            this.wip = new AtomicInteger();
        }

        /**
         * Signals that some source's state changed (new item or completion)
         * and drains as many items as can currently be emitted in order.
         *
         * <p>Only one caller drains at a time; concurrent callers just bump
         * {@code wip}, which makes the draining caller run another pass.
         */
        public void next() {
            if (wip.getAndIncrement() != 0) {
                return;
            }
            do {
                if (drain()) {
                    // Terminal event delivered. wip is deliberately left above
                    // zero so that any later next() call becomes a no-op.
                    return;
                }
            } while (wip.decrementAndGet() > 0);
        }

        /**
         * Emits items until some unfinished source has nothing queued, the
         * child unsubscribes, or a terminal event is delivered.
         *
         * @return {@code true} if a terminal event (completion or error) was
         *         delivered to the child, {@code false} if the pass merely stopped
         */
        private boolean drain() {
            while (!child.isUnsubscribed()) {
                T candidate = null;
                SourceSubscriber candidateSource = null;
                try {
                    for (SourceSubscriber srcSub : sourceSubscribers) {
                        // Read 'done' BEFORE peeking. If the order were reversed, a
                        // source could enqueue its last item and complete between the
                        // two reads, and we would wrongly treat it as exhausted while
                        // an item is still sitting in its queue.
                        boolean finished = srcSub.done;
                        T head = srcSub.peek();
                        if (head == null) {
                            if (!finished) {
                                // This source may still produce a smaller item; wait for it.
                                return false;
                            }
                            continue;
                        }
                        if (candidate == null || comparator.compare(head, candidate) < 0) {
                            candidate = head;
                            candidateSource = srcSub;
                        }
                    }
                } catch (Throwable e) {
                    // A failing comparator must terminate the sequence; letting it
                    // escape would leave wip stuck above zero with no terminal event.
                    error(e);
                    return true;
                }

                if (candidate == null) {
                    // Every source is finished and every queue is empty.
                    child.onCompleted();
                    return true;
                }

                candidateSource.poll();
                try {
                    child.onNext(candidate);
                } catch (Throwable e) {
                    error(e);
                    return true;
                }
            }
            return false;
        }

        /**
         * Forwards the error to the child and unsubscribes from all sources.
         *
         * @param t the error to deliver
         */
        public void error(Throwable t) {
            try {
                child.onError(t);
            } finally {
                cancel.unsubscribe();
            }
        }
    }

    /**
     * Subscriber for a single source: buffers its items and notifies the
     * merger whenever an item arrives or the source completes.
     */
    final class SourceSubscriber extends Subscriber<T> {
        /** Items received but not yet emitted downstream; guarded by {@code this}. */
        final Deque<T> queue = new ArrayDeque<T>();
        final Merger merger;
        /** Set once the source has completed; queued items may still remain. */
        volatile boolean done;

        public SourceSubscriber(Merger merger) {
            this.merger = merger;
        }

        /** @return the oldest queued item without removing it, or {@code null} if the queue is empty */
        public synchronized T peek() {
            return queue.peek();
        }

        /** @return the oldest queued item after removing it, or {@code null} if the queue is empty */
        public synchronized T poll() {
            return queue.poll();
        }

        @Override
        public void onNext(T t) {
            synchronized (this) {
                // ArrayDeque.add throws NullPointerException for a null item.
                queue.add(t);
            }
            merger.next();
        }

        @Override
        public void onError(Throwable e) {
            merger.error(e);
        }

        @Override
        public void onCompleted() {
            // Publish completion before waking the merger so the drain pass sees it.
            done = true;
            merger.next();
        }
    }

    /**
     * Small demo: feeds two subjects with interleaved values and prints both
     * the inputs and what the merged sequence emits.
     */
    public static void main(String[] args) {
        PublishSubject<Integer> one = PublishSubject.create();
        PublishSubject<Integer> two = PublishSubject.create();

        Observable<Integer> merged = Observable.create(new OrderedMerge<Integer>(Arrays.asList(one, two), new Comparator<Integer>() {
            @Override
            public int compare(Integer o1, Integer o2) {
                return o1.compareTo(o2);
            }
        }));

        merged.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer t1) {
                System.out.println(t1);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable t1) {
                t1.printStackTrace();
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("Done");
            }
        });

        System.out.println("One: 1");
        one.onNext(1);
        System.out.println("One: 3");
        one.onNext(3);
        System.out.println("One: 5");
        one.onNext(5);

        System.out.println("Two: 2");
        two.onNext(2);
        System.out.println("Two: 4");
        two.onNext(4);
        System.out.println("Two: 6");
        two.onNext(6);

        System.out.println("One: 7!");
        one.onNext(7);
        one.onCompleted();

        System.out.println("Two: 8");
        two.onNext(8);
        System.out.println("Two: 10!");
        two.onNext(10);
        two.onCompleted();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment