A simple Rx based pager
public class Pager<I, O> {

    private static final Observable FINISH_SEQUENCE = Observable.never();

    private PublishSubject<Observable<I>> pages;
    private Observable<I> nextPage = finish();
    private Subscription subscription = Subscriptions.empty();

    private final PagingFunction<I> pagingFunction;
    private final Func1<I, O> pageTransformer;

    public static <T> Pager<T, T> create(PagingFunction<T> pagingFunction) {
        return new Pager<>(pagingFunction, UtilityFunctions.<T>identity());
    }

    public static <I, O> Pager<I, O> create(PagingFunction<I> pagingFunction, Func1<I, O> pageTransformer) {
        return new Pager<>(pagingFunction, pageTransformer);
    }

    Pager(PagingFunction<I> pagingFunction, Func1<I, O> pageTransformer) {
        this.pagingFunction = pagingFunction;
        this.pageTransformer = pageTransformer;
    }

    /**
     * Used in the paging function to signal the caller that no more pages are available, i.e.
     * to finish paging by completing the paged sequence.
     *
     * @return the finish token
     */
    @SuppressWarnings("unchecked")
    public static <T> Observable<T> finish() {
        return FINISH_SEQUENCE;
    }

    /**
     * Transforms the given sequence to have its subsequent pages pushed into the observer subscribed
     * to the new sequence returned by this method. You can advance to the next page by calling {@link #next()}
     *
     * @param source the source sequence, which would be the first page of the sequence to be paged
     * @return a new sequence based on {@code source}, where subscribers keep receiving pages through subsequent calls
     * to {@link #next()}
     */
    public Observable<O> page(final Observable<I> source) {
        return Observable.create(new Observable.OnSubscribe<O>() {
            @Override
            public void call(final Subscriber<? super O> subscriber) {
                pages = PublishSubject.create();
                subscription = Observable.switchOnNext(pages).subscribe(new PageSubscriber(subscriber));
                subscriber.add(subscription);
                pages.onNext(source);
            }
        });
    }

    /**
     * Returns the last page received from the pager. You may use this to
     * retry that observable in case it failed the first time around.
     */
    public Observable<O> currentPage() {
        return page(nextPage);
    }

    /**
     * @return true, if there are more pages to be emitted.
     */
    public boolean hasNext() {
        return nextPage != FINISH_SEQUENCE;
    }

    /**
     * Advances the pager by pushing the next page of items into the current observer, if there is one. If the pager
     * has been unsubscribed from or there are no more pages, this method does nothing.
     */
    public void next() {
        if (!subscription.isUnsubscribed() && hasNext()) {
            pages.onNext(nextPage);
        }
    }

    public interface PagingFunction<T> extends Func1<T, Observable<T>> {
    }

    private final class PageSubscriber extends Subscriber<I> {

        private final Subscriber<? super O> inner;

        public PageSubscriber(Subscriber<? super O> inner) {
            this.inner = inner;
        }

        @Override
        public void onCompleted() {
            inner.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            inner.onError(e);
        }

        @Override
        public void onNext(I result) {
            nextPage = pagingFunction.call(result);
            inner.onNext(pageTransformer.call(result));
            if (nextPage == FINISH_SEQUENCE) {
                pages.onCompleted();
            }
        }
    }
}
@mttkay
Owner
mttkay commented Nov 4, 2015

There is the assumption encoded here that I is the page type, i.e. it needs to be a collection itself. That could be an explicit page type you define or simply some Iterable<T> of items. We specifically didn't want to put a type bound on I because it can really be anything, and the pager doesn't care either way.

@mttkay
Owner
mttkay commented Nov 4, 2015

The way you'd use it:

// assuming a page type of `List<Integer>`, create your initial sequence
Observable<List<Integer>> source = Observable.just(Arrays.asList(1, 2, 3));

// create the pager instance and provide the paging function
Pager<List<Integer>, List<Integer>> pager = Pager.create(new PagingFunction<List<Integer>>() {
    @Override
    public Observable<List<Integer>> call(List<Integer> previousPage) {
        // you need to define what it means to have "no more pages";
        // it could be the absence of a "next" link in a REST response, or no more
        // rows being read from a local database, or whatever you think you need.
        if (noMorePages()) {
            return Pager.finish();
        } else {
            // construct next page from previous page;
            // in a production impl this could be constructing a request Observable
            // by following a link in a REST API, but it typically involves looking at
            // `previousPage`
            return Observable.just(Arrays.asList(4, 5, 6));
        }
    }
});

// page your sequence; this will emit (1, 2, 3) to the subscriber right away
pager.page(source).subscribe(subscriber);

// this will emit (4, 5, 6)
pager.next();

...
@mttkay
Owner
mttkay commented Nov 4, 2015

You can also provide a page transformer which the pager will apply when constructing the next page, in case your subscriber expects a different item type than that of the source sequence. This is nice if you want to hide low level data structures from a presentation level subscriber, such as when you're emitting API responses.

@roman-mazur

Isn't back pressure kinda related to paging? Do you think paging could be implemented through back pressure?

@mttkay
Owner
mttkay commented Nov 5, 2015

Probably! I'm sure there are many ways of arriving at the same effect. This implementation is quite old; I'm not convinced RxJava even had backpressure support when I first wrote it. It works well, so we never went back and changed it to something that might be more idiomatic with today's available APIs.

@NiteshKant

@roman-mazur @mttkay the gist I provided on twitter does use backpressure as the paging control.
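
For readers following the thread, here is a rough sketch of what "backpressure as the paging control" can look like. It is not the gist mentioned above, and it uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) rather than RxJava so that it runs without dependencies. The names `PagePublisher`, `CollectingSubscriber` and `fetch` are made up for illustration, and the publisher skips most of the Reactive Streams rules (no validation of `n`, no re-entrancy protection).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.IntFunction;

public class BackpressurePagingSketch {

    /** Emits one page per requested item; an empty page means "no more pages". */
    static final class PagePublisher implements Flow.Publisher<List<Integer>> {
        private final IntFunction<List<Integer>> fetchPage; // hypothetical page loader

        PagePublisher(IntFunction<List<Integer>> fetchPage) {
            this.fetchPage = fetchPage;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super List<Integer>> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int pageIndex = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Load only as many pages as the subscriber asked for.
                    for (long i = 0; i < n && !done; i++) {
                        List<Integer> page = fetchPage.apply(pageIndex++);
                        if (page.isEmpty()) {
                            done = true;
                            subscriber.onComplete();
                        } else {
                            subscriber.onNext(page);
                        }
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Collects pages; next() plays the role of Pager.next() by requesting one more page. */
    static final class CollectingSubscriber implements Flow.Subscriber<List<Integer>> {
        final List<List<Integer>> received = new ArrayList<>();
        boolean completed = false;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; }
        @Override public void onNext(List<Integer> page) { received.add(page); }
        @Override public void onError(Throwable t) { throw new RuntimeException(t); }
        @Override public void onComplete() { completed = true; }

        void next() { subscription.request(1); }
    }

    /** Hypothetical data source: two pages of three integers, then nothing. */
    static List<Integer> fetch(int pageIndex) {
        if (pageIndex >= 2) {
            return List.of();
        }
        int start = pageIndex * 3 + 1;
        return List.of(start, start + 1, start + 2);
    }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new PagePublisher(BackpressurePagingSketch::fetch).subscribe(subscriber);

        subscriber.next(); // loads (1, 2, 3)
        subscriber.next(); // loads (4, 5, 6)
        subscriber.next(); // nothing left, so the sequence completes
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
        // prints [[1, 2, 3], [4, 5, 6]] completed=true
    }
}
```

The point of the sketch is that no subject is needed to push pages: the subscriber's demand (`request(1)`) is the "next page" signal, and nothing is fetched until it arrives.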

@cristiangarciascmspain

@mttkay I did a Pager based on yours:

public class Pager<I, P> {
  private PublishSubject<P> pages;
  private P nextPageToken;

  private final Func2<P, I, P> pagingFunction;
  private final Func1<P, Observable<I>> obtainFunction;

  public static <T, O> Pager<T, O> create(O firstPage, Func2<O, T, O> pagingFunction,
      Func1<O, Observable<T>> obtainFunction) {
    return new Pager<>(firstPage, pagingFunction, obtainFunction);
  }

  Pager(P firstPage, Func2<P, I, P> pagingFunction, Func1<P, Observable<I>> obtainFunction) {
    this.nextPageToken = firstPage;
    this.pagingFunction = pagingFunction;
    this.obtainFunction = obtainFunction;

    pages = PublishSubject.create();

    pages.doOnNext(p -> {
      if (p == null) {
        pages.onCompleted();
      }
    });
  }

  public Observable<I> getObservable() {
    return Observable.defer(() -> page(nextPageToken));
  }

  public boolean hasNext() {
    return nextPageToken != null;
  }

  public void next() {
    if (pages.hasObservers() && hasNext()) {
      pages.onNext(nextPageToken);
    }
  }

  private Observable<I> page(final P source) {
    return pages.startWith(source)
        .flatMap(obtainFunction)
        .doOnNext(page -> nextPageToken = pagingFunction.call(nextPageToken, page));
  }
}

The usage is:
Pager.create(initialPageToken, (oldPageToken, pageResult) -> pageResult.getNextPageToken(), datasource::getPage)
or, if you want to use an offset instead of a token:
Pager.create(0, (offset, pageResult) -> offset + pageResult.size(), datasource::getPageWithOffset)
Basically, the first two parameters work like RxJava's scan: a seed value and an accumulator function.

I don't know if I may be introducing any problem, I just tried to simplify the code.
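
To make the scan-like folding of the page token concrete, here is a minimal plain-Java sketch of the same idea without RxJava. `TokenPager`, `DATA`, `PAGE_SIZE` and `getPageWithOffset` are hypothetical stand-ins, and the rule "a short page means there is nothing left" is an assumption added here so the loop terminates; the offset example above never produces a null token on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;
import java.util.function.Function;

public class TokenPagingSketch {

    /**
     * Synchronous analogue of the token-based pager: obtain loads the page for a token,
     * paging folds (token, page) into the next token. A null token means "no more pages".
     */
    static final class TokenPager<I, P> {
        private P nextToken;
        private final BiFunction<P, I, P> paging;
        private final Function<P, I> obtain;

        TokenPager(P firstToken, BiFunction<P, I, P> paging, Function<P, I> obtain) {
            this.nextToken = firstToken;
            this.paging = paging;
            this.obtain = obtain;
        }

        boolean hasNext() {
            return nextToken != null;
        }

        /** Loads the page for the current token, then advances the token. */
        I next() {
            if (!hasNext()) {
                throw new NoSuchElementException("no more pages");
            }
            I page = obtain.apply(nextToken);
            nextToken = paging.apply(nextToken, page);
            return page;
        }
    }

    static final List<Integer> DATA = List.of(1, 2, 3, 4, 5, 6, 7);
    static final int PAGE_SIZE = 3;

    /** Hypothetical stand-in for datasource::getPageWithOffset. */
    static List<Integer> getPageWithOffset(int offset) {
        return DATA.subList(offset, Math.min(offset + PAGE_SIZE, DATA.size()));
    }

    static List<List<Integer>> loadAll() {
        TokenPager<List<Integer>, Integer> pager = new TokenPager<List<Integer>, Integer>(
                0,
                // Assumption: a page shorter than PAGE_SIZE is the last one.
                (offset, page) -> page.size() < PAGE_SIZE ? null : offset + page.size(),
                TokenPagingSketch::getPageWithOffset);

        List<List<Integer>> pages = new ArrayList<>();
        while (pager.hasNext()) {
            pages.add(pager.next());
        }
        return pages;
    }

    public static void main(String[] args) {
        System.out.println(loadAll()); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

The seed (`0`) and the accumulator lambda are exactly the two arguments a scan would take; the third argument only turns the accumulated token into a page.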

@grandstaish
grandstaish commented Jun 22, 2016 edited

@mttkay Are you missing setting nextPage = source inside of the public Observable<O> page(final Observable<I> source) method? If the first request fails, then nextPage Observable is never updated and is still equal to FINISH_SEQUENCE. This means you can't retry the first request using the currentPage() method as per your comment :(. Otherwise this solution is great, thank you!
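
The effect of the proposed `nextPage = source` assignment can be modelled in a few lines of plain Java. This is a simplified, synchronous stand-in and not the RxJava class from the gist: a `Supplier` plays the role of a page `Observable`, and `MiniPager` and `flakyFirstPage` are hypothetical names used only for this sketch.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

public class FirstPageRetrySketch {

    /** Synchronous model of the pager; a Supplier stands in for a page Observable. */
    static final class MiniPager {
        private static final Supplier<List<Integer>> FINISH = () -> {
            throw new IllegalStateException("no more pages");
        };

        private final Function<List<Integer>, Supplier<List<Integer>>> pagingFunction;
        private Supplier<List<Integer>> nextPage = FINISH;

        MiniPager(Function<List<Integer>, Supplier<List<Integer>>> pagingFunction) {
            this.pagingFunction = pagingFunction;
        }

        static Supplier<List<Integer>> finish() {
            return FINISH;
        }

        List<Integer> page(Supplier<List<Integer>> source) {
            // The proposed fix: remember the first page before loading it,
            // so a failed first load can still be retried.
            nextPage = source;
            return load();
        }

        /** Retries whatever page is currently pending. */
        List<Integer> currentPage() {
            return load();
        }

        boolean hasNext() {
            return nextPage != FINISH;
        }

        private List<Integer> load() {
            // If get() throws, nextPage is left untouched and can be retried.
            List<Integer> result = nextPage.get();
            nextPage = pagingFunction.apply(result);
            return result;
        }
    }

    /** Hypothetical first page that fails once, then succeeds. */
    static Supplier<List<Integer>> flakyFirstPage() {
        int[] attempts = {0};
        return () -> {
            if (attempts[0]++ == 0) {
                throw new RuntimeException("network error");
            }
            return List.of(1, 2, 3);
        };
    }

    static List<Integer> loadWithOneRetry() {
        MiniPager pager = new MiniPager(previous -> MiniPager.finish());
        try {
            return pager.page(flakyFirstPage());
        } catch (RuntimeException firstAttemptFailed) {
            return pager.currentPage();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadWithOneRetry()); // prints [1, 2, 3]
    }
}
```

Without the assignment in `page`, `nextPage` would still be the finish token after the first failure, and the retry in `currentPage()` would have nothing to load.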
