// Skip to content
//
// Instantly share code, notes, and snippets.
//
// @benjchristensen
// Last active January 31, 2017 09:45
// Show Gist options
//   • Save benjchristensen/8367765 to your computer and use it in GitHub Desktop.
// Save benjchristensen/8367765 to your computer and use it in GitHub Desktop.
// Playing with Observable.bind
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import rx.operators.SynchronizedObserver;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Action2;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
import rx.util.functions.Func3;
import rx.util.functions.FuncN;
public class Obsurvable<T> {
/** Routine run for every observer that attaches to this instance (see {@code observe}). */
private final Action2<Observer<T>, OperatorSubscription> f;

/**
 * Wraps a subscription routine.
 *
 * @param onSubscribe receives the observer and the subscription governing it
 */
Obsurvable(Action2<Observer<T>, OperatorSubscription> onSubscribe) {
    this.f = onSubscribe;
}
/**
 * Static factory equivalent to calling the constructor.
 *
 * @param f routine invoked with the observer and its subscription on every subscribe
 * @return a new instance backed by {@code f}
 */
public static <T> Obsurvable<T> create(final Action2<Observer<T>, OperatorSubscription> f) {
    final Obsurvable<T> created = new Obsurvable<T>(f);
    return created;
}
/**
 * Attaches the observer using a brand-new subscription.
 *
 * @param o receiver of the emitted events
 * @return the subscription that was handed to the subscription routine
 */
public Subscription subscribe(Observer<T> o) {
    final OperatorSubscription subscription = new OperatorSubscription();
    this.observe(o, subscription);
    return subscription;
}
/**
 * Runs the subscription routine with a caller-supplied subscription.
 *
 * @param o  receiver of the emitted events
 * @param sf subscription passed through to the routine
 */
public void observe(Observer<T> o, OperatorSubscription sf) {
    this.f.call(o, sf);
}
/**
 * An observer that also exposes the subscription to be used when it is attached
 * to its source (see {@code bind}).
 */
public interface Operator<T> extends Observer<T> {
    /** @return the subscription handed to the source this operator observes */
    OperatorSubscription getSubscription();
}
/**
 * Derives a new sequence by placing an operator between this source and the downstream observer.
 *
 * @param bind factory that, given the downstream observer and its subscription, yields the
 *             operator that will observe this source
 * @return a sequence whose subscription routine builds the operator and attaches it to this source
 */
public <R> Obsurvable<R> bind(final Func2<Observer<R>, OperatorSubscription, Operator<T>> bind) {
    final Action2<Observer<R>, OperatorSubscription> onSubscribe = new Action2<Observer<R>, OperatorSubscription>() {
        @Override
        public void call(Observer<R> downstream, final OperatorSubscription downstreamSubscription) {
            // downstream subscription -> operator
            final Operator<T> upstreamObserver = bind.call(downstream, downstreamSubscription);
            // operator's own subscription -> this source
            Obsurvable.this.observe(upstreamObserver, upstreamObserver.getSubscription());
        }
    };
    return new Obsurvable<R>(onSubscribe);
}
/**
 * Transforms each value with the given function.
 *
 * @param f conversion applied to every emitted value
 * @return the sequence of converted values
 */
public <R> Obsurvable<R> map(Func1<T, R> f) {
    final MapOperator<T, R> mapping = new MapOperator<T, R>(f);
    return bind(mapping);
}
/**
 * Maps each value to an inner sequence and merges all inner sequences into one.
 *
 * @param f produces the inner sequence for a value
 * @return the merged output of every inner sequence
 */
public <R> Obsurvable<R> flatMap(Func1<T, Obsurvable<R>> f) {
    final Obsurvable<Obsurvable<R>> nested = bind(new MapOperator<T, Obsurvable<R>>(f));
    return nested.bind(new MergeOperator<R>());
}
/**
 * Limits the sequence through a {@code TakeOperator}.
 *
 * @param num limit passed to the operator
 * @return the limited sequence
 */
public Obsurvable<T> take(int num) {
    final TakeOperator<T> limiter = new TakeOperator<T>(num);
    return bind(limiter);
}
/**
 * Splits the sequence into one inner sequence per key.
 *
 * @param keySelector computes the key of a value
 * @return a sequence emitting one inner sequence for every distinct key encountered
 */
public final <K> Obsurvable<Obsurvable<T>> groupBy(final Func1<T, K> keySelector) {
    final OperatorGroupBy<K, T> grouping = new OperatorGroupBy<K, T>(keySelector);
    return bind(grouping);
}
/**
 * Instance shorthand for the static two-source {@code zip}, with this sequence as the first source.
 *
 * @param other       second source
 * @param zipFunction combines one value from each source
 * @return the sequence of combined values
 */
public <T2, R> Obsurvable<R> zip(final Obsurvable<T2> other, final Func2<T, T2, R> zipFunction) {
    return Obsurvable.<T, T2, R> zip(this, other, zipFunction);
}
/**
 * Pairs up values of two sources position by position.
 *
 * @param t1          first source
 * @param t2          second source
 * @param zipFunction combines one value from each source
 * @return the sequence of combined values
 */
@SuppressWarnings("unchecked")
public static <T1, T2, R> Obsurvable<R> zip(final Obsurvable<T1> t1, final Obsurvable<T2> t2, final Func2<T1, T2, R> zipFunction) {
    // adapt the typed two-argument function to the positional form used by ZipOperator
    final FuncN<R> combiner = new FuncN<R>() {
        @Override
        public R call(Object... args) {
            final T1 first = (T1) args[0];
            final T2 second = (T2) args[1];
            return zipFunction.call(first, second);
        }
    };
    return from(new Obsurvable[] { t1, t2 }).bind(new ZipOperator<R>(combiner));
}
/**
 * Combines values of three sources position by position.
 *
 * @param t1          first source
 * @param t2          second source
 * @param t3          third source
 * @param zipFunction combines one value from each source
 * @return the sequence of combined values
 */
@SuppressWarnings("unchecked")
public static <T1, T2, T3, R> Obsurvable<R> zip(final Obsurvable<T1> t1, final Obsurvable<T2> t2, final Obsurvable<T3> t3, final Func3<T1, T2, T3, R> zipFunction) {
    // adapt the typed three-argument function to the positional form used by ZipOperator
    final FuncN<R> combiner = new FuncN<R>() {
        @Override
        public R call(Object... args) {
            final T1 first = (T1) args[0];
            final T2 second = (T2) args[1];
            final T3 third = (T3) args[2];
            return zipFunction.call(first, second, third);
        }
    };
    return from(new Obsurvable[] { t1, t2, t3 }).bind(new ZipOperator<R>(combiner));
}
/**
 * Wraps this sequence as a single emitted value and hands it to a {@code RepeatOperator}.
 *
 * @return the repeating sequence
 */
public Obsurvable<T> repeat() {
    final Obsurvable<Obsurvable<T>> self = from(this);
    return self.bind(new RepeatOperator<T>());
}
/**
 * Creates a sequence that emits exactly one value and then completes.
 * The subscription is not consulted.
 *
 * @param t the single value
 * @return the one-element sequence
 */
public static <T> Obsurvable<T> from(final T t) {
    final Action2<Observer<T>, OperatorSubscription> emitOnce = new Action2<Observer<T>, OperatorSubscription>() {
        @Override
        public void call(Observer<T> observer, OperatorSubscription unused) {
            observer.onNext(t);
            observer.onCompleted();
        }
    };
    return Obsurvable.create(emitOnce);
}
/**
 * Creates a sequence that replays the elements of an {@link Iterable} synchronously.
 * Emission stops as soon as the subscription reports it has been unsubscribed;
 * {@code onCompleted} is signalled afterwards in either case.
 *
 * @param is the elements to emit
 * @return the sequence over {@code is}
 */
public static <T> Obsurvable<T> from(final Iterable<T> is) {
    final Action2<Observer<T>, OperatorSubscription> emitAll = new Action2<Observer<T>, OperatorSubscription>() {
        @Override
        public void call(Observer<T> observer, OperatorSubscription subscription) {
            for (T element : is) {
                if (!subscription.isUnsubscribed()) {
                    observer.onNext(element);
                } else {
                    break;
                }
            }
            observer.onCompleted();
        }
    };
    return Obsurvable.create(emitAll);
}
/**************************************************************************************************************/
/**
 * Subscription handed to operators. Every call is delegated to an internal
 * {@link CompositeSubscription}.
 */
private static class OperatorSubscription implements Subscription {
    private final CompositeSubscription cs = new CompositeSubscription();

    /** @return a fresh subscription with nothing attached */
    public static OperatorSubscription create() {
        return new OperatorSubscription();
    }

    /**
     * @param s subscription to attach
     * @return a fresh subscription that already contains {@code s}
     */
    public static OperatorSubscription create(Subscription s) {
        final OperatorSubscription holder = new OperatorSubscription();
        holder.add(s);
        return holder;
    }

    /** Attaches {@code s} to the underlying composite. */
    public void add(Subscription s) {
        this.cs.add(s);
    }

    /** @return the unsubscribed state reported by the underlying composite */
    public boolean isUnsubscribed() {
        return this.cs.isUnsubscribed();
    }

    @Override
    public void unsubscribe() {
        this.cs.unsubscribe();
    }
}
/**************************************************************************************************************/
/**
 * Operator factory behind {@code groupBy}: routes every incoming value to a per-key
 * {@code SimpleSubject} and emits one wrapping {@code Obsurvable} downstream the first
 * time a key is seen.
 */
private final static class OperatorGroupBy<K, T> implements Func2<Observer<Obsurvable<T>>, OperatorSubscription, Operator<T>> {
// computes the grouping key of each value
final Func1<? super T, ? extends K> keySelector;
public OperatorGroupBy(final Func1<T, K> keySelector) {
this.keySelector = keySelector;
}
/**
 * Builds the operator that observes the source on behalf of {@code childObserver}.
 *
 * @param childObserver     downstream receiver of the group sequences
 * @param childSubscription downstream subscription; once unsubscribed no new groups are emitted
 * @return the operator, which exposes its own freshly created parent subscription
 */
@Override
public Operator<T> call(final Observer<Obsurvable<T>> childObserver, final OperatorSubscription childSubscription) {
// when child unsubscribes we want to prevent further groups from being emitted but still allow existing groups
// (the action registered here only logs; the filtering itself happens in onNext below)
childSubscription.add(Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("********* groupBy => outer subscription unsubscribed");
}
}));
// TODO what do we do if the parentSubscription unsubscribes?
// NOTE(review): parentSubscription is never linked to childSubscription in this block, so an
// outer unsubscribe does not reach the source — confirm this is intended.
final OperatorSubscription parentSubscription = new OperatorSubscription();
parentSubscription.add(Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("********* groupBy => parentSubscription unsubscribed");
}
}));
return new Operator<T>() {
// one subject per key seen so far
// NOTE(review): plain HashMap — assumes the source never invokes this operator concurrently; TODO confirm
private final Map<K, SimpleSubject<T>> groups = new HashMap<K, SimpleSubject<T>>();
// starts at 1 and is incremented per new group; only decremented in complete(),
// which nothing in this block calls
private final AtomicInteger completionCounter = new AtomicInteger(1);
@Override
public void onCompleted() {
// if we receive onCompleted from our parent we onComplete everything
for (SimpleSubject<T> ps : groups.values()) {
ps.onCompleted();
}
childObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
// we immediately tear everything down if we receive an error
childSubscription.unsubscribe(); // TODO is this correct? or onComplete? we don't want to propagate the error to each child
childObserver.onError(e);
}
@Override
public void onNext(T t) {
// System.out.println("received onNext: " + t);
try {
final K key = keySelector.call(t);
SimpleSubject<T> gps = groups.get(key);
if (gps == null) {
// this group doesn't exist
if (childSubscription.isUnsubscribed()) {
// we have been unsubscribed on the outer so won't send any more groups
// System.out.println("skipping group due to outer unsubscribe: " + key);
return;
}
gps = new SimpleSubject<T>();
final SimpleSubject<T> _gps = gps;
System.out.println("new group for key: " + key);
final OperatorSubscription childGroupSubscription = new OperatorSubscription();
childGroupSubscription.add(Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("********* groupBy => child group unsubscribed");
}
}));
// the group handed downstream: subscribing to it attaches the observer to the subject,
// always with childGroupSubscription — the subscription passed in (_child) is ignored
Obsurvable<T> go = new Obsurvable<T>(new Action2<Observer<T>, OperatorSubscription>() {
@Override
public void call(final Observer<T> childObserver, final OperatorSubscription _child) {
_gps.observe(childObserver, childGroupSubscription);
}
});
groups.put(key, gps);
// System.out.println("send onNext: " + go + " to " + key);
// number of children we have running
completionCounter.incrementAndGet();
// the group is emitted before its first value so downstream can subscribe to it first
childObserver.onNext(go);
}
// we have the correct group so send value to it
gps.onNext(t);
} catch (Throwable e) {
// any failure (key selection, downstream, subject) is routed to onError
onError(e);
}
}
@Override
public OperatorSubscription getSubscription() {
return parentSubscription;
}
// NOTE(review): not referenced anywhere in this operator; looks like leftover from a
// counter-based completion scheme — confirm before removing
private void complete() {
if (completionCounter.decrementAndGet() == 0) {
childObserver.onCompleted();
}
}
};
}
}
/**
 * Minimal subject: an {@code Obsurvable} that remembers the most recently subscribed
 * observer and, acting as an {@link Observer} itself, relays every event to it.
 *
 * <p>Events that arrive while no observer is attached are dropped. Previously they
 * caused a {@link NullPointerException}, because the observer reference is empty
 * until somebody subscribes.
 */
private static class SimpleSubject<T> extends Obsurvable<T> implements Observer<T> {
    /** Holds the current observer; empty until the first subscription. */
    final AtomicReference<Observer<T>> theObserver;
    /** Kept for callers that supply it; this class never writes to it. */
    final AtomicReference<OperatorSubscription> theSubscription;

    /**
     * @param theObserver     cell that receives each subscribing observer
     * @param theSubscription cell stored as-is on the instance
     */
    public SimpleSubject(final AtomicReference<Observer<T>> theObserver, final AtomicReference<OperatorSubscription> theSubscription) {
        super(new Action2<Observer<T>, OperatorSubscription>() {
            @Override
            public void call(Observer<T> o, OperatorSubscription s) {
                // a later subscriber replaces the earlier one
                theObserver.set(o);
            }
        });
        this.theObserver = theObserver;
        this.theSubscription = theSubscription;
    }

    /** Creates a subject with empty observer and subscription cells. */
    public SimpleSubject() {
        this(new AtomicReference<Observer<T>>(), new AtomicReference<OperatorSubscription>());
    }

    @Override
    public void onCompleted() {
        final Observer<T> current = theObserver.get();
        if (current != null) {
            current.onCompleted();
        }
    }

    @Override
    public void onError(Throwable e) {
        final Observer<T> current = theObserver.get();
        if (current != null) {
            current.onError(e);
        }
    }

    @Override
    public void onNext(T args) {
        final Observer<T> current = theObserver.get();
        if (current != null) {
            current.onNext(args);
        }
    }
}
/**
 * Operator factory that flattens a sequence of sequences into one, optionally capping
 * how many inner sequences are subscribed at once. Inner sequences that exceed the cap
 * are parked in a queue and picked up when another one completes.
 */
public static class MergeOperator<T> implements Func2<Observer<T>, OperatorSubscription, Operator<Obsurvable<T>>> {
private final int maxConcurrent;
/** Creates a merge with the cap set to {@code Integer.MAX_VALUE}. */
public MergeOperator() {
maxConcurrent = Integer.MAX_VALUE;
}
/**
 * @param maxConcurrent cap on simultaneously subscribed inner sequences
 * @throws IllegalArgumentException if {@code maxConcurrent} is zero or negative
 */
public MergeOperator(int maxConcurrent) {
if (maxConcurrent <= 0) {
throw new IllegalArgumentException("maxConcurrent must be positive");
}
this.maxConcurrent = maxConcurrent;
}
/**
 * Builds the operator observing the outer sequence.
 *
 * @param _o downstream observer; wrapped in a {@code SynchronizedObserver} before use
 * @param os subscription returned by the operator; every inner subscription is added to it
 */
@Override
public Operator<Obsurvable<T>> call(Observer<T> _o, final OperatorSubscription os) {
// starts at 1 to account for the outer sequence; downstream completes when it reaches 0
final AtomicInteger completionCounter = new AtomicInteger(1);
// NOTE(review): also starts at 1 and is compared with '>' after incrementing, so with
// maxConcurrent == 1 the first inner sequence is queued rather than subscribed until the
// outer sequence completes — confirm whether the outer is meant to occupy a slot.
final AtomicInteger concurrentCounter = new AtomicInteger(1);
// Concurrent* since we'll be accessing them from the inner Observers which can be on other threads
final ConcurrentLinkedQueue<Obsurvable<T>> pending = new ConcurrentLinkedQueue<Obsurvable<T>>();
final Observer<T> o = new SynchronizedObserver<T>(_o);
return new Operator<Obsurvable<T>>() {
@Override
public OperatorSubscription getSubscription() {
return os;
};
@Override
public void onCompleted() {
// the outer sequence finishing counts as one completion
complete();
}
@Override
public void onError(Throwable e) {
// forwarded as-is; nothing is unsubscribed here
o.onError(e);
}
@Override
public void onNext(Obsurvable<T> innerObservable) {
// track so we send onComplete only when all have finished
completionCounter.incrementAndGet();
// check concurrency
if (concurrentCounter.incrementAndGet() > maxConcurrent) {
// over the cap: park it and give the slot back
pending.add(innerObservable);
concurrentCounter.decrementAndGet();
} else {
// we are able to proceed
OperatorSubscription innerSubscription = new OperatorSubscription();
os.add(innerSubscription);
innerObservable.observe(new InnerObserver(), innerSubscription);
}
}
// Called once per finished sequence (outer or inner). Emits onCompleted when the last one
// finishes; otherwise releases a slot and tries to start one parked sequence.
private void complete() {
if (completionCounter.decrementAndGet() == 0) {
o.onCompleted();
return;
} else {
// not all are completed and some may still need to run
concurrentCounter.decrementAndGet();
}
// do work-stealing on whatever thread we're on and subscribe to pending observables
if (concurrentCounter.incrementAndGet() > maxConcurrent) {
// still not space to run
concurrentCounter.decrementAndGet();
} else {
// we can run
// NOTE(review): when the queue is empty the slot claimed just above is not released,
// so the counter never drops for sequences that finish with nothing pending — verify.
Obsurvable<T> outstandingObservable = pending.poll();
if (outstandingObservable != null) {
OperatorSubscription innerSubscription = new OperatorSubscription();
os.add(innerSubscription);
outstandingObservable.observe(new InnerObserver(), innerSubscription);
}
}
}
// observer attached to each inner sequence: relays values and errors, reports completion
final class InnerObserver implements Observer<T> {
@Override
public void onCompleted() {
complete();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T a) {
o.onNext(a);
}
}
};
}
}
/**
 * Operator factory for {@code map}: converts every upstream value with a function
 * before passing it downstream. Terminal events are relayed unchanged.
 */
private static class MapOperator<T, R> implements Func2<Observer<R>, OperatorSubscription, Operator<T>> {
    final Func1<T, R> transformer;

    MapOperator(Func1<T, R> transformer) {
        this.transformer = transformer;
    }

    /**
     * @param o downstream observer receiving converted values
     * @param s downstream subscription, reused as the subscription towards the source
     */
    @Override
    public Operator<T> call(final Observer<R> o, final OperatorSubscription s) {
        final class Mapping implements Operator<T> {
            @Override
            public OperatorSubscription getSubscription() {
                return s;
            }

            @Override
            public void onNext(T t) {
                final R mapped = transformer.call(t);
                o.onNext(mapped);
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onCompleted() {
                o.onCompleted();
            }
        }
        return new Mapping();
    }
}
/**
 * Operator factory for {@code take}: forwards at most {@code limit} values, then
 * completes downstream and unsubscribes from the source.
 *
 * <p>Fixes relative to the earlier version:
 * <ul>
 * <li>the downstream subscription is linked to the upstream one, so a downstream
 * unsubscribe now reaches the source;</li>
 * <li>{@code take(0)} no longer lets the first source value through;</li>
 * <li>terminal events from the source are ignored once this operator has finished,
 * so downstream never sees a second {@code onCompleted}.</li>
 * </ul>
 */
private static class TakeOperator<T> implements Func2<Observer<T>, OperatorSubscription, Operator<T>> {
    final int limit;

    TakeOperator(int limit) {
        this.limit = limit;
    }

    /**
     * @param o downstream observer
     * @param s downstream subscription
     * @return operator whose subscription is the one handed to the source
     */
    @Override
    public Operator<T> call(final Observer<T> o, final OperatorSubscription s) {
        final OperatorSubscription is = OperatorSubscription.create();
        // when downstream unsubscribes, stop the source as well
        s.add(is);
        if (limit == 0) {
            o.onCompleted();
            s.unsubscribe();
            // explicit so the operator below rejects everything regardless of how add() behaved
            is.unsubscribe();
        }
        return new Operator<T>() {
            int count = 0;

            @Override
            public OperatorSubscription getSubscription() {
                return is;
            }

            @Override
            public void onCompleted() {
                // already finished (limit reached or downstream gone): nothing more to signal
                if (!is.isUnsubscribed()) {
                    o.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (!is.isUnsubscribed()) {
                    o.onError(e);
                }
            }

            @Override
            public void onNext(T i) {
                if (is.isUnsubscribed()) {
                    return;
                }
                o.onNext(i);
                if (++count >= limit) {
                    o.onCompleted();
                    is.unsubscribe();
                }
            }
        };
    }
}
/**
 * Operator factory for {@code repeat}: receives the sequence to repeat as a single
 * value and re-subscribes to it until the subscription is unsubscribed.
 *
 * <p>Fixes relative to the earlier version: the completion of an individual pass is no
 * longer forwarded downstream (it only triggers the next pass), and an error now stops
 * the loop and is the last event delivered.
 */
private static class RepeatOperator<T> implements Func2<Observer<T>, OperatorSubscription, Operator<Obsurvable<T>>> {
    RepeatOperator() {
    }

    /**
     * @param o downstream observer
     * @param s downstream subscription; also used for every pass over the source
     */
    @Override
    public Operator<Obsurvable<T>> call(final Observer<T> o, final OperatorSubscription s) {
        return new Operator<Obsurvable<T>>() {
            // set once an error has been delivered so that no terminal event follows it
            private volatile boolean failed = false;

            @Override
            public OperatorSubscription getSubscription() {
                return s;
            }

            @Override
            public void onCompleted() {
                if (!failed) {
                    o.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                failed = true;
                o.onError(e);
            }

            @Override
            public void onNext(Obsurvable<T> ot) {
                final Observer<T> pass = new Observer<T>() {
                    @Override
                    public void onCompleted() {
                        // end of one pass: the loop below starts the next one
                    }

                    @Override
                    public void onError(Throwable e) {
                        failed = true;
                        // break out of the re-subscription loop
                        s.unsubscribe();
                        o.onError(e);
                    }

                    @Override
                    public void onNext(T t) {
                        o.onNext(t);
                    }
                };
                while (!s.isUnsubscribed()) {
                    ot.observe(pass, s);
                }
            }
        };
    }
}
/**
 * Operator factory for {@code zip}: receives an array of sources as a single value,
 * subscribes to each of them and emits the result of {@code zipFunction} whenever every
 * source has a value queued.
 */
private static class ZipOperator<R> implements Func2<Observer<R>, OperatorSubscription, Operator<Obsurvable[]>> {
// combines one value per source, passed positionally
final FuncN<R> zipFunction;
public ZipOperator(FuncN<R> f) {
this.zipFunction = f;
}
/**
 * @param observer          downstream observer of the combined values
 * @param outerSubscription downstream subscription; also returned as the operator's subscription
 */
@Override
public Operator<Obsurvable[]> call(final Observer<R> observer, final OperatorSubscription outerSubscription) {
return new Operator<Obsurvable[]>() {
public OperatorSubscription getSubscription() {
return outerSubscription;
}
@Override
public void onCompleted() {
// we only complete once a child Obsurvable completes or errors
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void onNext(Obsurvable[] innerObsurvable) {
// each array of sources gets its own Zip state machine
new Zip<R>(innerObsurvable, observer, outerSubscription, zipFunction).zip();
}
};
}
/** State for zipping one array of sources: a queue per source plus the emit loop. */
private static class Zip<R> {
@SuppressWarnings("rawtypes")
final Obsurvable[] os;
// InnerObserver instances, one per source, stored as Object and cast on use
final Object[] observers;
final Observer<R> observer;
final OperatorSubscription outerSubscription;
final FuncN<R> zipFunction;
// shared by all sources; unsubscribed as a whole when zipping completes
final OperatorSubscription childSubscription = new OperatorSubscription();
@SuppressWarnings("rawtypes")
public Zip(Obsurvable[] os, final Observer<R> observer, final OperatorSubscription outerSubscription, FuncN<R> zipFunction) {
this.os = os;
this.observer = observer;
this.outerSubscription = outerSubscription;
this.zipFunction = zipFunction;
observers = new Object[os.length];
for (int i = 0; i < os.length; i++) {
observers[i] = new InnerObserver();
}
// unsubscribing downstream also unsubscribes every source
outerSubscription.add(childSubscription);
}
/** Subscribes to every source in array order. */
@SuppressWarnings("unchecked")
public void zip() {
for (int i = 0; i < os.length; i++) {
// NOTE(review): replaces the observer created in the constructor for this slot; the
// constructor's instances only serve as placeholders for slots not yet subscribed — confirm
observers[i] = new InnerObserver();
os[i].observe((InnerObserver) observers[i], childSubscription);
}
}
// number of tick() calls not yet accounted for; the caller that moves it from 0 runs the loop
final AtomicLong counter = new AtomicLong(0);
/**
* check if we have values for each and emit if we do
*
* This will only allow one thread at a time to do the work, but ensures via `counter` increment/decrement
* that there is always one who acts on each `tick`. Same concept as used in OperationObserveOn.
*
*/
void tick() {
if (counter.getAndIncrement() == 0) {
outerloop: do {
Object[] vs = new Object[observers.length];
for (int i = 0; i < observers.length; i++) {
// peek only: values are removed further down, once every source has one
vs[i] = ((InnerObserver) observers[i]).items.peek();
if (vs[i] instanceof Notification) {
// a Notification at the head of a queue marks that source as completed
observer.onCompleted();
// we need to unsubscribe from all children since children are independently subscribed
childSubscription.unsubscribe();
// 'continue' in a do-while jumps to the loop condition
// NOTE(review): the Notification stays queued, so if counter is still > 0 the next
// pass sees it again and onCompleted may be delivered more than once — verify
continue outerloop;
}
if (vs[i] == null) {
// this source has nothing queued yet; wait for the next tick
continue outerloop;
}
}
// all have something so emit
observer.onNext(zipFunction.call(vs));
// now remove them
for (int i = 0; i < observers.length; i++) {
((InnerObserver) observers[i]).items.poll();
}
} while (counter.decrementAndGet() > 0);
}
}
// used to observe each obsurvable we are zipping together
// it collects all items in an internal queue
final class InnerObserver implements Observer {
// Concurrent* since we need to read it from across threads
final ConcurrentLinkedQueue items = new ConcurrentLinkedQueue();
@Override
public void onCompleted() {
// completion is queued behind the values already received
items.add(new Notification());
tick();
}
@Override
public void onError(Throwable e) {
// emit error and shut down
// NOTE(review): only the error is forwarded here; childSubscription is not unsubscribed — confirm
observer.onError(e);
}
@Override
public void onNext(Object t) {
items.add(t);
tick();
}
};
}
}
/**************************************************************************************************************/
public static class UnitTest {
/**
 * A hand-written operator that unsubscribes its parent after the first value must
 * cause the source to stop after one emission.
 */
@Test
public void testBindUnsubscribe() {
    final AtomicInteger counter = new AtomicInteger();
    final AtomicInteger received = new AtomicInteger();

    // relays every event and cuts the parent off right after the first value
    final Func2<Observer<Integer>, OperatorSubscription, Operator<Integer>> takeOne = new Func2<Observer<Integer>, OperatorSubscription, Operator<Integer>>() {
        @Override
        public Operator<Integer> call(final Observer<Integer> childObserver, OperatorSubscription childSubscription) {
            final OperatorSubscription upstream = new OperatorSubscription();
            return new Operator<Integer>() {
                @Override
                public OperatorSubscription getSubscription() {
                    return upstream;
                }

                @Override
                public void onNext(Integer t) {
                    childObserver.onNext(t);
                    upstream.unsubscribe();
                }

                @Override
                public void onError(Throwable e) {
                    childObserver.onError(e);
                }

                @Override
                public void onCompleted() {
                    childObserver.onCompleted();
                }
            };
        }
    };
    final Action1<Integer> sink = new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            received.incrementAndGet();
            System.out.println("Received: " + i);
        }
    };

    OBSERVABLE_OF_5_INTEGERS(counter).bind(takeOne).subscribe(sink);

    assertEquals(1, counter.get());
    assertEquals(1, received.get());
}
/**
 * Two stacked debug takes (2, then 1): the tighter downstream limit must stop the
 * source after a single emission.
 */
@Test
public void testBindUnsubscribeLayered() {
    final AtomicInteger counter = new AtomicInteger();
    final AtomicInteger received = new AtomicInteger();
    final Action1<Integer> sink = new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            received.incrementAndGet();
            System.out.println("Received: " + i);
        }
    };

    OBSERVABLE_OF_5_INTEGERS(counter)
            .bind(new DebugTakeFunction<Integer>(2, "A"))
            .bind(new DebugTakeFunction<Integer>(1, "B"))
            .subscribe(sink);

    assertEquals(1, counter.get());
    assertEquals(1, received.get());
}
/**
 * Logging variant of a take operator used by the unsubscribe tests. The value counter
 * lives on the function instance, so it is shared by every operator the instance creates.
 */
private static class DebugTakeFunction<T> implements Func2<Observer<T>, OperatorSubscription, Operator<T>> {
    final int num;
    final String id;
    int count = 0;

    DebugTakeFunction(final int num, final String id) {
        this.num = num;
        this.id = id;
    }

    @Override
    public Operator<T> call(final Observer<T> childObserver, final OperatorSubscription childSubscription) {
        final OperatorSubscription parentSub = new OperatorSubscription();
        // link child -> parent so a downstream unsubscribe propagates upstream
        childSubscription.add(parentSub);
        // the operator handed to the "parent"
        return new Operator<T>() {
            @Override
            public OperatorSubscription getSubscription() {
                return parentSub;
            }

            @Override
            public void onNext(T t) {
                count++;
                System.out.println("simpleTake[" + id + "] => onNext " + count + " parentSub: " + parentSub.isUnsubscribed() + " childSub: " + childSubscription.isUnsubscribed());
                if (parentSub.isUnsubscribed()) {
                    return;
                }
                childObserver.onNext(t);
                if (count >= num) {
                    System.out.println("simpleTake[" + id + "] => unsubscribe");
                    parentSub.unsubscribe();
                }
            }

            @Override
            public void onError(Throwable e) {
                childObserver.onError(e);
            }

            @Override
            public void onCompleted() {
                childObserver.onCompleted();
            }
        };
    }
}
/**
 * Exercises unsubscribe propagation through a chain that fans out into nested sequences:
 * take(4) -> take(3) -> map each integer to a nested sequence of strings -> take(2) of those
 * nested sequences -> hand-written flatten -> take(7) of the flattened strings.
 * The assertions expect 2 source emissions and 7 flattened values (7 emitted by the children).
 */
@Test
public void testBindUnsubscribeNested() {
// counts emissions of the outer source
final AtomicInteger counter = new AtomicInteger();
// counts values reaching the final subscriber
final AtomicInteger received = new AtomicInteger();
// counts emissions of the nested sequences
final AtomicInteger childCounter = new AtomicInteger();
OBSERVABLE_OF_5_INTEGERS(counter)
.bind(new DebugTakeFunction<Integer>(4, "A"))
.bind(new DebugTakeFunction<Integer>(3, "B"))
// turn every integer into a nested sequence of strings
.bind(new Func2<Observer<Obsurvable<String>>, OperatorSubscription, Operator<Integer>>() {
@Override
public Operator<Integer> call(final Observer<Obsurvable<String>> childObserver, final OperatorSubscription childSubscription) {
// when a child unsubscribes to this outer parent we will just filter out anything further while the nested Obsurvables continue
final OperatorSubscription outerSubscription = new OperatorSubscription();
childSubscription.add(outerSubscription);
return new Operator<Integer>() {
@Override
public void onCompleted() {
// if the parent completes we complete and will send nothing further
childObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
childObserver.onError(e);
}
@Override
public void onNext(Integer i) {
if (outerSubscription.isUnsubscribed()) {
System.out.println("***** group onNext => outerSubscription unsubscribed so skipping onNext of group");
} else {
childObserver.onNext(OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(childCounter, String.valueOf(i)));
}
}
@Override
public OperatorSubscription getSubscription() {
return outerSubscription;
}
};
}
})
// keep only the first two nested sequences
.bind(new DebugTakeFunction<Obsurvable<String>>(2, "C"))
// flatten
.bind(new Func2<Observer<String>, OperatorSubscription, Operator<Obsurvable<String>>>() {
@Override
public Operator<Obsurvable<String>> call(final Observer<String> childObserver, final OperatorSubscription childSubscription) {
// subscription towards the upstream that hands us the nested sequences; not linked to anything here
final OperatorSubscription parentGivingUsGroups = new OperatorSubscription();
// holds one subscription per nested sequence currently being flattened
final CompositeSubscription childGroups = new CompositeSubscription();
// if child unsubscribes as we are giving it flattened results then we shut everything down
childSubscription.add(Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("***** flattening got unsubscribed so shut down all groups");
childGroups.unsubscribe();
}
}));
return new Operator<Obsurvable<String>>() {
@Override
public void onCompleted() {
// if the parent completes we don't immediately as sub-groups can still be emitting
}
@Override
public void onError(Throwable e) {
// if an error occurs we immediately shut everything down
childObserver.onError(e);
// unsubscribe all child groups
childGroups.unsubscribe();
}
@Override
public void onNext(Obsurvable<String> o) {
// subscribe to this so we can flatten it
final OperatorSubscription childGroupSubscription = new OperatorSubscription();
childGroups.add(childGroupSubscription);
// if all child groups are unsubscribed we want to unsubscribe the parent
// (the action registered below only logs)
childGroupSubscription.add(Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("***** single group got unsubscribed");
}
}));
o.observe(new Observer<String>() {
@Override
public void onCompleted() {
// remove the subscription if we finish
childGroups.remove(childGroupSubscription);
// TODO we are ignoring when all child groups complete and parent is complete
}
@Override
public void onError(Throwable e) {
// remove the subscription if we finish
// (the error itself is not forwarded downstream)
childGroups.remove(childGroupSubscription);
}
@Override
public void onNext(String args) {
// emit flattened results
childObserver.onNext(args);
}
}, childGroupSubscription);
}
@Override
public OperatorSubscription getSubscription() {
return parentGivingUsGroups;
}
};
}
})
// stop after seven flattened values
.bind(new DebugTakeFunction<String>(7, "D"))
.subscribe(new Action1<String>() {
@Override
public void call(String i) {
received.incrementAndGet();
System.out.println("Received: " + i);
}
});
assertEquals(2, counter.get());
assertEquals(7, received.get());
assertEquals(7, childCounter.get());
}
/** Zipping the synchronous five-integer source with itself must yield "1-1" .. "5-5". */
@Test
public void testZip() {
    final Func2<Integer, Integer, String> joinWithDash = new Func2<Integer, Integer, String>() {
        @Override
        public String call(Integer a, Integer b) {
            return a + "-" + b;
        }
    };
    final Obsurvable<String> zipped = OBSERVABLE_OF_5_INTEGERS.zip(OBSERVABLE_OF_5_INTEGERS, joinWithDash);

    final ArrayList<String> results = new ArrayList<String>();
    final Action1<String> collector = new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
            results.add(s);
        }
    };
    zipped.subscribe(collector);

    assertEquals(5, results.size());
    assertEquals("1-1", results.get(0));
    assertEquals("2-2", results.get(1));
    assertEquals("5-5", results.get(4));
}
/**
 * Zips two asynchronous infinite sources and takes five results; both sources must be
 * unsubscribed afterwards (each is expected to count the shared latch down when it stops).
 */
@Test
public void testZipAsync() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(1);
    final CountDownLatch infiniteObservables = new CountDownLatch(2);
    final Func2<Integer, Integer, String> joinWithDash = new Func2<Integer, Integer, String>() {
        @Override
        public String call(Integer a, Integer b) {
            return a + "-" + b;
        }
    };
    final Obsurvable<String> zipped = ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables)
            .zip(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables), joinWithDash)
            .take(5);

    final ArrayList<String> results = new ArrayList<String>();
    final Observer<String> collector = new Observer<String>() {
        @Override
        public void onNext(String s) {
            System.out.println(s);
            results.add(s);
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
            latch.countDown();
        }

        @Override
        public void onCompleted() {
            latch.countDown();
        }
    };
    zipped.subscribe(collector);

    // wait for the terminal event, then for both sources to notice the unsubscribe
    latch.await(2000, TimeUnit.MILLISECONDS);
    final boolean sourcesStopped = infiniteObservables.await(2000, TimeUnit.MILLISECONDS);
    if (!sourcesStopped) {
        throw new RuntimeException("didn't unsubscribe");
    }

    assertEquals(5, results.size());
    assertEquals("1-1", results.get(0));
    assertEquals("2-2", results.get(1));
    assertEquals("5-5", results.get(4));
}
/**
 * Zips the finite five-integer source with an asynchronous infinite one: five pairs are
 * expected and the infinite source must be unsubscribed once the finite one is exhausted.
 */
@Test
public void testZipInfiniteAndFinite() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(1);
    final CountDownLatch infiniteObservable = new CountDownLatch(1);
    final Func2<Integer, Integer, String> joinWithDash = new Func2<Integer, Integer, String>() {
        @Override
        public String call(Integer a, Integer b) {
            return a + "-" + b;
        }
    };
    final Obsurvable<String> zipped = OBSERVABLE_OF_5_INTEGERS
            .zip(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservable), joinWithDash);

    final ArrayList<String> results = new ArrayList<String>();
    final Observer<String> collector = new Observer<String>() {
        @Override
        public void onNext(String s) {
            System.out.println(s);
            results.add(s);
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
            latch.countDown();
        }

        @Override
        public void onCompleted() {
            latch.countDown();
        }
    };
    zipped.subscribe(collector);

    latch.await(1000, TimeUnit.MILLISECONDS);
    final boolean sourceStopped = infiniteObservable.await(2000, TimeUnit.MILLISECONDS);
    if (!sourceStopped) {
        throw new RuntimeException("didn't unsubscribe");
    }

    assertEquals(5, results.size());
    assertEquals("1-1", results.get(0));
    assertEquals("2-2", results.get(1));
    assertEquals("5-5", results.get(4));
}
/** Each of the five integers expands to "a-n", "b-n"; the results must arrive in source order. */
@Test
public void testFlatMap() {
    final Func1<Integer, Obsurvable<String>> expand = new Func1<Integer, Obsurvable<String>>() {
        @Override
        public Obsurvable<String> call(Integer i) {
            return Obsurvable.from(Arrays.asList("a-" + i, "b-" + i));
        }
    };
    final Obsurvable<String> flattened = OBSERVABLE_OF_5_INTEGERS.flatMap(expand);

    final ArrayList<String> results = new ArrayList<String>();
    final Action1<String> collector = new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
            results.add(s);
        }
    };
    flattened.subscribe(collector);

    assertEquals(10, results.size());
    assertEquals("a-1", results.get(0));
    assertEquals("b-1", results.get(1));
    assertEquals("a-2", results.get(2));
    assertEquals("b-2", results.get(3));
    assertEquals("a-5", results.get(8));
    assertEquals("b-5", results.get(9));
}
/**
 * flatMap followed by take(4): only the first two source values may be requested,
 * because they already produce the four strings that are taken.
 */
@Test
public void testFlatMapWithTake() {
    final AtomicInteger counter = new AtomicInteger();
    // emits 1..5, counting every emission and stopping early when unsubscribed
    final Obsurvable<Integer> source = Obsurvable.create(new Action2<Observer<Integer>, OperatorSubscription>() {
        @Override
        public void call(Observer<Integer> o, OperatorSubscription s) {
            int next = 1;
            while (next <= 5 && !s.isUnsubscribed()) {
                counter.incrementAndGet();
                o.onNext(next);
                next++;
            }
            o.onCompleted();
        }
    });
    final Func1<Integer, Obsurvable<String>> expand = new Func1<Integer, Obsurvable<String>>() {
        @Override
        public Obsurvable<String> call(Integer i) {
            return Obsurvable.from(Arrays.asList("a-" + i, "b-" + i));
        }
    };
    final Obsurvable<String> limited = source.flatMap(expand).take(4);

    final ArrayList<String> results = new ArrayList<String>();
    limited.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
            results.add(s);
        }
    });

    assertEquals(4, results.size());
    assertEquals("a-1", results.get(0));
    assertEquals("b-1", results.get(1));
    assertEquals("a-2", results.get(2));
    assertEquals("b-2", results.get(3));
    assertEquals(2, counter.get());
}
/** Merging two synchronous five-integer sequences must yield 1..5 followed by 1..5. */
@Test
public void testMerge() {
    final Obsurvable<Obsurvable<Integer>> outer = Obsurvable.create(new Action2<Observer<Obsurvable<Integer>>, OperatorSubscription>() {
        @Override
        public void call(Observer<Obsurvable<Integer>> o, OperatorSubscription s) {
            // note: the outer sequence never signals onCompleted
            o.onNext(OBSERVABLE_OF_5_INTEGERS);
            o.onNext(OBSERVABLE_OF_5_INTEGERS);
        }
    });
    final Obsurvable<Integer> merged = outer.bind(new MergeOperator<Integer>());

    final ArrayList<Integer> results = new ArrayList<Integer>();
    final Action1<Integer> collector = new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            System.out.println(i);
            results.add(i);
        }
    };
    merged.subscribe(collector);

    assertEquals(10, results.size());
    assertEquals(1, results.get(0).intValue());
    assertEquals(5, results.get(4).intValue());
    assertEquals(1, results.get(5).intValue());
    assertEquals(5, results.get(9).intValue());
}
/**
 * Merging two infinite synchronous sequences and taking ten values: all ten are expected
 * to come from the first sequence, since it is subscribed first and emits synchronously.
 */
@Test
public void testMergeTwoInfiniteSynchronousSequences() {
    final Obsurvable<Obsurvable<Integer>> outer = Obsurvable.create(new Action2<Observer<Obsurvable<Integer>>, OperatorSubscription>() {
        @Override
        public void call(Observer<Obsurvable<Integer>> o, OperatorSubscription s) {
            o.onNext(OBSERVABLE_OF_INFINITE_INTEGERS);
            o.onNext(OBSERVABLE_OF_INFINITE_INTEGERS);
        }
    });
    final Obsurvable<Integer> firstTen = outer.bind(new MergeOperator<Integer>()).take(10);

    final ArrayList<Integer> results = new ArrayList<Integer>();
    final Action1<Integer> collector = new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            System.out.println(i);
            results.add(i);
        }
    };
    firstTen.subscribe(collector);

    assertEquals(10, results.size());
    // values 1..10 in order, i.e. nothing interleaved from the second sequence
    assertEquals(1, results.get(0).intValue());
    assertEquals(5, results.get(4).intValue());
    assertEquals(10, results.get(9).intValue());
}
/**
 * Merging a finite five-integer sequence followed by an infinite one and taking ten values.
 * Per the assertions, positions 0..4 hold the finite sequence and position 9 holds the
 * fifth value taken from the infinite one.
 */
@Test
public void testMergeFiniteAndInfiniteSynchronousSequences() {
    final Obsurvable<Obsurvable<Integer>> outer = Obsurvable.create(new Action2<Observer<Obsurvable<Integer>>, OperatorSubscription>() {
        @Override
        public void call(Observer<Obsurvable<Integer>> o, OperatorSubscription s) {
            o.onNext(OBSERVABLE_OF_5_INTEGERS);
            o.onNext(OBSERVABLE_OF_INFINITE_INTEGERS);
        }
    });
    final Obsurvable<Integer> firstTen = outer.bind(new MergeOperator<Integer>()).take(10);

    final ArrayList<Integer> results = new ArrayList<Integer>();
    final Action1<Integer> collector = new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            System.out.println(i);
            results.add(i);
        }
    };
    firstTen.subscribe(collector);

    assertEquals(10, results.size());
    // the finite sequence runs to completion first, then the infinite one supplies the rest
    assertEquals(1, results.get(0).intValue());
    assertEquals(5, results.get(4).intValue());
    assertEquals(5, results.get(9).intValue());
}
/**
 * Merges two asynchronous infinite sequences, takes 100 values and verifies that both
 * sources are unsubscribed afterwards.
 *
 * <p>The wait for the terminal event is bounded and errors are recorded, so a failure in
 * the pipeline now fails the test instead of blocking it forever (the earlier version used
 * an unbounded {@code await()} together with an empty {@code onError}).
 */
@Test
public void testMergeInifiteAsync() throws InterruptedException {
    final CountDownLatch infiniteObservables = new CountDownLatch(2);
    Obsurvable<Obsurvable<Integer>> os = Obsurvable.create(new Action2<Observer<Obsurvable<Integer>>, OperatorSubscription>() {
        @Override
        public void call(Observer<Obsurvable<Integer>> o, OperatorSubscription s) {
            o.onNext(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables));
            o.onNext(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables));
        }
    });
    Obsurvable<Integer> is = os.bind(new MergeOperator<Integer>()).take(100);
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
    final ArrayList<Integer> list = new ArrayList<Integer>();
    is.subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            latch.countDown();
        }

        @Override
        public void onError(Throwable e) {
            // remember the error and release the waiting test thread
            failure.set(e);
            latch.countDown();
        }

        @Override
        public void onNext(Integer i) {
            System.out.println(i);
            list.add(i);
        }
    });
    // generous bound: the point is to never hang, not to measure speed
    assertTrue("merged sequence did not terminate in time", latch.await(30, TimeUnit.SECONDS));
    assertNull("merged sequence signalled an error", failure.get());
    if (!infiniteObservables.await(2000, TimeUnit.MILLISECONDS)) {
        throw new RuntimeException("didn't unsubscribe");
    }
    assertEquals(100, list.size());
}
/** Taking one item from the five-integer source must deliver exactly the value 1. */
@Test
public void testTake1from5() {
    final ArrayList<Integer> received = new ArrayList<Integer>();
    Action1<Integer> collector = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            System.out.println(value);
            received.add(value);
        }
    };

    Obsurvable<Integer> firstOnly = OBSERVABLE_OF_5_INTEGERS.take(1);
    firstOnly.subscribe(collector);

    assertEquals(1, received.size());
    assertEquals(1, received.get(0).intValue());
}
/**
 * Binds TAKE_5 four times in a row followed by the integer-to-string mapping
 * and expects the five strings "Number=>1" .. "Number=>5".
 */
@Test
public void testBindTakeAndMap() {
    final String[] expected = { "Number=>1", "Number=>2", "Number=>3", "Number=>4", "Number=>5" };
    final ArrayList<String> received = new ArrayList<String>();

    Obsurvable<String> os = OBSERVABLE_OF_INFINITE_INTEGERS
            .bind(TAKE_5)
            .bind(TAKE_5)
            .bind(TAKE_5)
            .bind(TAKE_5)
            .bind(MAP_INTEGER_TO_STRING);
    os.subscribe(new Action1<String>() {
        @Override
        public void call(String text) {
            System.out.println(text);
            received.add(text);
        }
    });

    assertEquals(5, received.size());
    for (int k = 0; k < expected.length; k++) {
        assertEquals(expected[k], received.get(k));
    }
}
/**
 * Subscribes twice to the same take-then-map chain and expects each
 * subscription to receive "Number=>1" .. "Number=>5" independently.
 */
@Test
public void testBindTakeAndMapMultipleSubscribes() {
    final String[] expected = { "Number=>1", "Number=>2", "Number=>3", "Number=>4", "Number=>5" };
    Obsurvable<String> firstFiveAsText = OBSERVABLE_OF_INFINITE_INTEGERS.bind(TAKE_5).bind(MAP_INTEGER_TO_STRING);

    final ArrayList<String> firstRun = new ArrayList<String>();
    firstFiveAsText.subscribe(new Action1<String>() {
        @Override
        public void call(String text) {
            System.out.println(text);
            firstRun.add(text);
        }
    });
    assertEquals(5, firstRun.size());
    for (int k = 0; k < expected.length; k++) {
        assertEquals(expected[k], firstRun.get(k));
    }

    // a second subscription to the same chain must see the same values again
    final ArrayList<String> secondRun = new ArrayList<String>();
    firstFiveAsText.subscribe(new Action1<String>() {
        @Override
        public void call(String text) {
            System.out.println(text);
            secondRun.add(text);
        }
    });
    assertEquals(5, secondRun.size());
    for (int k = 0; k < expected.length; k++) {
        assertEquals(expected[k], secondRun.get(k));
    }
}
/** Maps the five-integer source to strings and checks the count and the first element. */
@Test
public void testBindMap() {
    final ArrayList<String> received = new ArrayList<String>();
    Action1<String> collector = new Action1<String>() {
        @Override
        public void call(String text) {
            System.out.println(text);
            received.add(text);
        }
    };

    Obsurvable<String> mapped = OBSERVABLE_OF_5_INTEGERS.bind(MAP_INTEGER_TO_STRING);
    mapped.subscribe(collector);

    assertEquals(5, received.size());
    assertEquals("Number=>1", received.get(0));
}
/** Maps the five-integer source to strings, keeps two of them and checks the result. */
@Test
public void testMapAndTake() {
    final ArrayList<String> received = new ArrayList<String>();
    Action1<String> collector = new Action1<String>() {
        @Override
        public void call(String text) {
            System.out.println(text);
            received.add(text);
        }
    };

    Obsurvable<String> firstTwo = OBSERVABLE_OF_5_INTEGERS.bind(MAP_INTEGER_TO_STRING).take(2);
    firstTwo.subscribe(collector);

    assertEquals(2, received.size());
    assertEquals("Number=>1", received.get(0));
}
/**
 * Binds a hand-written operator that registers a callback on its
 * OperatorSubscription, then asserts the callback has fired after subscribing
 * through take/map chains - once directly and once through an extended chain.
 */
@Test
public void testUnSubscribeOfEachOperator() {
    final AtomicBoolean unsubscribed = new AtomicBoolean(false);

    // operator under test: converts each Integer to a String and tracks unsubscription
    Func2<Observer<String>, OperatorSubscription, Operator<Integer>> stringifyAndTrack =
            new Func2<Observer<String>, OperatorSubscription, Operator<Integer>>() {
                @Override
                public Operator<Integer> call(final Observer<String> child, final OperatorSubscription subscription) {
                    Action0 markUnsubscribed = new Action0() {
                        @Override
                        public void call() {
                            unsubscribed.set(true);
                        }
                    };
                    subscription.add(Subscriptions.create(markUnsubscribed));
                    return new Operator<Integer>() {
                        public OperatorSubscription getSubscription() {
                            return subscription;
                        }

                        @Override
                        public void onCompleted() {
                            child.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            child.onError(e);
                        }

                        @Override
                        public void onNext(Integer i) {
                            child.onNext(String.valueOf(i));
                        }
                    };
                }
            };
    Func1<String, String> addMappedPrefix = new Func1<String, String>() {
        @Override
        public String call(String t) {
            return "mapped-" + t;
        }
    };
    Obsurvable<String> os = OBSERVABLE_OF_5_INTEGERS.bind(stringifyAndTrack).take(3).map(addMappedPrefix).take(2);

    // first subscription: the last value seen is the second one
    final AtomicReference<String> valueA = new AtomicReference<String>();
    os.subscribe(new Action1<String>() {
        @Override
        public void call(String received) {
            System.out.println(" received: " + received);
            valueA.set(received);
        }
    });
    assertEquals("mapped-2", valueA.get());
    assertTrue(unsubscribed.get());

    // reset the flag and subscribe again through a longer chain
    unsubscribed.set(false);
    final AtomicReference<String> valueB = new AtomicReference<String>();
    Func1<String, String> addSuffixB = new Func1<String, String>() {
        @Override
        public String call(String t) {
            return t + "-B";
        }
    };
    os.map(addSuffixB).take(1).subscribe(new Action1<String>() {
        @Override
        public void call(String received) {
            System.out.println(" received: " + received);
            valueB.set(received);
        }
    });
    assertEquals("mapped-1-B", valueB.get());
    assertTrue(unsubscribed.get());
}
/** Binds a TakeOperator of size 5 onto the infinite source and expects five values. */
@Test
public void testBindTakeFromInfinite() {
    final int expectedSize = 5;
    final ArrayList<Integer> received = new ArrayList<Integer>();
    Action1<Integer> collector = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            System.out.println(value);
            received.add(value);
        }
    };

    Obsurvable<Integer> limited = OBSERVABLE_OF_INFINITE_INTEGERS.bind(new TakeOperator<Integer>(expectedSize));
    limited.subscribe(collector);

    assertEquals(expectedSize, received.size());
}
/** Subscribes directly to the five-integer source and expects five values. */
@Test
public void testCreateAndSubscribe() {
    final ArrayList<Integer> received = new ArrayList<Integer>();
    Action1<Integer> collector = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            System.out.println(value);
            received.add(value);
        }
    };

    OBSERVABLE_OF_5_INTEGERS.subscribe(collector);

    assertEquals(5, received.size());
}
/**
 * Takes ten values from an asynchronous infinite sequence and then checks that
 * the producer thread terminates (it counts down {@code infiniteObservables}
 * when its loop exits).
 *
 * @throws InterruptedException if the test thread is interrupted while waiting
 */
@Test
public void testTakeFromAsyncInfinite() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(1);
    final CountDownLatch infiniteObservables = new CountDownLatch(1);
    final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
    final ArrayList<Integer> list = new ArrayList<Integer>();
    ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables).take(10).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            latch.countDown();
        }

        @Override
        public void onError(Throwable e) {
            // record the error and release the waiting test thread; previously the
            // error was dropped and the test blocked forever on the latch
            failure.set(e);
            latch.countDown();
        }

        @Override
        public void onNext(Integer i) {
            System.out.println(i);
            list.add(i);
        }
    });
    // bounded wait so a missing terminal event fails the test instead of hanging it
    if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
        fail("timed out waiting for the sequence to terminate");
    }
    if (failure.get() != null) {
        throw new RuntimeException("unexpected onError", failure.get());
    }
    if (!infiniteObservables.await(2000, TimeUnit.MILLISECONDS)) {
        throw new RuntimeException("didn't unsubscribe");
    }
    assertEquals(10, list.size());
}
/**
 * Derives two independent take chains from the same infinite source and
 * checks that each one ends on its own last value (2 and 1 respectively).
 */
@Test
public void testMultipleSubscriptionChains() {
    final AtomicInteger lastSeen = new AtomicInteger();
    // stateless apart from lastSeen, so one instance serves both subscriptions
    Action1<Integer> rememberLatest = new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            lastSeen.set(i);
        }
    };

    Obsurvable<Integer> source = OBSERVABLE_OF_INFINITE_INTEGERS;
    Obsurvable<Integer> firstTwo = source.take(2);
    Obsurvable<Integer> firstOne = source.take(1);

    firstTwo.subscribe(rememberLatest);
    assertEquals(2, lastSeen.get());

    lastSeen.set(-1); // reset
    firstOne.subscribe(rememberLatest);
    assertEquals(1, lastSeen.get());
}
/** Binds a RepeatOperator explicitly, takes 25 values and checks the count. */
@Test
public void testBindRepeat() {
    final ArrayList<Integer> received = new ArrayList<Integer>();
    Action1<Integer> collector = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            received.add(value);
        }
    };

    Obsurvable.from(OBSERVABLE_OF_5_INTEGERS)
            .bind(new RepeatOperator<Integer>())
            .take(25)
            .subscribe(collector);

    assertEquals(25, received.size());
}
/** Uses the repeat() convenience method, takes 25 values and checks the count. */
@Test
public void testRepeat() {
    final ArrayList<Integer> received = new ArrayList<Integer>();
    Action1<Integer> collector = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            received.add(value);
        }
    };

    OBSERVABLE_OF_5_INTEGERS
            .repeat()
            .take(25)
            .subscribe(collector);

    assertEquals(25, received.size());
}
/** Key selector that maps a string to its length; passed to OperatorGroupBy in testGroupBy. */
final Func1<String, Integer> length = new Func1<String, Integer>() {
    @Override
    public Integer call(String text) {
        return Integer.valueOf(text.length());
    }
};
/**
 * Groups six words by their length and records each group's values in a row
 * of a 3x3 grid, one row per group in the order the groups are delivered.
 * Expected rows: length 3 (one, two, six), length 5 (three), length 4 (four, five).
 */
@Test
public void testGroupBy() {
    final String[][] content = new String[3][3];
    Obsurvable<String> words = Obsurvable.from(Arrays.asList("one", "two", "three", "four", "five", "six"));
    Obsurvable<Obsurvable<String>> grouped = words.bind(new OperatorGroupBy<Integer, String>(length));

    grouped.subscribe(new Action1<Obsurvable<String>>() {
        int groupsSeen = 0;

        @Override
        public void call(Obsurvable<String> group) {
            // row index is fixed at the moment the group arrives
            final int row = groupsSeen++;
            group.subscribe(new Action1<String>() {
                int column = 0;

                @Override
                public void call(String v) {
                    System.out.println("key: " + row + " v: " + v);
                    content[row][column++] = v;
                }
            });
        }
    });

    for (String[] row : content) {
        System.out.println(Arrays.asList(row));
    }
    assertArrayEquals(new String[] { "one", "two", "six" }, content[0]);
    assertArrayEquals(new String[] { "three", null, null }, content[1]);
    assertArrayEquals(new String[] { "four", "five", null }, content[2]);
}
/** Mutable value holder for the grouping tests: a numeric source id plus a message. */
private static class Event {
    String message;
    int source;

    /** Renders the event as {@code Event => source: <source> message: <message>}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Event => source: ");
        text.append(source);
        text.append(" message: ");
        text.append(message);
        return text.toString();
    }
}
/**
 * Groups an infinite asynchronous event stream by source, keeps only the first
 * group and only 20 events of it, and checks that the pipeline terminates.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testGroupByUnsubscribe() throws InterruptedException {
    final int numGroupsToSend = 2;
    final int numToSend = 100;
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicInteger subscribeCounter = new AtomicInteger();
    final AtomicInteger sentEventCounter = new AtomicInteger();
    final AtomicInteger groupCounter = new AtomicInteger();
    final AtomicInteger eventCounter = new AtomicInteger();

    // group key: the event's source (the producer sets it to i % numGroups)
    Func1<Event, Integer> bySource = new Func1<Event, Integer>() {
        @Override
        public Integer call(Event e) {
            return e.source;
        }
    };
    final Func1<Event, String> describe = new Func1<Event, String>() {
        @Override
        public String call(Event event) {
            return "testUnsubscribe => Source: " + event.source + " Message: " + event.message;
        }
    };
    Func1<Obsurvable<Event>, Obsurvable<String>> firstTwentyOfGroup = new Func1<Obsurvable<Event>, Obsurvable<String>>() {
        @Override
        public Obsurvable<String> call(Obsurvable<Event> eventGroupedObservable) {
            // how many groups we see (should be 1 because of take(1))
            groupCounter.incrementAndGet();
            // limit to only 20 events on this group
            return eventGroupedObservable.take(20).map(describe);
        }
    };

    ASYNC_INFINITE_OBSERVABLE_OF_EVENT(numGroupsToSend, numToSend, subscribeCounter, sentEventCounter)
            .groupBy(bySource)
            .take(1) // we want only the first group
            .flatMap(firstTwentyOfGroup)
            .subscribe(new Observer() {
                @Override
                public void onCompleted() {
                    latch.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    latch.countDown();
                }

                @Override
                public void onNext(Object outputMessage) {
                    eventCounter.incrementAndGet();
                }
            });

    assertTrue("timed out so infinite loop likely never stopped", latch.await(5000, TimeUnit.MILLISECONDS));
    System.out.println("@@@@@@@ Sent: " + sentEventCounter.get() + " received: " + eventCounter.get());
    assertEquals(1, subscribeCounter.get());
    assertEquals(1, groupCounter.get());
    assertEquals(20, eventCounter.get());
    // The producer keeps sending until the 20th event of the first group has been
    // received and the unsubscribe reaches it, so sentEventCounter also includes
    // events for the other group (sent but ignored) and is likely much larger
    // because the unsubscribe races the producer thread.
    // Since the source is an infinite loop, the fact that it stops shows the unsubscribe worked.
}
// Shared TakeOperator constructed with a count of 5; the tests bind it to cap a sequence at five items.
TakeOperator<Integer> TAKE_5 = new TakeOperator<Integer>(5);
/** Shared map operator that renders an integer as {@code "Number=>" + value}. */
MapOperator<Integer, String> MAP_INTEGER_TO_STRING = new MapOperator<Integer, String>(new Func1<Integer, String>() {
    @Override
    public String call(Integer value) {
        return "Number=>".concat(String.valueOf(value));
    }
});
/**
 * Synchronous source that emits 1, 2, 3, ... on the subscribing thread until
 * the subscription reports unsubscribed, then signals completion.
 */
Obsurvable<Integer> OBSERVABLE_OF_INFINITE_INTEGERS = Obsurvable.create(new Action2<Observer<Integer>, OperatorSubscription>() {
    @Override
    public void call(Observer<Integer> o, OperatorSubscription s) {
        for (int next = 1; !s.isUnsubscribed(); next++) {
            o.onNext(next);
        }
        o.onCompleted();
    }
});
/**
 * Builds a synchronous source that emits 1..5 (stopping early once the
 * subscription is unsubscribed) and then signals completion.
 *
 * @param numEmitted counter incremented once per value, just before it is emitted
 * @return the new source
 */
Obsurvable<Integer> OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) {
    Action2<Observer<Integer>, OperatorSubscription> emitOneToFive = new Action2<Observer<Integer>, OperatorSubscription>() {
        @Override
        public void call(Observer<Integer> o, OperatorSubscription s) {
            for (int value = 1; value <= 5 && !s.isUnsubscribed(); value++) {
                numEmitted.incrementAndGet();
                o.onNext(value);
            }
            o.onCompleted();
        }
    };
    return Obsurvable.create(emitOneToFive);
}
// Default five-integer source for tests that do not inspect the emission count (it gets a throwaway counter).
Obsurvable<Integer> OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger());
/**
 * Builds a synchronous source that emits the five strings {@code v-1} .. {@code v-5}
 * (stopping early once the subscription is unsubscribed) and then signals completion.
 *
 * @param numEmitted counter incremented once per value, just before it is emitted
 * @param v          prefix placed in front of {@code "-" + index}
 * @return the new source
 */
Obsurvable<String> OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(final AtomicInteger numEmitted, final String v) {
    Action2<Observer<String>, OperatorSubscription> emitPrefixed = new Action2<Observer<String>, OperatorSubscription>() {
        @Override
        public void call(Observer<String> o, OperatorSubscription s) {
            for (int index = 1; index <= 5 && !s.isUnsubscribed(); index++) {
                numEmitted.incrementAndGet();
                o.onNext(v + "-" + index);
            }
            o.onCompleted();
        }
    };
    return Obsurvable.create(emitPrefixed);
}
/**
 * Builds a source that, on each subscribe, starts a new thread emitting
 * 1, 2, 3, ... until the subscription reports unsubscribed. The thread then
 * signals completion and counts the latch down once.
 *
 * @param latch counted down by the producer thread after its loop has ended
 * @return the new source
 */
Obsurvable<Integer> ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(final CountDownLatch latch) {
    Action2<Observer<Integer>, OperatorSubscription> startProducer = new Action2<Observer<Integer>, OperatorSubscription>() {
        @Override
        public void call(final Observer<Integer> o, final OperatorSubscription s) {
            Runnable producer = new Runnable() {
                @Override
                public void run() {
                    System.out.println("-------> subscribe to infinite sequence");
                    System.out.println("Starting thread: " + Thread.currentThread());
                    for (int next = 1; !s.isUnsubscribed(); next++) {
                        o.onNext(next);
                        Thread.yield();
                    }
                    o.onCompleted();
                    latch.countDown();
                    System.out.println("Ending thread: " + Thread.currentThread());
                }
            };
            new Thread(producer).start();
        }
    };
    return Obsurvable.create(startProducer);
}
/**
 * Builds a source that, on each subscribe, starts a new thread emitting
 * Event-1, Event-2, ... until the subscription reports unsubscribed, then
 * signals completion. Each event's source is its sequence number modulo
 * {@code numGroups}.
 *
 * @param numGroups        divisor used to spread events across sources
 * @param numEvents        not read by this method
 * @param subscribeCounter incremented once per subscribe, before the thread starts
 * @param sentEventCounter incremented after each event has been passed to onNext
 * @return the new source
 */
Obsurvable<Event> ASYNC_INFINITE_OBSERVABLE_OF_EVENT(final int numGroups, final int numEvents, final AtomicInteger subscribeCounter, final AtomicInteger sentEventCounter) {
    Action2<Observer<Event>, OperatorSubscription> startProducer = new Action2<Observer<Event>, OperatorSubscription>() {
        @Override
        public void call(final Observer<Event> observer, final OperatorSubscription s) {
            System.out.println("*********** source received subscription: " + s);
            subscribeCounter.incrementAndGet();
            Runnable producer = new Runnable() {
                @Override
                public void run() {
                    for (int sequence = 1; !s.isUnsubscribed(); sequence++) {
                        Event next = new Event();
                        next.source = sequence % numGroups;
                        next.message = "Event-" + sequence;
                        observer.onNext(next);
                        sentEventCounter.incrementAndGet();
                    }
                    observer.onCompleted();
                }
            };
            new Thread(producer).start();
        }
    };
    return Obsurvable.create(startProducer);
}
// Default async infinite source; its latch is created inline and no reference to it is kept here.
Obsurvable<Integer> ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS = ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(new CountDownLatch(1));
}
/**********************************************************************/
/**
 * Subscribes with only an onNext callback. Each value is forwarded to
 * {@code onNext}; completion and error notifications are ignored.
 *
 * @param onNext callback invoked for every emitted value
 * @return the Subscription returned by the Observer-based subscribe
 */
public Subscription subscribe(final Action1<T> onNext) {
    Observer<T> valuesOnly = new Observer<T>() {
        @Override
        public void onCompleted() {
            // nothing to do on completion
        }

        @Override
        public void onError(Throwable e) {
            // errors are dropped by this overload
        }

        @Override
        public void onNext(T value) {
            onNext.call(value);
        }
    };
    return subscribe(valuesOnly);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment