package RxTest.RxTest; | |
import rx.functions.Func2; | |
import java.io.IOException; | |
import java.util.concurrent.TimeUnit; | |
import rx.Observable; | |
public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> { | |
private final Func2<T, T, R> mergeFunc; | |
public LookbackTransformer(Func2<T, T, R> mergeFunc) { | |
this.mergeFunc = mergeFunc; | |
} | |
@Override | |
public Observable<R> call(Observable<T> source) { | |
source = source.share(); | |
Observable<T> offsetSource = source.skip(1); | |
return source.zipWith(offsetSource, mergeFunc); | |
} | |
public static void main(String[] args) throws IOException { | |
Observable.interval(100, TimeUnit.MILLISECONDS) | |
.take(10) | |
.doOnNext(e -> System.out.println("do " + e)) | |
.compose(new LookbackTransformer<Long, String>((a, b) -> a + " " + b)) | |
.subscribe(e -> System.out.println("sb " + e)); | |
System.in.read(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment