Skip to content

Instantly share code, notes, and snippets.

@sys1yagi
Last active August 29, 2015 14:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sys1yagi/cd5fadded4e0fd081054 to your computer and use it in GitHub Desktop.
Save sys1yagi/cd5fadded4e0fd081054 to your computer and use it in GitHub Desktop.
//usecase
api.user() //Observable<User>
.lift(new SerialOperator<>(
new Action2<Subscriber<? super List<Feed>>, User>() {
@Override
public void call(Subscriber<? super List<Feed>> subscriber, User user) {
api.userFeed(user).subscribe(subscriber);
}
}))
.lift(new SerialOperator<>(
new Action2<Subscriber<? super String>, List<Feed>>() {
@Override
public void call(final Subscriber<? super String> subscriber, List<Feed> feeds) {
Observable.from(feeds)
.reduce(null, new Func2<String, Feed, String>() {
@Override
public String call(String s, Feed feed) {
return (s == null ? "" : s + ",") + feed.summarize();
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
subscriber.onNext(s);
subscriber.onCompleted();
}
});
}
}))
.subscribe(new Action1<String>() {
@Override
public void call(String summary) {
Log.d("moge", "summary=" + summary);
}
});
public class SerialOperator<R, T> implements Observable.Operator<R, T> {
Action2<Subscriber<? super R>, T> then;
T next;
SerialOperator(Action2<Subscriber<? super R>, T> then) {
this.then = then;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> subscriber) {
return new Subscriber<T>() {
@Override
public void onCompleted() {
then.call(subscriber, next);
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onNext(T t) {
next = t;
}
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment