Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Created January 22, 2014 23:52
Show Gist options
  • Save akarnokd/8570027 to your computer and use it in GitHub Desktop.
Save akarnokd/8570027 to your computer and use it in GitHub Desktop.
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package rxjava8;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.SerialSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Action2;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
/**
*
* @author akarnokd
*/
public class Obsurvable<T> {
private final Action2<Observer<T>, CompositeSubscription> onSubscribe;
private Obsurvable(Action2<Observer<T>, CompositeSubscription> onSubscribe) {
this.onSubscribe = onSubscribe;
}
public Subscription subscribe(Observer<T> obsurver) {
CompositeSubscription token = new CompositeSubscription();
onSubscribe.call(obsurver, token);
return token;
}
public Subscription subscribe(Observer<T> obsurver, CompositeSubscription token) {
onSubscribe.call(obsurver, token);
return token;
}
public static <T> Obsurvable<T> create(Action2<Observer<T>, CompositeSubscription> onSubscribe) {
return new Obsurvable<>(onSubscribe);
}
public <U> Obsurvable<U> bind(Func2<Observer<U>, CompositeSubscription, Observer<T>> binder) {
return new Obsurvable<>((o, t) -> onSubscribe.call(binder.call(o, t), t));
}
public <U> Obsurvable<U> select(Func1<T, U> selector) {
return bind((u, k) -> new Observer<T>() {
@Override
public void onNext(T value) {
u.onNext(selector.call(value));
}
@Override
public void onError(Throwable t) {
u.onError(t);
}
@Override
public void onCompleted() {
u.onCompleted();
}
});
}
public static <T> Obsurvable<T> from(Iterable<T> source) {
return create((o, k) -> {
for (T v : source) {
if (k.isUnsubscribed()) {
return;
}
try {
o.onNext(v);
} catch (Throwable t) {
o.onError(t);
k.unsubscribe();
}
}
if (!k.isUnsubscribed()) {
o.onCompleted();
}
});
}
public Obsurvable<T> take(int count) {
return bind((u, k) -> new Observer<T>() {
int i;
@Override
public void onNext(T value) {
if (i >= count) {
k.unsubscribe();
return;
}
u.onNext(value);
i++;
if (i == count && !k.isUnsubscribed()) {
u.onCompleted();
}
}
@Override
public void onError(Throwable t) {
u.onError(t);
}
@Override
public void onCompleted() {
i = count;
u.onCompleted();
}
});
}
public static <T> Obsurvable<T> merge(Obsurvable<Obsurvable<T>> sources) {
return create((u, k) -> {
QueueDrain qd = new QueueDrain(k);
AtomicInteger wip = new AtomicInteger(1);
k.add(sources.subscribe(new Observer<Obsurvable<T>>() {
@Override
public void onNext(Obsurvable<T> args) {
CompositeSubscription ssub = new CompositeSubscription();
k.add(ssub);
wip.incrementAndGet();
args.subscribe(new Observer<T>() {
@Override
public void onNext(T args) {
qd.enqueue(() -> u.onNext(args));
qd.tryDrain();
}
@Override
public void onError(Throwable e) {
innerError(e);
}
@Override
public void onCompleted() {
innerCompleted();
}
}, ssub);
}
void innerError(Throwable e) {
onError(e);
k.unsubscribe();
}
void innerCompleted() {
onCompleted();
}
@Override
public void onError(Throwable e) {
qd.enqueue(() -> u.onError(e));
qd.tryDrain();
}
@Override
public void onCompleted() {
if (wip.decrementAndGet() == 0) {
qd.enqueue(() -> u.onCompleted());
qd.tryDrain();
}
}
}, k));
});
}
public Obsurvable<T> subscribeOn(Scheduler scheduler) {
return create((t, k) -> {
MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
k.add(mas);
k.add(scheduler.schedule(() -> { k.remove(mas); subscribe(t, k); }));
});
}
static final class QueueDrain {
final AtomicInteger wip = new AtomicInteger();
final BlockingQueue<Action0> queue = new LinkedBlockingQueue<>();
final CompositeSubscription k;
public QueueDrain(CompositeSubscription k) {
this.k = k;
}
public void enqueue(Action0 action) {
queue.add(action);
}
public void tryDrain() {
if (wip.incrementAndGet() > 1) {
return;
}
do {
queue.poll().call();
} while (wip.decrementAndGet() > 0 && !k.isUnsubscribed());
}
}
public Obsurvable<T> observeOn(Scheduler scheduler) {
return bind((u, k) -> new Observer<T>() {
final QueueDrain qd = new QueueDrain(k);
@Override
public void onNext(T args) {
MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
k.add(mas);
qd.enqueue(() -> { k.remove(mas); u.onNext(args); });
mas.set(scheduler.schedule(qd::tryDrain));
}
@Override
public void onError(Throwable e) {
MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
k.add(mas);
qd.enqueue(() -> { k.remove(mas); u.onError(e); });
mas.set(scheduler.schedule(qd::tryDrain));
}
@Override
public void onCompleted() {
MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
k.add(mas);
qd.enqueue(() -> { k.remove(mas); u.onCompleted(); });
mas.set(scheduler.schedule(qd::tryDrain));
}
});
}
public Obsurvable<T> repeat() {
return from(Arrays.asList(this)).bind((u, k) -> new Observer<Obsurvable<T>>() {
QueueDrain qd = new QueueDrain(k);
@Override
public void onNext(Obsurvable<T> args) {
qd.enqueue(() ->
args.subscribe(new Observer<T>() {
@Override
public void onNext(T args) {
u.onNext(args);
}
@Override
public void onError(Throwable e) {
u.onError(e);
}
@Override
public void onCompleted() {
innerNext(args);
}
}, k)
);
qd.tryDrain();
}
void innerNext(Obsurvable<T> args) {
onNext(args);
}
@Override
public void onError(Throwable e) {
u.onError(e);
}
@Override
public void onCompleted() {
}
});
}
public Subscription subscribe(Action1<T> onNext) {
return subscribe(new Observer<T>() {
@Override
public void onNext(T value) {
onNext.call(value);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Done");
}
});
}
public static void main(String[] args) throws Exception {
from(Arrays.asList(1)).repeat().take(10_000_000).subscribe((a) -> { });
from(Arrays.asList(1)).subscribeOn(Schedulers.computation()).repeat().take(10_000_000).subscribe((a) -> { });
Thread.sleep(100000);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment