Created
May 14, 2014 18:24
-
-
Save mikea/b39fa67a2556a4a031a8 to your computer and use it in GitHub Desktop.
Attempt to implement a headAndTail function. It results in the file being read twice.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class HeadTest { | |
public static void main(String[] args) { | |
Observable<Integer> fileReader = Observable.create((Observable.OnSubscribe<Integer>) subscriber -> { | |
System.out.println("Reading from a file"); | |
subscriber.onNext(100); // header | |
for (int i = 0; i < 10; ++i) { | |
subscriber.onNext(i); | |
} | |
subscriber.onCompleted(); | |
}); | |
Pair<Observable<Integer>, Observable<Integer>> headAndTail = headAndTail(fileReader); | |
Integer headerValue = BlockingObservable.from(headAndTail.x).single(); | |
Observable<Integer> processedValues = headAndTail.y.map(i -> i + headerValue); | |
BlockingObservable.from(processedValues).forEach(System.out::println); | |
} | |
public static <T> Pair<Observable<T>, Observable<T>> headAndTail(Observable<T> o) { | |
return new Pair<>(o.first(), o.skip(1)); | |
} | |
public static class Pair<X, Y> { | |
public final X x; | |
public final Y y; | |
public Pair(X x, Y y) { | |
this.x = x; | |
this.y = y; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Reading from a file | |
Reading from a file | |
100 | |
101 | |
102 | |
103 | |
104 | |
105 | |
106 | |
107 | |
108 | |
109 |
Here is another approach ... it's got deficiencies, but can also work, and is better for certain uses:
package test;
import rx.Observable;
import rx.Subscriber;
import rx.subjects.PublishSubject;
/**
 * Head/rest split built on {@code lift}: the first element of the source becomes the
 * {@code head} of a {@link HeadObservable}, and every later element is pushed into a
 * {@code PublishSubject} that backs the {@link HeadObservable} itself.
 */
public class HeadImpl3 {

    public static void main(String[] args) {
        // Endless synchronous counter; the loop ends only once the subscriber has unsubscribed.
        Observable<Long> is = Observable.create((Subscriber<? super Long> s) -> {
            long l = 0;
            while (!s.isUnsubscribed()) {
                s.onNext(l++);
            }
            System.out.println("source unsubscribed");
        });

        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho;
        }).take(5).toBlockingObservable().forEach(t -> { // non-blocking would be .subscribe here
            System.out.println("Rest: " + t);
        });
    }

    /**
     * An {@code Observable} of the elements that follow the first one, carrying that first
     * element as {@code head}.
     *
     * @param <T> element type of the source
     */
    public static class HeadObservable<T> extends Observable<T> {
        private final T head;

        /**
         * @param head the first element of the source
         * @param is   the sequence of remaining elements; every subscriber of this
         *             {@code HeadObservable} is subscribed to it via {@code unsafeSubscribe}
         */
        protected HeadObservable(final T head, final Observable<T> is) {
            super(new OnSubscribe<T>() {
                @Override
                public void call(Subscriber<? super T> s) {
                    is.unsafeSubscribe(s);
                }
            });
            this.head = head;
        }

        /**
         * Converts {@code is} into a sequence that emits one {@code HeadObservable} as soon as
         * the first element arrives.
         *
         * <p>NOTE(review): the rest is relayed through a {@code PublishSubject}; presumably
         * elements that arrive while nobody is subscribed to the {@code HeadObservable} are
         * not replayed - confirm against the RxJava documentation.
         *
         * @param is the source sequence
         * @return a sequence emitting the {@code HeadObservable}; it emits nothing if
         *         {@code is} terminates without an element
         */
        public static <T> Observable<HeadObservable<T>> from(Observable<T> is) {
            return is.lift(new Operator<HeadObservable<T>, T>() {
                @Override
                public Subscriber<? super T> call(Subscriber<? super HeadObservable<T>> child) {
                    return new Subscriber<T>(child) {
                        private HeadObservable<T> ho = null;
                        private final PublishSubject<T> rest = PublishSubject.create();

                        @Override
                        public void onCompleted() {
                            // Terminate the tail as well; otherwise consumers of the
                            // HeadObservable (e.g. through flatMap) never see the end of a
                            // finite source.
                            rest.onCompleted();
                            child.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            // Consumers of the tail must learn about the failure too.
                            rest.onError(e);
                            child.onError(e);
                        }

                        @Override
                        public void onNext(T t) {
                            if (ho == null) {
                                // First element: publish it as the head.
                                ho = new HeadObservable<T>(t, rest);
                                child.onNext(ho);
                            } else {
                                rest.onNext(t);
                            }
                        }
                    };
                }
            });
        }
    }
}
Here is a more robust implementation that handles unsubscribing better. However, it does not work with synchronous sources until a fix lands in RxJava, for which a PR is already open. See the comments in the code.
package test;
import rx.Observable;
import rx.Subscriber;
import rx.observables.ConnectableObservable;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
/**
 * Head/rest split built on {@code multicast}: the source is multicast through a
 * {@code PublishSubject}, the first element becomes the {@code head} of a
 * {@link HeadObservable}, and the multicast itself serves as the rest.
 */
public class HeadImpl4 {

    public static void main(String[] args) {
        // Endless counter, subscribed on the computation scheduler; the loop ends only once
        // the subscriber has unsubscribed.
        Observable<Long> is = Observable.create((Subscriber<? super Long> s) -> {
            long l = 0;
            while (!s.isUnsubscribed()) {
                s.onNext(l++);
            }
            System.out.println("source unsubscribed");
        }).subscribeOn(Schedulers.computation());

        // Case 1: limit the merged output, plain subscribe.
        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho;
        }).take(5).subscribe(t -> {
            System.out.println("Rest: " + t);
        });

        System.out.println("-------------------------");

        // Case 2: limit the merged output, blocking consumption.
        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho;
        }).take(5).toBlockingObservable().forEach(t -> {
            System.out.println("Rest: " + t);
        });

        System.out.println("-------------------------");

        // Case 3: limit the rest itself, blocking consumption.
        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho.take(5);
        }).toBlockingObservable().forEach(t -> {
            System.out.println("Rest: " + t);
        });
    }

    /**
     * An {@code Observable} of the elements that follow the first one, carrying that first
     * element as {@code head}.
     *
     * @param <T> element type of the source
     */
    public static class HeadObservable<T> extends Observable<T> {
        private final T head;

        /**
         * @param head the first element of the source
         * @param is   the sequence of remaining elements; every subscriber of this
         *             {@code HeadObservable} is subscribed to it via {@code unsafeSubscribe}
         */
        protected HeadObservable(final T head, final Observable<T> is) {
            super(new OnSubscribe<T>() {
                @Override
                public void call(Subscriber<? super T> s) {
                    is.unsafeSubscribe(s);
                }
            });
            this.head = head;
        }

        /**
         * Converts {@code is} into a sequence that emits a single {@code HeadObservable} once
         * the first element arrives and then completes.
         *
         * @param is the source sequence
         * @return a sequence emitting the {@code HeadObservable}; it emits nothing if
         *         {@code is} terminates without an element
         */
        public static <T> Observable<HeadObservable<T>> from(Observable<T> is) {
            final ConnectableObservable<T> multicast = is.multicast(PublishSubject.create());
            final Observable<T> head = multicast.take(1);
            final Observable<T> rest = multicast;
            return Observable.create(new OnSubscribe<HeadObservable<T>>() {
                @Override
                public void call(Subscriber<? super HeadObservable<T>> s) {
                    head.unsafeSubscribe(new Subscriber<T>(s) {
                        @Override
                        public void onCompleted() {
                            s.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            s.onError(e);
                        }

                        @Override
                        public void onNext(T t) {
                            // No explicit s.onCompleted() here: take(1) completes right
                            // after this element, and onCompleted() above forwards that.
                            // Completing here as well would signal completion twice.
                            s.onNext(new HeadObservable<T>(t, rest));
                        }
                    });
                    // TODO need to update ConnectableObservable to support synchronous Observables (v0.17+)
                    // see https://github.com/Netflix/RxJava/pull/1175 for possible fix to this
                    // Tie the upstream connection to the subscriber so unsubscribing releases it.
                    s.add(multicast.connect());
                }
            });
        }
    }
}
This, however, feels like the most elegant approach, and it works with each of the use cases I've tried:
package test;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
/**
 * Head/rest split built on {@code groupBy}: the first element is routed into a group of its
 * own, everything after it into a second group, and the two groups are combined into a
 * {@link HeadObservable}.
 */
public class HeadImpl1b {

    public static void main(String[] args) {
        // Endless synchronous counter; the loop ends only once the subscriber has unsubscribed.
        Observable<Long> is = Observable.create((Subscriber<? super Long> s) -> {
            long l = 0;
            while (!s.isUnsubscribed()) {
                s.onNext(l++);
            }
            System.out.println("source unsubscribed");
        });

        // Case 1: limit the merged output, plain subscribe.
        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho;
        }).take(5).subscribe(t -> {
            System.out.println("Rest: " + t);
        });

        System.out.println("-------------------------");

        // Case 2: limit the merged output, blocking consumption.
        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho;
        }).take(5).toBlockingObservable().forEach(t -> {
            System.out.println("Rest: " + t);
        });

        System.out.println("-------------------------");

        // Case 3: limit the rest itself, blocking consumption.
        HeadObservable.from(is).flatMap(ho -> {
            System.out.println("Head: " + ho.head);
            return ho.take(5);
        }).toBlockingObservable().forEach(t -> {
            System.out.println("Rest: " + t);
        });
    }

    /**
     * An {@code Observable} of the elements that follow the first one, carrying that first
     * element as {@code head}.
     *
     * @param <T> element type of the source
     */
    public static class HeadObservable<T> extends Observable<T> {
        private final T head;

        /**
         * @param head the first element of the source
         * @param is   the sequence of remaining elements; every subscriber of this
         *             {@code HeadObservable} is subscribed to it via {@code unsafeSubscribe}
         */
        protected HeadObservable(final T head, final Observable<T> is) {
            super(new OnSubscribe<T>() {
                @Override
                public void call(Subscriber<? super T> s) {
                    is.unsafeSubscribe(s);
                }
            });
            this.head = head;
        }

        /**
         * Converts {@code is} into a sequence that emits a single {@code HeadObservable}.
         *
         * <p>The key selector is stateful (it remembers whether the first element has been
         * seen), so the whole pipeline is built inside {@code defer}: every subscriber gets
         * a fresh selector instead of inheriting the state left by an earlier subscriber.
         *
         * @param is the source sequence
         * @return a sequence emitting the {@code HeadObservable}; when {@code is} holds only
         *         one element the rest of that {@code HeadObservable} is empty
         */
        public static <T> Observable<HeadObservable<T>> from(Observable<T> is) {
            return Observable.defer(() -> {
                Observable<HeadObservable<T>> split = is.groupBy(new Func1<T, Boolean>() {
                    boolean firstEmitted = false;

                    @Override
                    public Boolean call(T l) {
                        // TRUE only for the very first element, FALSE for all later ones.
                        if (firstEmitted) {
                            return Boolean.FALSE;
                        } else {
                            firstEmitted = true;
                            return Boolean.TRUE;
                        }
                    }
                }).buffer(2).take(1).flatMap(gs -> {
                    // With a single-element source only the head group exists; fall back to
                    // an empty rest instead of failing on gs.get(1).
                    final Observable<T> rest;
                    if (gs.size() > 1) {
                        rest = gs.get(1);
                    } else {
                        rest = Observable.empty();
                    }
                    return gs.get(0).take(1).map(t -> {
                        return new HeadObservable<T>(t, rest);
                    });
                });
                return split;
            });
        }
    }
}
Here are examples for accessing head and items at the same time:
// Inside flatMap both the head and every following item are in scope,
// so side-effects can use them together.
HeadObservable.from(is)
        .flatMap(ho -> ho.take(5)
                .doOnNext(t -> System.out.println("Head: " + ho.head + " Item: " + t)))
        .subscribe();
System.out.println("-------------------------");
// Alternatively, pair the head with each item and hand the pair downstream.
HeadObservable.from(is)
        .flatMap(ho -> ho.take(5).<List<Long>> map(t -> Arrays.asList(ho.head, t)))
        .subscribe(d -> System.out.println("Head: " + d.get(0) + " Item: " + d.get(1)));
System.out.println("-------------------------");
// Or fold the items into one value and emit it together with the head.
HeadObservable.from(is)
        .flatMap(ho -> ho.take(5)
                .reduce(0L, (acc, t) -> acc + t)
                .<List<Long>> map(a -> Arrays.asList(ho.head, a)))
        .subscribe(d -> System.out.println("Head: " + d.get(0) + " Accumulated Sum: " + d.get(1)));
These emit:
-------------------------
Head: 0 Item: 1
Head: 0 Item: 2
Head: 0 Item: 3
Head: 0 Item: 4
Head: 0 Item: 5
source unsubscribed
-------------------------
Head: 0 Item: 1
Head: 0 Item: 2
Head: 0 Item: 3
Head: 0 Item: 4
Head: 0 Item: 5
source unsubscribed
-------------------------
Head: 0 Accumulated Sum: 15
source unsubscribed
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
After thinking about this, using `PublishSubject` is not the correct approach. Here is an implementation that may work for your use case: