Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Last active Jan 3, 2016
Embed
What would you like to do?
Obsurvable changes
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Action2;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
public class Obsurvable<T> {
/** Subscription function: called with the observer and the subscription on every subscribe/observe call. */
final Action2<Observer<T>, OperatorSubscription> f;
/**
 * Creates an instance around the given subscription function.
 *
 * @param f function stored in {@link #f}; it is invoked by {@code subscribe}, {@code observe}
 *          and by the sequences produced through {@code bind}
 */
Obsurvable(Action2<Observer<T>, OperatorSubscription> f) {
this.f = f;
}
/**
 * Static factory that wraps a subscription function into an {@code Obsurvable}.
 *
 * @param f   function called with the observer and the subscription each time the
 *            resulting sequence is subscribed to
 * @param <T> element type of the sequence
 * @return a new {@code Obsurvable} backed by {@code f}
 */
public static <T> Obsurvable<T> create(final Action2<Observer<T>, OperatorSubscription> f) {
    final Obsurvable<T> created = new Obsurvable<>(f);
    return created;
}
/**
 * Subscribes the observer using a freshly created {@link OperatorSubscription}.
 *
 * @param o receiver of the events produced by the subscription function
 * @return the subscription that was handed to the subscription function
 */
public Subscription subscribe(Observer<T> o) {
    final OperatorSubscription chain = new OperatorSubscription();
    this.f.call(o, chain);
    return chain;
}
/**
 * Subscribes the observer with a subscription obtained from the given factory,
 * letting the caller decide which {@link OperatorSubscription} governs the run.
 *
 * @param o  receiver of the events
 * @param sf factory called once to obtain the subscription passed to the subscription function
 */
public void observe(Observer<T> o, Func0<OperatorSubscription> sf) {
    final OperatorSubscription supplied = sf.call();
    f.call(o, supplied);
}
/**
 * Chains an operator onto this sequence.
 * <p>
 * On every subscription the operator is asked to turn the downstream observer into
 * an upstream one; this sequence's subscription function is then run with that
 * observer and the same subscription instance.
 *
 * @param bind operator mapping (downstream observer, subscription) to the upstream observer
 * @param <R>  element type of the resulting sequence
 * @return a new sequence applying the operator
 */
public <R> Obsurvable<R> bind(final Func2<Observer<R>, OperatorSubscription, Observer<T>> bind) {
    final Action2<Observer<T>, OperatorSubscription> upstream = f;
    return new Obsurvable<R>((Observer<R> downstream, OperatorSubscription s) -> {
        final Observer<T> lifted = bind.call(downstream, s);
        upstream.call(lifted, s);
    });
}
/**
 * Transforms every element with the given function by binding a {@link MapOperator}.
 *
 * @param f   transformation applied to each element
 * @param <R> element type after transformation
 * @return the mapped sequence
 */
public <R> Obsurvable<R> map(Func1<T, R> f) {
    final MapOperator<T, R> mapper = new MapOperator<T, R>(f);
    return bind(mapper);
}
/**
 * Maps every element to an inner sequence and flattens the result: a
 * {@link MapOperator} followed by a {@link MergeOperator}.
 *
 * @param f   function producing an inner sequence per element
 * @param <R> element type of the inner sequences
 * @return the flattened sequence
 */
public <R> Obsurvable<R> flatMap(Func1<T, Obsurvable<R>> f) {
    final Obsurvable<Obsurvable<R>> nested = bind(new MapOperator<T, Obsurvable<R>>(f));
    return nested.bind(new MergeOperator<R>());
}
/**
 * Limits the sequence by binding a {@link TakeOperator} configured with {@code num}.
 *
 * @param num limit handed to the {@link TakeOperator}
 * @return the limited sequence
 */
public Obsurvable<T> take(int num) {
    final TakeOperator<T> limiter = new TakeOperator<T>(num);
    return bind(limiter);
}
/**
 * Repeats this sequence: wraps {@code this} in a single-element sequence and binds
 * a {@link RepeatOperator} to it.
 *
 * @return the repeating sequence
 */
public Obsurvable<T> repeat() {
    final Obsurvable<Obsurvable<T>> self = from(this);
    return self.bind(new RepeatOperator<T>());
}
/**
 * Creates a sequence that emits the single value {@code t} and then completes.
 * <p>
 * Nothing is emitted when the subscription is already unsubscribed at subscribe
 * time, matching the behavior of {@link #from(Iterable)}. Completion is skipped
 * when the observer unsubscribed while handling the value.
 *
 * @param t   the only element of the sequence
 * @param <T> element type
 * @return a single-element sequence
 */
public static <T> Obsurvable<T> from(final T t) {
    return Obsurvable.create(new Action2<Observer<T>, OperatorSubscription>() {
        @Override
        public void call(Observer<T> o, OperatorSubscription s) {
            // Do not push a value to a subscriber that is already gone.
            if (s.isUnsubscribed()) {
                return;
            }
            o.onNext(t);
            if (!s.isUnsubscribed()) {
                o.onCompleted();
            }
        }
    });
}
/**
 * Creates a sequence that emits the elements of {@code is} in iteration order and
 * then completes.
 * <p>
 * The subscription is checked before each element and again before completion;
 * once it is unsubscribed nothing further is signalled.
 *
 * @param is  source of the elements; iterated anew for every subscription
 * @param <T> element type
 * @return a sequence over the iterable's elements
 */
public static <T> Obsurvable<T> from(final Iterable<T> is) {
    return Obsurvable.create((Observer<T> o, OperatorSubscription s) -> {
        for (T item : is) {
            if (s.isUnsubscribed()) {
                return;
            }
            o.onNext(item);
        }
        if (s.isUnsubscribed()) {
            return;
        }
        o.onCompleted();
    });
}
/**
 * Pairs the elements of this sequence with those of {@code other}, in order, and
 * emits the result of {@code resultSelector} for each pair.
 * <p>
 * Elements of each side are queued until a partner from the other side arrives.
 * The result completes as soon as one side has completed and its queue is empty.
 * An error from either side, or an exception thrown while producing or delivering
 * a pair, is forwarded downstream and the whole subscription is unsubscribed.
 * Both sources are subscribed with child subscriptions added to the downstream
 * subscription. All downstream calls happen while holding one shared lock.
 *
 * @param other          the sequence supplying the right-hand elements
 * @param resultSelector combines one left and one right element into a result
 * @param <U>            element type of {@code other}
 * @param <R>            element type of the result
 * @return the zipped sequence
 */
public <U, R> Obsurvable<R> zip(Obsurvable<U> other, Func2<T, U, R> resultSelector) {
    Obsurvable<T> t = this;
    return Obsurvable.create((o, s) -> {
        Object guard = new Object();
        Queue<T> leftQueue = new LinkedList<>();
        Queue<U> rightQueue = new LinkedList<>();
        AtomicBoolean leftDone = new AtomicBoolean();
        AtomicBoolean rightDone = new AtomicBoolean();
        OperatorSubscription tsub = new OperatorSubscription();
        OperatorSubscription usub = new OperatorSubscription();
        s.add(tsub);
        s.add(usub);
        // Drains every available pair, then checks whether the result is complete.
        Action0 stride = () -> {
            boolean done = false;
            synchronized (guard) {
                while (!leftQueue.isEmpty()
                        && !rightQueue.isEmpty()
                        && !s.isUnsubscribed()) {
                    try {
                        o.onNext(resultSelector.call(leftQueue.poll(), rightQueue.poll()));
                    } catch (Throwable e) {
                        o.onError(e);
                        s.unsubscribe();
                        return;
                    }
                }
                if (!s.isUnsubscribed()) {
                    // One exhausted side means no further pair can ever be formed.
                    if ((leftQueue.isEmpty() && leftDone.get())
                            || (rightQueue.isEmpty() && rightDone.get())) {
                        done = true;
                        o.onCompleted();
                    }
                }
            }
            if (done) {
                s.unsubscribe();
            }
        };
        // Shared error path: previously only the left side unsubscribed after an
        // error, leaving the other source running when the right side failed.
        Action1<Throwable> fail = error -> {
            synchronized (guard) {
                o.onError(error);
            }
            s.unsubscribe();
        };
        t.observe(new Observer<T>() {
            @Override
            public void onNext(T args) {
                synchronized (guard) {
                    leftQueue.offer(args);
                }
                stride.call();
            }
            @Override
            public void onError(Throwable e) {
                fail.call(e);
            }
            @Override
            public void onCompleted() {
                synchronized (guard) {
                    leftDone.set(true);
                }
                stride.call();
            }
        }, () -> tsub);
        other.observe(new Observer<U>() {
            @Override
            public void onNext(U args) {
                synchronized (guard) {
                    rightQueue.offer(args);
                }
                stride.call();
            }
            @Override
            public void onError(Throwable e) {
                fail.call(e);
            }
            @Override
            public void onCompleted() {
                synchronized (guard) {
                    rightDone.set(true);
                }
                stride.call();
            }
        }, () -> usub);
    });
}
/**************************************************************************************************************/
/**
 * Subscription handed to operators. All operations delegate to an internal
 * {@link CompositeSubscription}, so operators can register child subscriptions
 * and poll whether the chain has been unsubscribed.
 */
public static class OperatorSubscription implements Subscription {
    private final CompositeSubscription cs = new CompositeSubscription();

    /**
     * Creates an instance that already contains the given subscription.
     *
     * @param s subscription to register as the first child
     * @return the new instance
     */
    public static OperatorSubscription create(Subscription s) {
        final OperatorSubscription wrapper = new OperatorSubscription();
        wrapper.add(s);
        return wrapper;
    }

    /**
     * Registers a child subscription with the underlying composite.
     *
     * @param s subscription to add
     */
    public void add(Subscription s) {
        this.cs.add(s);
    }

    /** @return the unsubscribed state reported by the underlying composite */
    public boolean isUnsubscribed() {
        return this.cs.isUnsubscribed();
    }

    /** Unsubscribes the underlying composite. */
    @Override
    public void unsubscribe() {
        this.cs.unsubscribe();
    }
}
/**************************************************************************************************************/
/**
 * Operator that flattens a sequence of sequences into one sequence.
 * <p>
 * Every inner sequence is observed as soon as it arrives, each with its own child
 * subscription registered on the outer one. Downstream calls are serialized by
 * synchronizing on the downstream observer.
 * <p>
 * Completion is signalled once, after the outer sequence and all inner sequences
 * have completed. Previously every inner completion and the outer completion were
 * forwarded individually, so downstream saw premature and repeated
 * {@code onCompleted} calls. An error from any source is forwarded and the outer
 * subscription is unsubscribed so the remaining sources stop.
 *
 * @param <T> element type of the inner sequences
 */
public static class MergeOperator<T> implements Func2<Observer<T>, OperatorSubscription, Observer<Obsurvable<T>>> {
    @Override
    public Observer<Obsurvable<T>> call(final Observer<T> o, final OperatorSubscription outerSubscription) {
        // Sequences still running; starts at 1 to account for the outer sequence itself.
        final AtomicInteger active = new AtomicInteger(1);
        return new Observer<Obsurvable<T>>() {
            // Guards against an outer source signalling completion more than once.
            private boolean outerCompleted;

            @Override
            public void onCompleted() {
                if (!outerCompleted) {
                    outerCompleted = true;
                    sourceFinished();
                }
            }

            @Override
            public void onError(Throwable e) {
                fail(e);
            }

            @Override
            public void onNext(Obsurvable<T> innerObsurvable) {
                active.incrementAndGet();
                innerObsurvable.observe(new Observer<T>() {
                    // Guards against an inner source signalling completion more than once.
                    private boolean innerCompleted;

                    @Override
                    public void onCompleted() {
                        if (!innerCompleted) {
                            innerCompleted = true;
                            sourceFinished();
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        fail(e);
                    }

                    @Override
                    public void onNext(T a) {
                        synchronized (o) {
                            o.onNext(a);
                        }
                    }
                }, new Func0<OperatorSubscription>() {
                    @Override
                    public OperatorSubscription call() {
                        OperatorSubscription innerSubscription = new OperatorSubscription();
                        outerSubscription.add(innerSubscription);
                        return innerSubscription;
                    }
                });
            }

            /** Records that one source finished; completes downstream when it was the last one. */
            private void sourceFinished() {
                if (active.decrementAndGet() == 0) {
                    synchronized (o) {
                        o.onCompleted();
                    }
                }
            }

            /** Forwards a terminal error and stops every remaining source. */
            private void fail(Throwable e) {
                synchronized (o) {
                    o.onError(e);
                }
                outerSubscription.unsubscribe();
            }
        };
    }
}
/**
 * Operator that applies a transformation to every element and forwards terminal
 * events unchanged.
 *
 * @param <T> upstream element type
 * @param <R> downstream element type
 */
public static class MapOperator<T, R> implements Func2<Observer<R>, OperatorSubscription, Observer<T>> {
    /** Function applied to each upstream element. */
    final Func1<T, R> transformer;

    /**
     * @param transformer function applied to each upstream element
     */
    MapOperator(Func1<T, R> transformer) {
        this.transformer = transformer;
    }

    /**
     * Builds the upstream observer for one subscription.
     *
     * @param o downstream observer receiving the transformed elements
     * @param s subscription of the chain; not used by this operator
     * @return observer to hand to the upstream source
     */
    @Override
    public Observer<T> call(final Observer<R> o, final OperatorSubscription s) {
        final Observer<T> upstreamObserver = new Observer<T>() {
            @Override
            public void onNext(T t) {
                final R mapped = transformer.call(t);
                o.onNext(mapped);
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onCompleted() {
                o.onCompleted();
            }
        };
        return upstreamObserver;
    }
}
/**
 * Operator that forwards at most {@code limit} elements, then completes downstream
 * and unsubscribes the chain.
 * <p>
 * Downstream receives exactly one terminal event. Previously {@code onCompleted}
 * was delivered once when the limit was reached (or immediately for a limit of 0)
 * and again when the source itself completed. A non-positive limit yields an
 * empty sequence; a negative limit used to let one element through.
 * <p>
 * The per-subscription state ({@code count}, {@code done}) is not synchronized and
 * relies on the upstream delivering events one at a time.
 *
 * @param <T> element type
 */
public static class TakeOperator<T> implements Func2<Observer<T>, OperatorSubscription, Observer<T>> {
    /** Maximum number of elements forwarded per subscription. */
    final int limit;

    /**
     * @param limit maximum number of elements to forward; values below 1 produce an empty sequence
     */
    TakeOperator(int limit) {
        this.limit = limit;
    }

    /**
     * Builds the upstream observer for one subscription.
     *
     * @param o downstream observer
     * @param s subscription of the chain; unsubscribed once the limit is reached
     * @return observer to hand to the upstream source
     */
    @Override
    public Observer<T> call(final Observer<T> o, final OperatorSubscription s) {
        final boolean empty = limit <= 0;
        if (empty) {
            o.onCompleted();
            s.unsubscribe();
        }
        return new Observer<T>() {
            int count = 0;
            // True once a terminal event has been delivered downstream.
            boolean done = empty;

            @Override
            public void onCompleted() {
                if (!done) {
                    done = true;
                    o.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (!done) {
                    done = true;
                    o.onError(e);
                }
            }

            @Override
            public void onNext(T i) {
                if (!done && !s.isUnsubscribed()) {
                    o.onNext(i);
                    if (++count >= limit) {
                        // Mark as finished first so the source's own completion is ignored.
                        done = true;
                        o.onCompleted();
                        s.unsubscribe();
                    }
                }
            }
        };
    }
}
/**
 * Operator that re-subscribes to each received sequence in a loop until the
 * chain's subscription is unsubscribed.
 * <p>
 * Elements of every run are forwarded downstream; the completion of an individual
 * run is ignored so the next run can start. An error from a run is forwarded and
 * the chain is unsubscribed. Previously the loop kept re-subscribing after an
 * error, so it never terminated.
 * <p>
 * The loop runs on the thread that delivers the outer {@code onNext}.
 *
 * @param <T> element type of the repeated sequence
 */
public static class RepeatOperator<T> implements Func2<Observer<T>, OperatorSubscription, Observer<Obsurvable<T>>> {
    RepeatOperator() {
    }

    /**
     * Builds the upstream observer for one subscription.
     *
     * @param o downstream observer receiving the repeated elements
     * @param s subscription of the chain; polled before every run
     * @return observer accepting the sequences to repeat
     */
    @Override
    public Observer<Obsurvable<T>> call(final Observer<T> o, final OperatorSubscription s) {
        return new Observer<Obsurvable<T>>() {
            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(Obsurvable<T> ot) {
                // The relay keeps no state, so one instance serves every run.
                final Observer<T> relay = new Observer<T>() {
                    @Override
                    public void onNext(T args) {
                        o.onNext(args);
                    }

                    @Override
                    public void onError(Throwable e) {
                        o.onError(e);
                        // An error is terminal: stop the repeat loop.
                        s.unsubscribe();
                    }

                    @Override
                    public void onCompleted() {
                        // ignored: the end of one run only triggers the next one
                    }
                };
                while (!s.isUnsubscribed()) {
                    ot.f.call(relay, s);
                }
            }
        };
    }
}
/**************************************************************************************************************/
public static class UnitTest {
/** flatMap over five integers, two strings each, yields ten values in source order. */
@Test
public void testFlatMap() {
    Func1<Integer, Obsurvable<String>> expand =
            i -> Obsurvable.from(Arrays.asList("a-" + i, "b-" + i));
    Obsurvable<String> os = OBSERVABLE_OF_5_INTEGERS.flatMap(expand);
    final ArrayList<String> list = new ArrayList<String>();
    Action1<String> collector = s -> {
        System.out.println(s);
        list.add(s);
    };
    os.subscribe(collector);
    assertEquals(10, list.size());
    assertEquals("a-1", list.get(0));
    assertEquals("b-1", list.get(1));
    assertEquals("a-2", list.get(2));
    assertEquals("b-2", list.get(3));
    assertEquals("a-5", list.get(8));
    assertEquals("b-5", list.get(9));
}
/** take(4) after flatMap stops the source after it has produced only two integers. */
@Test
public void testFlatMapWithTake() {
    final AtomicInteger counter = new AtomicInteger();
    Action2<Observer<Integer>, OperatorSubscription> countingSource = (o, s) -> {
        int i = 1;
        while (i <= 5 && !s.isUnsubscribed()) {
            counter.incrementAndGet();
            o.onNext(i);
            i++;
        }
        o.onCompleted();
    };
    Obsurvable<Integer> os = Obsurvable.create(countingSource);
    Func1<Integer, Obsurvable<String>> expand =
            n -> Obsurvable.from(Arrays.asList("a-" + n, "b-" + n));
    Obsurvable<String> ss = os.flatMap(expand).take(4);
    final ArrayList<String> list = new ArrayList<String>();
    Action1<String> collector = value -> {
        System.out.println(value);
        list.add(value);
    };
    ss.subscribe(collector);
    assertEquals(4, list.size());
    assertEquals("a-1", list.get(0));
    assertEquals("b-1", list.get(1));
    assertEquals("a-2", list.get(2));
    assertEquals("b-2", list.get(3));
    assertEquals(2, counter.get());
}
/** Merging two synchronous five-element sequences yields 1..5 followed by 1..5. */
@Test
public void testMerge() {
    Action2<Observer<Obsurvable<Integer>>, OperatorSubscription> twoFinite = (o, s) -> {
        o.onNext(OBSERVABLE_OF_5_INTEGERS);
        o.onNext(OBSERVABLE_OF_5_INTEGERS);
    };
    Obsurvable<Obsurvable<Integer>> os = Obsurvable.create(twoFinite);
    Obsurvable<Integer> is = os.bind(new MergeOperator<Integer>());
    final ArrayList<Integer> list = new ArrayList<Integer>();
    Action1<Integer> collector = i -> {
        System.out.println(i);
        list.add(i);
    };
    is.subscribe(collector);
    assertEquals(10, list.size());
    assertEquals(1, list.get(0).intValue());
    assertEquals(5, list.get(4).intValue());
    assertEquals(1, list.get(5).intValue());
    assertEquals(5, list.get(9).intValue());
}
/** Merging two infinite synchronous sequences with take(10) terminates with ten values. */
@Test
public void testMergeTwoInfiniteSynchronousSequences() {
    Action2<Observer<Obsurvable<Integer>>, OperatorSubscription> twoInfinite = (o, s) -> {
        o.onNext(OBSERVABLE_OF_INFINITE_INTEGERS);
        o.onNext(OBSERVABLE_OF_INFINITE_INTEGERS);
    };
    Obsurvable<Obsurvable<Integer>> os = Obsurvable.create(twoInfinite);
    Obsurvable<Integer> is = os.bind(new MergeOperator<Integer>()).take(10);
    final ArrayList<Integer> list = new ArrayList<Integer>();
    Action1<Integer> collector = i -> {
        System.out.println(i);
        list.add(i);
    };
    is.subscribe(collector);
    assertEquals(10, list.size());
    // every value comes from the first sequence because it is infinite and synchronous
    assertEquals(1, list.get(0).intValue());
    assertEquals(5, list.get(4).intValue());
    assertEquals(10, list.get(9).intValue());
}
/** A finite sequence followed by an infinite one, limited to ten values in total. */
@Test
public void testMergeFiniteAndInfiniteSynchronousSequences() {
    Action2<Observer<Obsurvable<Integer>>, OperatorSubscription> finiteThenInfinite = (o, s) -> {
        o.onNext(OBSERVABLE_OF_5_INTEGERS);
        o.onNext(OBSERVABLE_OF_INFINITE_INTEGERS);
    };
    Obsurvable<Obsurvable<Integer>> os = Obsurvable.create(finiteThenInfinite);
    Obsurvable<Integer> is = os.bind(new MergeOperator<Integer>()).take(10);
    final ArrayList<Integer> list = new ArrayList<Integer>();
    Action1<Integer> collector = i -> {
        System.out.println(i);
        list.add(i);
    };
    is.subscribe(collector);
    assertEquals(10, list.size());
    // the finite sequence supplies 1..5 first; the infinite one then fills the
    // remaining five slots with its own 1..5
    assertEquals(1, list.get(0).intValue());
    assertEquals(5, list.get(4).intValue());
    assertEquals(5, list.get(9).intValue());
}
/** Merging two infinite asynchronous sequences with take(100) completes with 100 values. */
@Test
public void testMergeInifiteAsync() throws InterruptedException {
    Action2<Observer<Obsurvable<Integer>>, OperatorSubscription> twoAsync = (o, s) -> {
        o.onNext(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS);
        o.onNext(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS);
    };
    Obsurvable<Obsurvable<Integer>> os = Obsurvable.create(twoAsync);
    Obsurvable<Integer> is = os.bind(new MergeOperator<Integer>()).take(100);
    final CountDownLatch latch = new CountDownLatch(1);
    final ArrayList<Integer> list = new ArrayList<Integer>();
    Observer<Integer> collector = new Observer<Integer>() {
        @Override
        public void onNext(Integer i) {
            System.out.println(i);
            list.add(i);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onCompleted() {
            latch.countDown();
        }
    };
    is.subscribe(collector);
    // wait for the emitting threads to reach the limit
    latch.await();
    assertEquals(100, list.size());
}
/** Four chained take(5) operators followed by a map still yield exactly five mapped values. */
@Test
public void testBindTakeAndMap() {
    Obsurvable<Integer> limited = OBSERVABLE_OF_INFINITE_INTEGERS
            .bind(TAKE_5)
            .bind(TAKE_5)
            .bind(TAKE_5)
            .bind(TAKE_5);
    Obsurvable<String> os = limited.bind(MAP_INTEGER_TO_STRING);
    final ArrayList<String> list = new ArrayList<String>();
    Action1<String> collector = i -> {
        System.out.println(i);
        list.add(i);
    };
    os.subscribe(collector);
    assertEquals(5, list.size());
    assertEquals("Number=>1", list.get(0));
    assertEquals("Number=>2", list.get(1));
    assertEquals("Number=>3", list.get(2));
    assertEquals("Number=>4", list.get(3));
    assertEquals("Number=>5", list.get(4));
}
/** The same take+map chain can be subscribed twice and yields the same five values each time. */
@Test
public void testBindTakeAndMapMultipleSubscribes() {
    Obsurvable<Integer> limited = OBSERVABLE_OF_INFINITE_INTEGERS.bind(TAKE_5);
    Obsurvable<String> os = limited.bind(MAP_INTEGER_TO_STRING);
    final ArrayList<String> list = new ArrayList<String>();
    Action1<String> firstCollector = i -> {
        System.out.println(i);
        list.add(i);
    };
    os.subscribe(firstCollector);
    assertEquals(5, list.size());
    assertEquals("Number=>1", list.get(0));
    assertEquals("Number=>2", list.get(1));
    assertEquals("Number=>3", list.get(2));
    assertEquals("Number=>4", list.get(3));
    assertEquals("Number=>5", list.get(4));
    // a second subscription must start from scratch with its own state
    final ArrayList<String> list2 = new ArrayList<String>();
    Action1<String> secondCollector = i -> {
        System.out.println(i);
        list2.add(i);
    };
    os.subscribe(secondCollector);
    assertEquals(5, list2.size());
    assertEquals("Number=>1", list2.get(0));
    assertEquals("Number=>2", list2.get(1));
    assertEquals("Number=>3", list2.get(2));
    assertEquals("Number=>4", list2.get(3));
    assertEquals("Number=>5", list2.get(4));
}
/** Binding the map operator converts all five integers to strings. */
@Test
public void testBindMap() {
    Obsurvable<String> os = OBSERVABLE_OF_5_INTEGERS.bind(MAP_INTEGER_TO_STRING);
    final ArrayList<String> list = new ArrayList<String>();
    Action1<String> collector = i -> {
        System.out.println(i);
        list.add(i);
    };
    os.subscribe(collector);
    assertEquals(5, list.size());
    assertEquals("Number=>1", list.get(0));
}
/** take(2) after map delivers only the first two mapped values. */
@Test
public void testMapAndTake() {
    Obsurvable<String> mapped = OBSERVABLE_OF_5_INTEGERS.bind(MAP_INTEGER_TO_STRING);
    Obsurvable<String> os = mapped.take(2);
    final ArrayList<String> list = new ArrayList<String>();
    Action1<String> collector = i -> {
        System.out.println(i);
        list.add(i);
    };
    os.subscribe(collector);
    assertEquals(2, list.size());
    assertEquals("Number=>1", list.get(0));
}
/**
 * An operator at the start of the chain registers an unsubscribe callback; a take
 * further down the chain must trigger it, on the first and on a second subscription.
 */
@Test
public void testUnSubscribeOfEachOperator() {
    final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    Func2<Observer<String>, OperatorSubscription, Observer<Integer>> tracking = (downstream, s) -> {
        Action0 markUnsubscribed = () -> unsubscribed.set(true);
        s.add(Subscriptions.create(markUnsubscribed));
        return new Observer<Integer>() {
            @Override
            public void onNext(Integer i) {
                downstream.onNext(String.valueOf(i));
            }

            @Override
            public void onError(Throwable e) {
                downstream.onError(e);
            }

            @Override
            public void onCompleted() {
                downstream.onCompleted();
            }
        };
    };
    Func1<String, String> addPrefix = t -> "mapped-" + t;
    Obsurvable<String> os = OBSERVABLE_OF_5_INTEGERS
            .bind(tracking)
            .take(3)
            .map(addPrefix)
            .take(2);
    final AtomicReference<String> valueA = new AtomicReference<String>();
    Action1<String> recordA = received -> {
        System.out.println("received: " + received);
        valueA.set(received);
    };
    os.subscribe(recordA);
    assertEquals("mapped-2", valueA.get());
    assertTrue(unsubscribed.get());
    // reset to subscribe again
    unsubscribed.set(false);
    final AtomicReference<String> valueB = new AtomicReference<String>();
    Func1<String, String> addSuffix = t -> t + "-B";
    Action1<String> recordB = received -> {
        System.out.println("received: " + received);
        valueB.set(received);
    };
    os.map(addSuffix).take(1).subscribe(recordB);
    assertEquals("mapped-1-B", valueB.get());
    assertTrue(unsubscribed.get());
}
/** Binding a take operator to an infinite sequence yields exactly the requested count. */
@Test
public void testBindTake() {
    int desiredCount = 5;
    TakeOperator<Integer> limiter = new TakeOperator<Integer>(desiredCount);
    Obsurvable<Integer> oi = OBSERVABLE_OF_INFINITE_INTEGERS.bind(limiter);
    final ArrayList<Integer> list = new ArrayList<Integer>();
    Action1<Integer> collector = i -> {
        System.out.println(i);
        list.add(i);
    };
    oi.subscribe(collector);
    assertEquals(desiredCount, list.size());
}
/** Subscribing directly to a created sequence delivers all five values. */
@Test
public void testCreateAndSubscribe() {
    final ArrayList<Integer> list = new ArrayList<Integer>();
    Action1<Integer> collector = i -> {
        System.out.println(i);
        list.add(i);
    };
    OBSERVABLE_OF_5_INTEGERS.subscribe(collector);
    assertEquals(5, list.size());
}
/** take(10) on an infinite asynchronous sequence completes after ten values. */
@Test
public void testTakeFromAsyncInfinite() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(1);
    final ArrayList<Integer> list = new ArrayList<Integer>();
    Observer<Integer> collector = new Observer<Integer>() {
        @Override
        public void onNext(Integer i) {
            System.out.println(i);
            list.add(i);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onCompleted() {
            latch.countDown();
        }
    };
    Obsurvable<Integer> limited = ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS.take(10);
    limited.subscribe(collector);
    // wait for the emitting thread to reach the limit
    latch.await();
    assertEquals(10, list.size());
}
/** Two independent take chains built from the same source do not influence each other. */
@Test
public void testMultipleSubscriptionChains() {
    Obsurvable<Integer> x = OBSERVABLE_OF_INFINITE_INTEGERS;
    Obsurvable<Integer> y = x.take(2);
    Obsurvable<Integer> z = x.take(1);
    final AtomicInteger value = new AtomicInteger();
    // both subscriptions simply remember the last value they received
    Action1<Integer> rememberLast = i -> {
        value.set(i);
    };
    y.subscribe(rememberLast);
    assertEquals(2, value.get());
    value.set(-1); // reset
    z.subscribe(rememberLast);
    assertEquals(1, value.get());
}
/** Binding the repeat operator explicitly and taking 25 yields five full runs. */
@Test
public void testBindRepeat() {
    final ArrayList<Integer> list = new ArrayList<Integer>();
    Obsurvable<Obsurvable<Integer>> wrapped = Obsurvable.from(OBSERVABLE_OF_5_INTEGERS);
    Obsurvable<Integer> repeated = wrapped.bind(new RepeatOperator<Integer>()).take(25);
    Action1<Integer> collector = i -> {
        list.add(i);
    };
    repeated.subscribe(collector);
    assertEquals(25, list.size());
}
/** repeat() followed by take(25) yields 25 values from the five-element source. */
@Test
public void testRepeat() {
    final ArrayList<Integer> list = new ArrayList<Integer>();
    Obsurvable<Integer> repeated = OBSERVABLE_OF_5_INTEGERS.repeat().take(25);
    Action1<Integer> collector = i -> {
        list.add(i);
    };
    repeated.subscribe(collector);
    assertEquals(25, list.size());
}
/** Shared take operator limiting a sequence to five elements. */
TakeOperator<Integer> TAKE_5 = new TakeOperator<>(5);
/** Shared map operator rendering an integer as {@code "Number=>" + i}. */
MapOperator<Integer, String> MAP_INTEGER_TO_STRING =
        new MapOperator<Integer, String>(i -> "Number=>" + i);
/**
 * Synchronous source counting upwards from 1 until the subscription is
 * unsubscribed, then signalling completion.
 */
Obsurvable<Integer> OBSERVABLE_OF_INFINITE_INTEGERS = Obsurvable.create(
        (Observer<Integer> o, OperatorSubscription s) -> {
            for (int next = 1; !s.isUnsubscribed(); next++) {
                o.onNext(next);
            }
            o.onCompleted();
        });
/**
 * Synchronous source emitting 1 to 5, printing each value before emitting it.
 * Emission stops early once the subscription is unsubscribed; completion is
 * signalled in either case.
 */
Obsurvable<Integer> OBSERVABLE_OF_5_INTEGERS = Obsurvable.create(
        (Observer<Integer> o, OperatorSubscription s) -> {
            int i = 1;
            while (i <= 5 && !s.isUnsubscribed()) {
                System.out.println(i);
                o.onNext(i);
                i++;
            }
            o.onCompleted();
        });
/**
 * Asynchronous source: every subscription starts a new thread that counts upwards
 * from 1, yielding after each value, until the subscription is unsubscribed; the
 * thread then signals completion.
 */
Obsurvable<Integer> ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS = Obsurvable.create(
        (Observer<Integer> o, OperatorSubscription s) -> {
            Runnable emitter = () -> {
                System.out.println("-------> subscribe to infinite sequence");
                System.out.println("Starting thread: " + Thread.currentThread());
                int i = 1;
                while (!s.isUnsubscribed()) {
                    System.out.println("onNext: " + i);
                    o.onNext(i++);
                    Thread.yield();
                }
                o.onCompleted();
                System.out.println("Ending thread: " + Thread.currentThread());
            };
            Thread t = new Thread(emitter);
            t.start();
        });
}
/**********************************************************************/
/**
 * Subscribes with a callback that only handles elements. Errors and completion
 * are silently dropped by the adapter observer.
 *
 * @param onNext callback invoked for every element
 * @return the subscription returned by {@code subscribe(Observer)}
 */
public Subscription subscribe(final Action1<T> onNext) {
    final Observer<T> adapter = new Observer<T>() {
        @Override
        public void onNext(T t) {
            onNext.call(t);
        }

        @Override
        public void onError(Throwable e) {
            // intentionally empty: this overload has no error callback
        }

        @Override
        public void onCompleted() {
            // intentionally empty: this overload has no completion callback
        }
    };
    return subscribe(adapter);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment