Created
January 22, 2014 23:52
-
-
Save akarnokd/8570027 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* To change this license header, choose License Headers in Project Properties. | |
* To change this template file, choose Tools | Templates | |
* and open the template in the editor. | |
*/ | |
package rxjava8; | |
import java.util.Arrays; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import rx.Observer; | |
import rx.Scheduler; | |
import rx.Subscription; | |
import rx.schedulers.Schedulers; | |
import rx.subscriptions.CompositeSubscription; | |
import rx.subscriptions.MultipleAssignmentSubscription; | |
import rx.subscriptions.SerialSubscription; | |
import rx.util.functions.Action0; | |
import rx.util.functions.Action1; | |
import rx.util.functions.Action2; | |
import rx.util.functions.Func1; | |
import rx.util.functions.Func2; | |
/** | |
* | |
* @author akarnokd | |
*/ | |
public class Obsurvable<T> { | |
private final Action2<Observer<T>, CompositeSubscription> onSubscribe; | |
private Obsurvable(Action2<Observer<T>, CompositeSubscription> onSubscribe) { | |
this.onSubscribe = onSubscribe; | |
} | |
public Subscription subscribe(Observer<T> obsurver) { | |
CompositeSubscription token = new CompositeSubscription(); | |
onSubscribe.call(obsurver, token); | |
return token; | |
} | |
public Subscription subscribe(Observer<T> obsurver, CompositeSubscription token) { | |
onSubscribe.call(obsurver, token); | |
return token; | |
} | |
public static <T> Obsurvable<T> create(Action2<Observer<T>, CompositeSubscription> onSubscribe) { | |
return new Obsurvable<>(onSubscribe); | |
} | |
public <U> Obsurvable<U> bind(Func2<Observer<U>, CompositeSubscription, Observer<T>> binder) { | |
return new Obsurvable<>((o, t) -> onSubscribe.call(binder.call(o, t), t)); | |
} | |
public <U> Obsurvable<U> select(Func1<T, U> selector) { | |
return bind((u, k) -> new Observer<T>() { | |
@Override | |
public void onNext(T value) { | |
u.onNext(selector.call(value)); | |
} | |
@Override | |
public void onError(Throwable t) { | |
u.onError(t); | |
} | |
@Override | |
public void onCompleted() { | |
u.onCompleted(); | |
} | |
}); | |
} | |
public static <T> Obsurvable<T> from(Iterable<T> source) { | |
return create((o, k) -> { | |
for (T v : source) { | |
if (k.isUnsubscribed()) { | |
return; | |
} | |
try { | |
o.onNext(v); | |
} catch (Throwable t) { | |
o.onError(t); | |
k.unsubscribe(); | |
} | |
} | |
if (!k.isUnsubscribed()) { | |
o.onCompleted(); | |
} | |
}); | |
} | |
public Obsurvable<T> take(int count) { | |
return bind((u, k) -> new Observer<T>() { | |
int i; | |
@Override | |
public void onNext(T value) { | |
if (i >= count) { | |
k.unsubscribe(); | |
return; | |
} | |
u.onNext(value); | |
i++; | |
if (i == count && !k.isUnsubscribed()) { | |
u.onCompleted(); | |
} | |
} | |
@Override | |
public void onError(Throwable t) { | |
u.onError(t); | |
} | |
@Override | |
public void onCompleted() { | |
i = count; | |
u.onCompleted(); | |
} | |
}); | |
} | |
public static <T> Obsurvable<T> merge(Obsurvable<Obsurvable<T>> sources) { | |
return create((u, k) -> { | |
QueueDrain qd = new QueueDrain(k); | |
AtomicInteger wip = new AtomicInteger(1); | |
k.add(sources.subscribe(new Observer<Obsurvable<T>>() { | |
@Override | |
public void onNext(Obsurvable<T> args) { | |
CompositeSubscription ssub = new CompositeSubscription(); | |
k.add(ssub); | |
wip.incrementAndGet(); | |
args.subscribe(new Observer<T>() { | |
@Override | |
public void onNext(T args) { | |
qd.enqueue(() -> u.onNext(args)); | |
qd.tryDrain(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
innerError(e); | |
} | |
@Override | |
public void onCompleted() { | |
innerCompleted(); | |
} | |
}, ssub); | |
} | |
void innerError(Throwable e) { | |
onError(e); | |
k.unsubscribe(); | |
} | |
void innerCompleted() { | |
onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
qd.enqueue(() -> u.onError(e)); | |
qd.tryDrain(); | |
} | |
@Override | |
public void onCompleted() { | |
if (wip.decrementAndGet() == 0) { | |
qd.enqueue(() -> u.onCompleted()); | |
qd.tryDrain(); | |
} | |
} | |
}, k)); | |
}); | |
} | |
public Obsurvable<T> subscribeOn(Scheduler scheduler) { | |
return create((t, k) -> { | |
MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); | |
k.add(mas); | |
k.add(scheduler.schedule(() -> { k.remove(mas); subscribe(t, k); })); | |
}); | |
} | |
static final class QueueDrain { | |
final AtomicInteger wip = new AtomicInteger(); | |
final BlockingQueue<Action0> queue = new LinkedBlockingQueue<>(); | |
final CompositeSubscription k; | |
public QueueDrain(CompositeSubscription k) { | |
this.k = k; | |
} | |
public void enqueue(Action0 action) { | |
queue.add(action); | |
} | |
public void tryDrain() { | |
if (wip.incrementAndGet() > 1) { | |
return; | |
} | |
do { | |
queue.poll().call(); | |
} while (wip.decrementAndGet() > 0 && !k.isUnsubscribed()); | |
} | |
} | |
public Obsurvable<T> observeOn(Scheduler scheduler) { | |
return bind((u, k) -> new Observer<T>() { | |
final QueueDrain qd = new QueueDrain(k); | |
@Override | |
public void onNext(T args) { | |
MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); | |
k.add(mas); | |
qd.enqueue(() -> { k.remove(mas); u.onNext(args); }); | |
mas.set(scheduler.schedule(qd::tryDrain)); | |
} | |
@Override | |
public void onError(Throwable e) { | |
MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); | |
k.add(mas); | |
qd.enqueue(() -> { k.remove(mas); u.onError(e); }); | |
mas.set(scheduler.schedule(qd::tryDrain)); | |
} | |
@Override | |
public void onCompleted() { | |
MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); | |
k.add(mas); | |
qd.enqueue(() -> { k.remove(mas); u.onCompleted(); }); | |
mas.set(scheduler.schedule(qd::tryDrain)); | |
} | |
}); | |
} | |
public Obsurvable<T> repeat() { | |
return from(Arrays.asList(this)).bind((u, k) -> new Observer<Obsurvable<T>>() { | |
QueueDrain qd = new QueueDrain(k); | |
@Override | |
public void onNext(Obsurvable<T> args) { | |
qd.enqueue(() -> | |
args.subscribe(new Observer<T>() { | |
@Override | |
public void onNext(T args) { | |
u.onNext(args); | |
} | |
@Override | |
public void onError(Throwable e) { | |
u.onError(e); | |
} | |
@Override | |
public void onCompleted() { | |
innerNext(args); | |
} | |
}, k) | |
); | |
qd.tryDrain(); | |
} | |
void innerNext(Obsurvable<T> args) { | |
onNext(args); | |
} | |
@Override | |
public void onError(Throwable e) { | |
u.onError(e); | |
} | |
@Override | |
public void onCompleted() { | |
} | |
}); | |
} | |
public Subscription subscribe(Action1<T> onNext) { | |
return subscribe(new Observer<T>() { | |
@Override | |
public void onNext(T value) { | |
onNext.call(value); | |
} | |
@Override | |
public void onError(Throwable t) { | |
t.printStackTrace(); | |
} | |
@Override | |
public void onCompleted() { | |
System.out.println("Done"); | |
} | |
}); | |
} | |
public static void main(String[] args) throws Exception { | |
from(Arrays.asList(1)).repeat().take(10_000_000).subscribe((a) -> { }); | |
from(Arrays.asList(1)).subscribeOn(Schedulers.computation()).repeat().take(10_000_000).subscribe((a) -> { }); | |
Thread.sleep(100000); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment