Created November 3, 2015 15:38
RxJava - using a custom stateful Operator
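import rx.Observable;
import rx.Subscriber;

// Maps each emitted item to an Integer designating the item's zero-based index.
// Note: the counter is a field of the operator, so every subscription made
// through the same IndexOperator instance shares (and keeps advancing) it.
public class IndexOperator<T> implements Observable.Operator<Integer, T> {

    private int counter;

    public IndexOperator() {
        counter = 0;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super Integer> subscriber) {
        // Passing the downstream subscriber to the constructor links the two,
        // so unsubscribing downstream also unsubscribes from the source.
        return new Subscriber<T>(subscriber) {

            @Override
            public void onCompleted() {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(e);
                }
            }

            @Override
            public void onNext(T t) {
                if (!subscriber.isUnsubscribed()) {
                    // Emit the current index in place of the item, then advance.
                    subscriber.onNext(counter);
                    counter++;
                }
            }
        };
    }
}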

// Usage: lift() applies the operator to the source Observable.
Observable<Foo> emitter = ...;
emitter.lift(new IndexOperator<Foo>()).subscribe(new Subscriber<Integer>() {...});
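
The operator above keeps its counter in a field of the operator itself. Because `lift` calls the operator's `call` method once per subscription, a second subscription through the same operator instance would continue counting where the first one stopped. The sketch below illustrates that difference without depending on RxJava. `Sink`, `Op`, `SharedIndex`, `PerSubscriptionIndex`, and `subscribeTwice` are hypothetical stand-ins invented for this example, not RxJava types, and the per-subscription variant is only one possible way to scope the state.

```java
import java.util.ArrayList;
import java.util.List;

public class IndexStateDemo {

    /** Hypothetical stand-in for a downstream Subscriber (onNext only). */
    interface Sink<T> {
        void onNext(T item);
    }

    /** Hypothetical stand-in for Observable.Operator: wraps a downstream sink. */
    interface Op<R, T> {
        Sink<T> call(Sink<R> downstream);
    }

    /** Mirrors the gist: the counter lives on the operator and is shared. */
    static class SharedIndex<T> implements Op<Integer, T> {
        private int counter = 0;

        @Override
        public Sink<T> call(final Sink<Integer> downstream) {
            return item -> downstream.onNext(counter++);
        }
    }

    /** Variant: the counter lives in the sink created for each subscription. */
    static class PerSubscriptionIndex<T> implements Op<Integer, T> {
        @Override
        public Sink<T> call(final Sink<Integer> downstream) {
            return new Sink<T>() {
                private int counter = 0;

                @Override
                public void onNext(T item) {
                    downstream.onNext(counter++);
                }
            };
        }
    }

    /** Simulates two subscriptions to a two-item source through one operator instance. */
    static List<Integer> subscribeTwice(Op<Integer, String> op) {
        List<Integer> seen = new ArrayList<>();
        for (int subscription = 0; subscription < 2; subscription++) {
            Sink<String> upstream = op.call(seen::add);
            upstream.onNext("a");
            upstream.onNext("b");
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(subscribeTwice(new SharedIndex<String>()));          // [0, 1, 2, 3]
        System.out.println(subscribeTwice(new PerSubscriptionIndex<String>())); // [0, 1, 0, 1]
    }
}
```

If each subscriber should see indices starting at zero, moving the counter into the anonymous `Subscriber` returned from `call` (or creating a new `IndexOperator` per subscription) would be the analogous change in the RxJava version.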