Skip to content

Instantly share code, notes, and snippets.

@knutwalker
Last active August 29, 2015 14:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save knutwalker/a19f2c780fdaedd57db7 to your computer and use it in GitHub Desktop.
Save knutwalker/a19f2c780fdaedd57db7 to your computer and use it in GitHub Desktop.
slidingWindow in RxJava
import fj.P;
import fj.P2;
import rx.Observable;
import rx.Subscriber;
/**
 * Two ways of turning a stream of integers into a stream of adjacent pairs
 * ("sliding window" of size two): {@code a, b, c, ...} becomes
 * {@code (a, b), (b, c), ...}.
 *
 * <p>A source with fewer than two elements yields no pairs.
 */
public final class SlidingWindow {

    /**
     * Builds the pairs with {@code scan} and a private sentinel value.
     *
     * <p>The accumulator always shifts the previous right element to the left
     * and puts the new element on the right. Every tuple whose left element is
     * still the sentinel is dropped afterwards; that covers the seed tuple and
     * the tuple that pairs the sentinel with the very first element.
     *
     * @param integers the source stream
     * @return a stream of {@code (previous, current)} pairs
     */
    public static Observable<P2<Integer, Integer>> viaScan(final Observable<Integer> integers) {
        // `new Integer(..)` always creates a fresh instance (it never comes from the
        // Integer cache), so no element of the source can be identical to it and the
        // identity comparison (`!=`) below is safe. The numeric value is irrelevant.
        // noinspection UnnecessaryBoxing
        final Integer sentinel = new Integer(133742);
        // noinspection NumberEquality
        return integers
                .scan(P.p(sentinel, sentinel), (previous, x) -> P.p(previous._2(), x))
                .filter(tpl -> tpl._1() != sentinel);
    }

    /**
     * Builds the pairs with a custom operator installed through {@code lift}.
     *
     * @param integers the source stream
     * @return a stream of {@code (previous, current)} pairs
     */
    public static Observable<P2<Integer, Integer>> viaOperator(final Observable<Integer> integers) {
        return integers.<P2<Integer, Integer>>lift(SlidingSubscriber::new);
    }

    /**
     * Prints the pairs produced by both variants for the same range of integers.
     *
     * @param args ignored
     */
    public static void main(final String... args) {
        final Observable<Integer> integers = Observable.range(1337, 42);
        viaScan(integers).toBlocking().forEach(System.out::println);
        viaOperator(integers).toBlocking().forEach(System.out::println);
    }

    /**
     * Upstream-facing subscriber that remembers the last element it saw and
     * emits it together with the next one.
     *
     * <p>One instance is created per downstream subscriber, so the remembered
     * element is never shared between subscriptions.
     */
    private static final class SlidingSubscriber extends Subscriber<Integer> {

        private final Subscriber<? super P2<Integer, Integer>> downstream;

        /** Last element received; only meaningful once {@link #hasPrevious} is set. */
        private Integer previous;

        /**
         * Whether at least one element has been received. Tracked separately so
         * that a {@code null} element is treated like any other value instead of
         * being mistaken for "nothing seen yet".
         */
        private boolean hasPrevious;

        private SlidingSubscriber(final Subscriber<? super P2<Integer, Integer>> downstream) {
            // Chain to the downstream subscriber: this links the two subscriptions
            // (unsubscribing downstream also unsubscribes this subscriber from the
            // source) and lets downstream backpressure requests reach the producer.
            super(downstream);
            this.downstream = downstream;
        }

        @Override
        public void onCompleted() {
            downstream.onCompleted();
        }

        @Override
        public void onError(final Throwable e) {
            downstream.onError(e);
        }

        @Override
        public void onNext(final Integer integer) {
            final boolean first = !hasPrevious;
            final Integer left = previous;

            // Update the state before calling out, so that a producer which emits
            // synchronously from within request(..) observes a consistent window.
            previous = integer;
            hasPrevious = true;

            if (first) {
                // The first element produces no pair; ask for one more element so
                // the downstream still receives as many pairs as it requested.
                request(1);
            } else {
                downstream.onNext(P.p(left, integer));
            }
        }
    }
}
@MartinSeeler
Copy link

👍 Well done!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment