@mttkay
Created November 4, 2015 15:46
A simple Rx based pager
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;
import rx.internal.util.UtilityFunctions;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

public class Pager<I, O> {

  // Sentinel compared by identity; a paging function returns it to signal the last page.
  private static final Observable FINISH_SEQUENCE = Observable.never();

  private PublishSubject<Observable<I>> pages;
  private Observable<I> nextPage = finish();
  private Subscription subscription = Subscriptions.empty();

  private final PagingFunction<I> pagingFunction;
  private final Func1<I, O> pageTransformer;

  public static <T> Pager<T, T> create(PagingFunction<T> pagingFunction) {
    return new Pager<>(pagingFunction, UtilityFunctions.<T>identity());
  }

  public static <I, O> Pager<I, O> create(PagingFunction<I> pagingFunction, Func1<I, O> pageTransformer) {
    return new Pager<>(pagingFunction, pageTransformer);
  }

  Pager(PagingFunction<I> pagingFunction, Func1<I, O> pageTransformer) {
    this.pagingFunction = pagingFunction;
    this.pageTransformer = pageTransformer;
  }

  /**
   * Used in the paging function to signal the caller that no more pages are available, i.e.
   * to finish paging by completing the paged sequence.
   *
   * @return the finish token
   */
  @SuppressWarnings("unchecked")
  public static <T> Observable<T> finish() {
    return FINISH_SEQUENCE;
  }

  /**
   * Transforms the given sequence to have its subsequent pages pushed into the observer subscribed
   * to the new sequence returned by this method. You can advance to the next page by calling {@link #next()}.
   *
   * @param source the source sequence, which would be the first page of the sequence to be paged
   * @return a new sequence based on {@code source}, where subscribers keep receiving pages through subsequent calls
   * to {@link #next()}
   */
  public Observable<O> page(final Observable<I> source) {
    return Observable.create(new Observable.OnSubscribe<O>() {
      @Override
      public void call(final Subscriber<? super O> subscriber) {
        pages = PublishSubject.create();
        subscription = Observable.switchOnNext(pages).subscribe(new PageSubscriber(subscriber));
        subscriber.add(subscription);
        pages.onNext(source);
      }
    });
  }

  /**
   * Returns the last page received from the pager. You may use this to
   * retry that observable in case it failed the first time around.
   */
  public Observable<O> currentPage() {
    return page(nextPage);
  }

  /**
   * @return true, if there are more pages to be emitted.
   */
  public boolean hasNext() {
    return nextPage != FINISH_SEQUENCE;
  }

  /**
   * Advances the pager by pushing the next page of items into the current observer, if there is one. If the pager
   * has been unsubscribed from or there are no more pages, this method does nothing.
   */
  public void next() {
    if (!subscription.isUnsubscribed() && hasNext()) {
      pages.onNext(nextPage);
    }
  }

  public interface PagingFunction<T> extends Func1<T, Observable<T>> {
  }

  private final class PageSubscriber extends Subscriber<I> {
    private final Subscriber<? super O> inner;

    public PageSubscriber(Subscriber<? super O> inner) {
      this.inner = inner;
    }

    @Override
    public void onCompleted() {
      inner.onCompleted();
    }

    @Override
    public void onError(Throwable e) {
      inner.onError(e);
    }

    @Override
    public void onNext(I result) {
      // Ask the paging function for the next page before handing this one downstream.
      nextPage = pagingFunction.call(result);
      inner.onNext(pageTransformer.call(result));
      if (nextPage == FINISH_SEQUENCE) {
        pages.onCompleted();
      }
    }
  }
}
@mttkay

mttkay commented Nov 5, 2015

Probably! I'm sure there are many ways of arriving at the same effect. This implementation is quite old; I'm not convinced RxJava even had backpressure support when I first wrote it. It works well, so we never went back and changed it to something that might be more idiomatic with today's available APIs.

@NiteshKant

@roman-mazur @mttkay the gist I provided on twitter does use backpressure as the paging control.
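
To make "backpressure as the paging control" concrete, here is a minimal sketch of the idea. It is not the gist referred to above: it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than RxJava, and the names DemandDrivenPages, PageCollector and nextPage() are made up for illustration. Each request(1) from the subscriber plays the role of next() in the pager.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.IntFunction;

// Hypothetical sketch: every unit of demand from the subscriber pulls one more page.
final class DemandDrivenPages {

    /** Publishes pages 0..pageCount-1, loading exactly one page per requested item. */
    static <T> Flow.Publisher<T> publisher(int pageCount, IntFunction<T> loadPage) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Single-threaded sketch: no re-entrancy, concurrency or n <= 0 handling.
                for (long i = 0; i < n && !done; i++) {
                    if (next < pageCount) {
                        subscriber.onNext(loadPage.apply(next++));
                    }
                    if (next >= pageCount) {
                        done = true;
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Collects pages and lets the caller signal demand for the next one. */
    static final class PageCollector<T> implements Flow.Subscriber<T> {
        final List<T> pages = new ArrayList<>();
        boolean completed = false;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(T page) { pages.add(page); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }

        /** Counterpart of the pager's next(): ask for one more page. */
        void nextPage() { subscription.request(1); }
    }

    public static void main(String[] args) {
        PageCollector<String> ui = new PageCollector<>();
        publisher(3, i -> "page-" + i).subscribe(ui);
        ui.nextPage(); // nothing is loaded until demand is signalled
        ui.nextPage();
        System.out.println(ui.pages + " completed=" + ui.completed);
    }
}
```

The pager state (which page comes next, whether paging is finished) lives inside the subscription here, so the consumer needs no separate hasNext()/next() object; whether that trade-off is preferable depends on how the UI drives loading.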

@cristiangarciascmspain

@mttkay I did a Pager based on yours:

public class Pager<I, P> {
  private PublishSubject<P> pages;
  private P nextPageToken;

  private final Func2<P, I, P> pagingFunction;
  private final Func1<P, Observable<I>> obtainFunction;

  public static <T, O> Pager<T, O> create(O firstPage, Func2<O, T, O> pagingFunction,
      Func1<O, Observable<T>> obtainFunction) {
    return new Pager<>(firstPage, pagingFunction, obtainFunction);
  }

  Pager(P firstPage, Func2<P, I, P> pagingFunction, Func1<P, Observable<I>> obtainFunction) {
    this.nextPageToken = firstPage;
    this.pagingFunction = pagingFunction;
    this.obtainFunction = obtainFunction;

    pages = PublishSubject.create();

    pages.doOnNext(p -> {
      if (p == null) {
        pages.onCompleted();
      }
    });
  }

  public Observable<I> getObservable() {
    return Observable.defer(() -> page(nextPageToken));
  }

  public boolean hasNext() {
    return nextPageToken != null;
  }

  public void next() {
    if (pages.hasObservers() && hasNext()) {
      pages.onNext(nextPageToken);
    }
  }

  private Observable<I> page(final P source) {
    return pages.startWith(source)
        .flatMap(obtainFunction)
        .doOnNext(page -> nextPageToken = pagingFunction.call(nextPageToken, page));
  }
}

The usage is:
Pager.create(initialPageToken, (oldPageToken, pageResult) -> pageResult.getNextPageToken(), datasource::getPage)
Or, if you want to use an offset instead of a token:
Pager.create(0, (offset, pageResult) -> offset + pageResult.size(), datasource::getPageWithOffset)
Basically, the first two parameters work like RxJava's scan: a seed value and an accumulator function.
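
The scan analogy may be easier to see without Observables. The following is a hypothetical, synchronous stand-in (ScanLikePaging and fetchAll are illustrative names, not part of the pager above): it takes the same three arguments and threads the page token through pagingFunction until it becomes null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch: the same seed/accumulator/loader triple, evaluated eagerly.
final class ScanLikePaging {

    /**
     * Loads pages until the token becomes null. The token is carried from one
     * step to the next the way scan carries its accumulator.
     */
    static <I, P> List<I> fetchAll(P firstToken,
                                   BiFunction<P, I, P> pagingFunction,
                                   Function<P, I> obtainFunction) {
        List<I> pages = new ArrayList<>();
        P token = firstToken;
        while (token != null) {
            I page = obtainFunction.apply(token);      // load the page for the current token
            pages.add(page);
            token = pagingFunction.apply(token, page); // (oldToken, page) -> nextToken; null ends paging
        }
        return pages;
    }

    public static void main(String[] args) {
        // Offset-based paging over an in-memory list, three items per page.
        List<String> data = List.of("a", "b", "c", "d", "e", "f", "g");
        List<List<String>> pages = ScanLikePaging.<List<String>, Integer>fetchAll(
            0,
            (offset, page) -> offset + page.size() < data.size() ? offset + page.size() : null,
            offset -> data.subList(offset, Math.min(offset + 3, data.size())));
        System.out.println(pages);
    }
}
```

Unlike the Rx version, this loop fetches every page at once; the pager defers each step until next() is called.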

I don't know whether I'm introducing any problems; I just tried to simplify the code.

@grandstaish

grandstaish commented Jun 22, 2016

@mttkay Are you missing setting nextPage = source inside of the public Observable<O> page(final Observable<I> source) method? If the first request fails, then nextPage Observable is never updated and is still equal to FINISH_SEQUENCE. This means you can't retry the first request using the currentPage() method as per your comment :(. Otherwise this solution is great, thank you!
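
A miniature model of the bookkeeping in question, assuming the suggested assignment is added. This is only a sketch: Supplier<String> stands in for Observable<I>, a thrown exception stands in for onError, and RetryablePager is a hypothetical name rather than the gist's class.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical miniature of the pager's nextPage bookkeeping, without RxJava.
final class RetryablePager {

    // Sentinel compared by identity, like FINISH_SEQUENCE in the gist.
    private static final Supplier<String> FINISH = () -> {
        throw new IllegalStateException("no more pages");
    };

    private final Function<String, Supplier<String>> pagingFunction;
    private Supplier<String> nextPage = FINISH;

    RetryablePager(Function<String, Supplier<String>> pagingFunction) {
        this.pagingFunction = pagingFunction;
    }

    static Supplier<String> finish() {
        return FINISH;
    }

    /** Loads the given page; on failure nextPage still points at it, so it can be retried. */
    String page(Supplier<String> source) {
        nextPage = source;                       // the assignment suggested in the comment above
        String result = source.get();            // may throw
        nextPage = pagingFunction.apply(result); // success: advance to the following page
        return result;
    }

    /** Re-runs whatever nextPage currently points at. */
    String currentPage() {
        return page(nextPage);
    }

    boolean hasNext() {
        return nextPage != FINISH;
    }
}
```

Without the first assignment in page(), a failed first load would leave nextPage equal to the sentinel, so hasNext() would report false and currentPage() would have nothing to retry.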
