Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package RxTest.RxTest;
import rx.functions.Func2;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import rx.Observable;
/**
 * Transformer that pairs every item of a source with the item that follows it.
 *
 * <p>For a source emitting {@code a0, a1, a2, ...} the result emits
 * {@code mergeFunc(a0, a1), mergeFunc(a1, a2), ...}. A source with fewer than
 * two items therefore produces no output items.
 *
 * @param <T> type of the items emitted by the source
 * @param <R> type of the merged items emitted downstream
 */
public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {

    /** Combines an item (first argument) with its immediate successor (second argument). */
    private final Func2<T, T, R> mergeFunc;

    /**
     * @param mergeFunc function invoked with (item, next item) to build each output value
     */
    public LookbackTransformer(Func2<T, T, R> mergeFunc) {
        this.mergeFunc = mergeFunc;
    }

    /**
     * Zips the source with a copy of itself shifted by one item.
     *
     * @param source upstream observable
     * @return observable of {@code mergeFunc(previous, next)} for each adjacent pair
     */
    @Override
    public Observable<R> call(Observable<T> source) {
        // publish(selector) multicasts a single upstream subscription and connects
        // only after both branches created inside the selector have subscribed.
        // share() (publish + refCount) would connect as soon as the first branch
        // subscribes, so a source that emits synchronously could be drained, or
        // subscribed to a second time, before the skip(1) branch is attached.
        return source.publish(shared -> shared.zipWith(shared.skip(1), mergeFunc));
    }

    /**
     * Demo: prints each of ten interval ticks ("do n") and each adjacent pair ("sb a b").
     */
    public static void main(String[] args) throws IOException {
        Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(10)
                .doOnNext(e -> System.out.println("do " + e))
                .compose(new LookbackTransformer<Long, String>((a, b) -> a + " " + b))
                .subscribe(e -> System.out.println("sb " + e));
        // interval emits asynchronously, so block here until input arrives;
        // otherwise main would return right after subscribing.
        System.in.read();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.