Skip to content

Instantly share code, notes, and snippets.

@danielt1263
Last active April 18, 2022 19:41
Show Gist options
  • Star 38 You must be signed in to star a gist
  • Fork 8 You must be signed in to fork a gist
  • Save danielt1263/10bc5eb821c752ad45f281c6f4e3034b to your computer and use it in GitHub Desktop.
Save danielt1263/10bc5eb821c752ad45f281c6f4e3034b to your computer and use it in GitHub Desktop.
//
// PaginationNetworkLogic.swift
//
// Created by Daniel Tartaglia on 4/9/17.
// Copyright © 2019 Daniel Tartaglia. MIT License
//
import RxSwift
/// The user-interface triggers that drive pagination.
struct PaginationUISource {
    /// Each event reloads the first page and drops every other cached page.
    let refresh: Observable<Void>

    /// Each event asks for the next page to be loaded.
    let loadNextPage: Observable<Void>
}
/// The observable outputs of the pagination logic for elements of type `T`.
struct PaginationSink<T> {
    /// Emits `true` while a network load is in progress.
    let isLoading: Observable<Bool>

    /// Emits the elements of all loaded pages.
    let elements: Observable<[T]>

    /// Emits one event for each error.
    let error: Observable<Error>
}
extension PaginationSink {
    /// Wires the UI triggers to `loadData` and derives `isLoading`, `elements` and `error`.
    ///
    /// Page 1 is requested as soon as the sink is created. Every `ui.refresh` event requests
    /// page 1 again and replaces the whole cache with the result; every `ui.loadNextPage`
    /// event requests the page following the highest page number currently cached.
    ///
    /// - Parameters:
    ///   - ui: The refresh and load-next-page triggers.
    ///   - loadData: Called with the page number to load; emits the items of that page.
    init(ui: PaginationUISource, loadData: @escaping (Int) -> Observable<[T]>) {
        // Cache of every loaded page, keyed by page number.
        let loadResults = BehaviorSubject<[Int: [T]]>(value: [:])

        // Highest cached page number, or 1 while the cache is still empty.
        let maxPage = loadResults
            .map { $0.keys }
            .map { $0.max() ?? 1 }

        // -1 is the sentinel page number meaning "refresh"; it is translated back to
        // page 1 when `loadData` is called and recognised again when the result arrives.
        let reload = ui.refresh
            .map { -1 }

        let loadNext = ui.loadNextPage
            .withLatestFrom(maxPage)
            .map { $0 + 1 }

        // `Observable.just(1)` kicks off the initial load of page 1.
        let start = Observable.merge(reload, loadNext, Observable.just(1))

        // One event per finished request: either `.next((pageNumber, items))` or `.error`.
        // `materialize()` turns a load failure into an element so the outer stream keeps
        // running; the inner `.completed` events are filtered out.
        let page = start
            .flatMap { page in
                Observable.combineLatest(Observable.just(page), loadData(page == -1 ? 1 : page)) { (pageNumber: $0, items: $1) }
                    .materialize()
                    .filter { $0.isCompleted == false }
            }
            .share()

        // Feed successful loads back into the cache. A refresh result replaces the cache
        // (even when empty); any other page is merged in, and empty non-refresh pages are
        // ignored. The returned Disposable is deliberately discarded, so this subscription
        // is never disposed explicitly.
        _ = page
            .compactMap { $0.element }
            .withLatestFrom(loadResults) { (pages: $1, newPage: $0) }
            .filter { $0.newPage.pageNumber == -1 || !$0.newPage.items.isEmpty }
            .map { $0.newPage.pageNumber == -1 ? [1: $0.newPage.items] : $0.pages.merging([$0.newPage], uniquingKeysWith: { $1 }) }
            .subscribe(loadResults)

        // Running count of requests in flight: +1 for every trigger, -1 for every finished
        // request (success or failure).
        // NOTE(review): `start` is subscribed here separately from the subscription inside
        // `page`; a user reported that a loader which emits synchronously can leave this
        // counter unbalanced — confirm before relying on `isLoading` with cached loaders.
        let _isLoading = Observable.merge(start.map { _ in 1 }, page.map { _ in -1 })
            .scan(0, accumulator: +)
            .map { $0 > 0 }
            .distinctUntilChanged()

        // All cached items, flattened in ascending page order.
        let _elements = loadResults
            .map { $0.sorted(by: { $0.key < $1.key }).flatMap { $0.value } }

        // `compactMap` keeps only the failure events without a force-unwrap.
        let _error = page
            .compactMap { $0.error }

        isLoading = _isLoading
        elements = _elements
        error = _error
    }
}
/// Virtual-time tests for `PaginationSink`, driven by a `TestScheduler`.
class PaginationTests: XCTestCase {
    var testScheduler: TestScheduler!
    var bag: DisposeBag!
    var isLoading: TestableObserver<Bool>!
    var elements: TestableObserver<[Int]>!
    var error: TestableObserver<Error>!
    var dataLoader: DataLoader!

    override func setUp() {
        super.setUp()
        let scheduler = TestScheduler(initialClock: 0)
        testScheduler = scheduler
        bag = DisposeBag()
        isLoading = scheduler.createObserver(Bool.self)
        elements = scheduler.createObserver([Int].self)
        error = scheduler.createObserver(Error.self)
        dataLoader = DataLoader(testScheduler: scheduler)
    }

    // MARK: - Helpers

    /// Builds a sink whose triggers are cold observables replaying the given events.
    private func makeSink(
        refresh refreshEvents: [Recorded<Event<Void>>] = [],
        loadNextPage nextPageEvents: [Recorded<Event<Void>>] = [],
        loadData: @escaping (Int) -> Observable<[Int]>
    ) -> PaginationSink<Int> {
        let refreshTrigger = testScheduler.createColdObservable(refreshEvents)
        let loadNextPageTrigger = testScheduler.createColdObservable(nextPageEvents)
        let source = PaginationUISource(
            refresh: refreshTrigger.asObservable(),
            loadNextPage: loadNextPageTrigger.asObservable()
        )
        return PaginationSink(ui: source, loadData: loadData)
    }

    /// Subscribes the three shared observers to the sink's outputs.
    private func record(_ sink: PaginationSink<Int>) {
        bag.insert(
            sink.isLoading.subscribe(isLoading),
            sink.elements.subscribe(elements),
            sink.error.subscribe(error)
        )
    }

    // MARK: - Tests

    /// Only the initial page loads; all outputs complete when both triggers complete.
    func testDefault() {
        let sink = makeSink(
            refresh: [Recorded<Event<Void>>.completed(30)],
            loadNextPage: [Recorded<Event<Void>>.completed(30)],
            loadData: dataLoader.loadData(page:)
        )
        let foundErrors = testScheduler.createObserver(Bool.self)
        bag.insert(
            sink.isLoading.subscribe(isLoading),
            sink.elements.subscribe(elements),
            sink.error.map { _ in true }.subscribe(foundErrors)
        )

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .completed(30)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .completed(30)])
        XCTAssertEqual(foundErrors.events, [.completed(30)])
        XCTAssertEqual(dataLoader.pages, [1])
    }

    /// A refresh at 20 reloads page 1.
    func testRefresh() {
        let sink = makeSink(refresh: [.next(20, ())], loadData: dataLoader.loadData(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 1])
    }

    /// A refresh that comes back empty clears the elements.
    func testRefreshNewLoadEmpty() {
        let sink = makeSink(refresh: [.next(20, ())], loadData: dataLoader.loadPageThenEmpty(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 1])
    }

    /// A next-page request at 20 appends page 2.
    func testNextPage() {
        let sink = makeSink(loadNextPage: [.next(20, ())], loadData: dataLoader.loadData(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    /// Page 2 is loaded at 20, then a refresh at 40 resets to page 1 only.
    func testNextPageThenRefresh() {
        let sink = makeSink(
            refresh: [.next(40, ())],
            loadNextPage: [.next(20, ())],
            loadData: dataLoader.loadData(page:)
        )
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6]), .next(50, [1, 2, 3])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2, 1])
    }

    /// Next page is requested at 5, while the initial page is still loading.
    func testNextPageBeforeInitialPage() {
        let sink = makeSink(loadNextPage: [.next(5, ())], loadData: dataLoader.loadData(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(15, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(15, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    /// Page 2's response arrives before page 1's; elements stay in page order.
    func testNextPageComesBeforeInitialPage() {
        let sink = makeSink(loadNextPage: [.next(3, ())], loadData: dataLoader.reverseLoadData(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(8, [4, 5, 6]), .next(10, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    /// A failing initial load produces one error and no elements.
    func testLoadError() {
        let sink = makeSink(loadData: dataLoader.errorLoadData(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]())])
        XCTAssertEqual(error.events.count, 1)
        XCTAssertEqual(dataLoader.pages, [1])
    }

    /// The second request fails; the following refresh succeeds.
    func testErrorThenRetry() {
        let sink = makeSink(
            refresh: [.next(40, ())],
            loadNextPage: [.next(20, ())],
            loadData: dataLoader.errorThenSuccessLoadData(page:)
        )
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(50, [1, 2, 3])])
        XCTAssertEqual(error.events.map { $0.time }, [30])
        XCTAssertEqual(dataLoader.pages, [1, 2, 1])
    }
}
/// Test double for the page loader: records every requested page number and answers
/// with cold observables created on the test scheduler.
class DataLoader {
    /// Page numbers requested so far, in call order.
    private(set) var pages: [Int] = []
    private let testScheduler: TestScheduler

    init(testScheduler: TestScheduler) {
        self.testScheduler = testScheduler
    }

    /// Emits an empty page at 10, then completes.
    func loadEmpty(page: Int) -> Observable<[Int]> {
        pages.append(page)
        return coldObservable(of: [.next(10, []), .completed(10)])
    }

    /// Emits the three items of `page` at 10, then completes.
    func loadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let payload = items(forPage: page)
        return coldObservable(of: [.next(10, payload), .completed(10)])
    }

    /// Like `loadData(page:)`, but the delay is `15 - page * 5`, so higher pages answer sooner.
    func reverseLoadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let payload = items(forPage: page)
        let delay = 15 - page * 5
        return coldObservable(of: [.next(delay, payload), .completed(delay)])
    }

    /// Fails at 10 with a "testing" `NSError`.
    func errorLoadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let failure = NSError(domain: "testing", code: -1, userInfo: nil)
        return coldObservable(of: [Recorded<Event<[Int]>>.error(10, failure)])
    }

    /// The first request returns data; every later request returns an empty page.
    func loadPageThenEmpty(page: Int) -> Observable<[Int]> {
        return pages.isEmpty ? loadData(page: page) : loadEmpty(page: page)
    }

    /// The second request fails; every other request returns data.
    func errorThenSuccessLoadData(page: Int) -> Observable<[Int]> {
        return pages.count == 1 ? errorLoadData(page: page) : loadData(page: page)
    }

    /// Wraps the recorded events in a cold observable on the test scheduler.
    private func coldObservable(of events: [Recorded<Event<[Int]>>]) -> Observable<[Int]> {
        return testScheduler.createColdObservable(events).asObservable()
    }

    /// The three consecutive integers ending at `page * 3` (page 1 → [1, 2, 3]).
    private func items(forPage page: Int) -> [Int] {
        let last = page * 3
        return [last - 2, last - 1, last]
    }
}
@mesheilah
Copy link

mesheilah commented Dec 15, 2019

Thanks Daniel, I got a tiny improvement: replace lines 70-71 with compactMap{ $0 }.

At line 58 you have to dispose the subscription; why did you just assign it to an underscore?

Anyway, if you want to make your pagination sink perfect, there is a scenario in which the isLoading observable emits the wrong value. It happens when the loadData function returns its result immediately, without waiting for any period of time: for example, when data is retrieved from a cached URLSession, the result returns immediately, as opposed to a network call. In that case isLoading emits true on the first call and doesn't emit a value for subsequent calls. Adding something like .delay(.milliseconds(1), scheduler: MainScheduler.instance) to the URLSession's observable makes isLoading work as expected. I don't have a clue how to change lines 60-63 to handle this scenario, but it's not a big issue, as it's a tiny bug that won't happen in most scenarios.

@mesheilah
Copy link

what's the best way to return the current page number to the subscriber ?

@danielt1263
Copy link
Author

@mesheilah I think if I actually wanted individual pages, I would have the let elements: Observable<[T]> be a let elements: Observable<[[T]]> instead and return each page as a separate sub-array. The number of pages would be elements.map { $0.count }.

@mesheilah
Copy link

By "current page" I meant the page that is currently loading. I thought it would be better to make isLoading an Observable<(Bool, Int)> that also holds the page number being loaded, but I can't figure out where to glue the page number to isLoading.

@mesheilah
Copy link

The following is a Combine version I've written for this gist today:

import Foundation
import Combine

/// The user-interface triggers that drive pagination (Combine version).
struct PaginationUISource {
    /// Each event reloads the first page and drops all other cached pages.
    let refresh: AnyPublisher<Void, Error>
    /// Each event loads the next page.
    let loadNextPage: AnyPublisher<Void, Error>
}

/// The publisher outputs of the pagination logic for elements of type `T` (Combine version).
struct PaginationSink<T> {
    /// Holds the cancellables stored by the initializer.
    fileprivate var subscriptions = Set<AnyCancellable>()
    /// true if network loading is in progress.
    let isLoading: AnyPublisher<Bool, Error>
    /// elements from all loaded pages
    let elements: AnyPublisher<[T], Error>
}

extension PaginationSink {
    /// Wires the UI triggers to `loadData` and derives `isLoading` and `elements`.
    ///
    /// No page is requested until `ui.refresh` or `ui.loadNextPage` emits: `start` merges
    /// only those two triggers.
    ///
    /// - Parameters:
    ///   - ui: The refresh and load-next-page triggers.
    ///   - loadData: Called with the page number to load; publishes the items of that page.
    init(ui: PaginationUISource, loadData: @escaping (Int) -> AnyPublisher<[T], Error>) {
        // Cache of every loaded page, keyed by page number.
        let loadResults = CurrentValueSubject<[Int: [T]], Error>([:])
        
        // Highest cached page number, or 1 while the cache is empty.
        let maxPage = loadResults
            .map { $0.keys }
            .map{ $0.max() ?? 1 }
        
        // -1 is the sentinel page number meaning "refresh".
        let reload = ui.refresh
            .map{ -1 }
                
        // NOTE(review): `withLatestFrom` is not a built-in Combine operator; presumably it
        // comes from a custom extension defined elsewhere — confirm.
        let loadNext = ui.loadNextPage
        .withLatestFrom(maxPage)
            .map { $0 + 1 }
        
        
        let start = Publishers.Merge(reload, loadNext)
        
        // Pairs each requested page number with the items loaded for it; the refresh
        // sentinel is translated to page 1 for the actual request. No operator in this
        // chain catches a failure published by `loadData`.
        let page = start
            .flatMap { page in
                Just(page)
                    .setFailureType(to: Error.self)
                    .combineLatest(loadData(page == -1 ? 1 : page)) { (pageNumber: $0, items: $1) }
        }
        .share()
                
        // Feed non-empty results back into the cache: a refresh result replaces the cache,
        // any other page is merged in. Empty results are dropped before that check, so an
        // empty refresh result is dropped too.
         page
        .filter { !$0.items.isEmpty }
            .withLatestFrom(loadResults) { (pages: $1, newPage: $0) }
            .map { $0.newPage.pageNumber == -1 ? [1: $0.newPage.items] : $0.pages.merging([$0.newPage], uniquingKeysWith: { $1 }) }
            .print("LoadResults") // <--- at this point loadResults can't get filled with pages
            .subscribe(loadResults)
            .store(in: &subscriptions)
        
        // Running count of requests in flight: +1 per trigger, -1 per loaded page.
        let _isLoading = Publishers.Merge(start.map { _ in 1 }, page.map { _ in -1 })
        .scan(0, +)
        .map { $0 > 0 }
        .removeDuplicates()
        
        
        // All cached items, flattened in ascending page order.
        let _elements = loadResults
            .map { $0.sorted(by: { $0.key < $1.key }).flatMap { $0.value } }
        
        isLoading = _isLoading.eraseToAnyPublisher()
        elements = _elements.eraseToAnyPublisher()
    }
}

I have used Shai's gist to get a custom withLatestFrom operator for Combine, which appears to work fine after testing it in multiple scenarios. However, the issue I'm facing is that the loadResults subject doesn't get filled with new pages. I have placed a print("LoadResults") operator, as it appears in my code above, and the console prints the following output:

LoadResults: receive subscription: ((extension in TestSwiftUI):Combine.Publishers.WithLatestFrom<Combine.Publishers.Filter<Comb ... blah blah blah
LoadResults: request unlimited

as you see above, there is no LoadResults: receive value entry in the log, which means that loadResults subject is not filled with pages.
what's causing that weird issue ?

@mesheilah
Copy link

mesheilah commented Jan 29, 2020

Regarding Combine version above, it has no issues at all, the issue I was facing was because the refreshSubject didn't call send in the consumer class (or the view model) to trigger fetching the first page.

@alielsokary
Copy link

alielsokary commented Mar 16, 2020

Hi Daniel,
First, I want to personally thank you for this approach.
I have a question.
I'm trying to use the paginationSink like so:

let source = PaginationUISource(refresh: _refreshTrigger, loadNextPage: _nextPageTrigger)
let sink = PaginationSink(ui: source, loadData: service.getNextData(page:))

Where getNextData is
func getNextData(page: Int) -> Observable<[TheData]>

This works fine but I want to add a parameter to getNextData like so getNextData(category: String, page: Int) -> Observable<[TheData]>
and use it like so:
let sink = PaginationSink(ui: source, loadData: service.getNextData(section: "home", page:1))

I get Cannot convert value of type 'Observable<[TheData]>' to expected argument type '(Int) -> Observable<[_]>'

I understand the error is because of the PaginationSink init loadData: expects a closure of (Int) -> Observable<[T]>
I know that the solution for this maybe something silly but I'm not sure what is next step here.
Thank you

@danielt1263
Copy link
Author

danielt1263 commented Mar 16, 2020

@alielsokary Try something like:

/// Builds a page loader bound to `service` and `category`.
///
/// The returned function takes only the page number, which makes it suitable as the
/// `loadData` argument of `PaginationSink`.
func getNextData(service: Service, category: String) -> (_ page: Int) -> Observable<[TheData]> {
    let loader: (Int) -> Observable<[TheData]> = { pageNumber in
        return service.getNextData(category: category, page: pageNumber)
    }
    return loader
}

The function above will return a function you can pass to the PaginationSink.
let sink = PaginationSink(ui: source, loadData: getNextData(service: service, category: "home"))

@alielsokary
Copy link

Hi Daniel,
Thank you for your quick response. Are you sure `service.getNextData(section: section, page: page)` is correct? `page` is not a parameter of `getNextData`, and when I tried this in my code I got the error `Extra argument 'page' in the call`. Also, `getNextData(section: section, page: page)` expects an argument `service`.

@danielt1263
Copy link
Author

@alielsokary I don't know. That's your function. I see that I used section where your function has category so I changed that in the above. The idea here is to make a factory function that takes the service and category as parameters and returns a function that takes the page number as a parameter, then you can use all three parameters to call your function.

@alielsokary
Copy link

It worked!
Thank you so much Daniel for your quick response and support :)

@Shadester
Copy link

Would this work for a pagination with cursors?

@markst
Copy link

markst commented Aug 23, 2020

@danielt1263 thanks again for a great sink! any chance of this being made into a pod?

I'm having trouble whereby pagination.isLoading stops emitting events.
I've simplified the reproducible state code as example:

// Reproduction: the loader answers asynchronously when a non-empty filter is set and
// synchronously otherwise.
let pagination = PaginationSink<Show>(ui: source, loadData: { [unowned showsFilter] page in
  if let filter = showsFilter.value, filter.isEmpty == false {
    // Non-empty filter: the result is delivered from a global dispatch queue.
    return Single<[Show]>
      .create(subscribe: { (single) -> Disposable in
        DispatchQueue.global(qos: .userInitiated).async {
          single(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
        }
        return Disposables.create()
      })
      .asObservable()
  } else {
    // No filter (nil or empty): the result is delivered synchronously inside `subscribe`.
    return Single<[Show]>
      .create(subscribe: { (single) -> Disposable in
        single(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
        return Disposables.create()
      })
      .asObservable()
  }
})

After returning filter Single, no 'loading' events are ever emitted again regardless of filter value.
What resolves the above chunk of code is starting with empty array .asObservable().startWith([])


I've tried to create a test to reflect my issue, but not so familiar with RxTest framework to produce appropriate events.
Best I can do for now:

/// Attempted reproduction of `isLoading` going silent after a filter change.
///
/// NOTE(review): `bag`, `isLoading`, `elements` and `error` are not declared here;
/// presumably they are the fixtures of the enclosing test case — confirm.
func testLoadingEventsError() {
  
  let showsFilter = BehaviorRelay<String?>(value: nil)
  let refreshRelay = PublishRelay<Void>.init()

  // Refresh fires on every filter change after the relay's initial value (`skip(1)`)
  // and on every explicit `refreshRelay` event.
  let refreshTrigger = Observable.merge(showsFilter.skip(1).map({ _ in }), refreshRelay.asObservable())
  let loadNextPageTrigger = Observable<Void>.empty()
  let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable())

  let expectationOne = XCTestExpectation(description: "Fetch filter")
  let expectationTwo = XCTestExpectation(description: "Fetch reload")
  
  let sink = PaginationSink(ui: source) { [unowned showsFilter] (page) -> Observable<[Int]> in
    if let filter = showsFilter.value, filter.isEmpty == false {
      // Non-empty filter: after 5 seconds, emit the filter's characters parsed as integers.
      // The force-unwrap traps if a character is not a digit.
      return Single<[Int]>
        .create(subscribe: { (single) -> Disposable in
          DispatchQueue.global(qos: .userInitiated)
            .asyncAfter(deadline: .now() + 5, execute: {
              expectationOne.fulfill()
              single(.success(filter.map({ Int(String($0))! }) ))
            })
          return Disposables.create()
        })
        .asObservable()
    } else {
      // No filter (nil or empty): after 5 seconds, emit an empty page.
      return Single<[Int]>
        .create(subscribe: { (single) -> Disposable in
          DispatchQueue.global(qos: .userInitiated)
            .asyncAfter(deadline: .now() + 5, execute: {
              expectationTwo.fulfill()
              single(.success([]))
            })
          return Disposables.create()
        })
        .asObservable()
    }
  }
  
  bag.insert(
    sink.isLoading.subscribe(isLoading),
    sink.elements.subscribe(elements),
    sink.error.subscribe(error)
  )
  
  // Both calls below reach the sink as refresh events.
  showsFilter.accept("123")
  refreshRelay.accept(())

  // Real-time wait (up to 15 seconds) for both loader branches to have fired.
  wait(for: [expectationOne, expectationTwo], timeout: 15.0)
  XCTAssertEqual(isLoading.events, [.next(0, true), .next(0, false), .next(0, true), .next(0, false)])
  XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(0, [1, 2, 3]), .next(0, [1, 2, 3])])
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment