Skip to content

Instantly share code, notes, and snippets.

@danielt1263
Last active April 18, 2022 19:41
Show Gist options
  • Star 38 You must be signed in to star a gist
  • Fork 8 You must be signed in to fork a gist
  • Save danielt1263/10bc5eb821c752ad45f281c6f4e3034b to your computer and use it in GitHub Desktop.
Save danielt1263/10bc5eb821c752ad45f281c6f4e3034b to your computer and use it in GitHub Desktop.
//
// PaginationNetworkLogic.swift
//
// Created by Daniel Tartaglia on 4/9/17.
// Copyright © 2019 Daniel Tartaglia. MIT License
//
import RxSwift
/// The user-driven triggers consumed by `PaginationSink.init(ui:loadData:)`.
struct PaginationUISource {
/// Each event reloads the first page and discards every other cached page.
let refresh: Observable<Void>
/// Each event requests the next page.
let loadNextPage: Observable<Void>
}
/// The observable outputs of the pagination logic for elements of type `T`.
struct PaginationSink<T> {
/// Emits `true` while network loading is in progress.
let isLoading: Observable<Bool>
/// Emits the elements from all loaded pages.
let elements: Observable<[T]>
/// Emits once for each load error.
let error: Observable<Error>
}
extension PaginationSink {
    /// Wires the UI triggers to `loadData` and exposes the resulting state.
    ///
    /// Page 1 is requested as soon as the sink is created. A `refresh` event
    /// reloads page 1 and, once the response arrives, replaces the whole cache
    /// with it. A `loadNextPage` event requests the page after the highest page
    /// currently cached (page 2 when nothing is cached yet).
    ///
    /// The trigger stream is published and connected only after every internal
    /// subscriber is attached, so the UI sources are subscribed exactly once and
    /// the in-flight counter always sees the request that matches each response.
    /// `isLoading` is backed by a `BehaviorSubject`, so a subscriber that arrives
    /// late receives the current loading state instead of starting from a stale
    /// count.
    ///
    /// NOTE(review): the internal subscriptions and the connection are never
    /// disposed explicitly; they end only when the trigger observables terminate.
    ///
    /// - Parameters:
    ///   - ui: The refresh and load-next-page triggers.
    ///   - loadData: Returns the items for the given page number (the first page
    ///     is page 1). Errors are caught and re-emitted through `error`.
    init(ui: PaginationUISource, loadData: @escaping (Int) -> Observable<[T]>) {
        // Sentinel page number meaning "reload page 1 and drop every other page".
        let reloadMarker = -1

        // Cache of loaded pages, keyed by page number.
        let loadResults = BehaviorSubject<[Int: [T]]>(value: [:])
        // Number of requests currently in flight.
        let activeLoads = BehaviorSubject<Int>(value: 0)

        // Highest cached page number; 1 while the cache is empty, so a next-page
        // request issued before the first response arrives asks for page 2.
        let maxPage = loadResults
            .map { $0.keys.max() ?? 1 }

        let reload = ui.refresh
            .map { _ in reloadMarker }
        let loadNext = ui.loadNextPage
            .withLatestFrom(maxPage)
            .map { $0 + 1 }

        // Published so that the counter and the loader share ONE subscription to
        // the triggers; nothing is emitted until `connect()` below.
        let start = Observable.merge(reload, loadNext, Observable.just(1))
            .publish()

        // One event per value or error produced by a load; completion events are
        // dropped so that the outer stream only ends when the triggers end.
        let page = start
            .flatMap { requested in
                loadData(requested == reloadMarker ? 1 : requested)
                    .map { (pageNumber: requested, items: $0) }
                    .materialize()
                    .filter { !$0.isCompleted }
            }
            .share()

        // +1 for every request, -1 for every response (value or error).
        // NOTE(review): a load that completes without emitting never decrements,
        // and a load that emits several values decrements several times.
        _ = Observable.merge(start.map { _ in 1 }, page.map { _ in -1 })
            .scan(0, accumulator: +)
            .subscribe(activeLoads)

        // Fold each successful response into the cache. An empty next page is
        // ignored; a reload always replaces the cache, even with an empty result.
        _ = page
            .compactMap { $0.element }
            .withLatestFrom(loadResults) { (pages: $1, newPage: $0) }
            .filter { $0.newPage.pageNumber == reloadMarker || !$0.newPage.items.isEmpty }
            .map { $0.newPage.pageNumber == reloadMarker ? [1: $0.newPage.items] : $0.pages.merging([$0.newPage], uniquingKeysWith: { $1 }) }
            .subscribe(loadResults)

        // Every internal subscriber is attached; start the triggers (this emits
        // the initial page-1 request synchronously).
        _ = start.connect()

        isLoading = activeLoads
            .map { $0 > 0 }
            .distinctUntilChanged()
        elements = loadResults
            .map { $0.sorted(by: { $0.key < $1.key }).flatMap { $0.value } }
        error = page
            .compactMap { $0.error }
    }
}
/// Drives `PaginationSink` with scripted triggers on a virtual-time scheduler
/// and checks the recorded `isLoading`, `elements` and `error` events.
class PaginationTests: XCTestCase {
    var testScheduler: TestScheduler!
    var bag: DisposeBag!
    var isLoading: TestableObserver<Bool>!
    var elements: TestableObserver<[Int]>!
    var error: TestableObserver<Error>!
    var dataLoader: DataLoader!

    override func setUp() {
        super.setUp()
        let scheduler = TestScheduler(initialClock: 0)
        testScheduler = scheduler
        bag = DisposeBag()
        isLoading = scheduler.createObserver(Bool.self)
        elements = scheduler.createObserver([Int].self)
        error = scheduler.createObserver(Error.self)
        dataLoader = DataLoader(testScheduler: scheduler)
    }

    // MARK: - Helpers

    /// Builds a sink whose triggers replay the given recorded events as cold observables.
    private func makeSink(
        refresh: [Recorded<Event<Void>>],
        loadNextPage: [Recorded<Event<Void>>],
        loadData: @escaping (Int) -> Observable<[Int]>
    ) -> PaginationSink<Int> {
        let refreshTrigger = testScheduler.createColdObservable(refresh)
        let loadNextPageTrigger = testScheduler.createColdObservable(loadNextPage)
        let source = PaginationUISource(
            refresh: refreshTrigger.asObservable(),
            loadNextPage: loadNextPageTrigger.asObservable()
        )
        return PaginationSink(ui: source, loadData: loadData)
    }

    /// Subscribes the shared observers to all three outputs, then runs the scheduler.
    private func recordAndRun(_ sink: PaginationSink<Int>) {
        bag.insert(
            sink.isLoading.subscribe(isLoading),
            sink.elements.subscribe(elements),
            sink.error.subscribe(error)
        )
        testScheduler.start()
    }

    // MARK: - Tests

    func testDefault() {
        let sink = makeSink(
            refresh: [Recorded<Event<Void>>.completed(30)],
            loadNextPage: [Recorded<Event<Void>>.completed(30)],
            loadData: dataLoader.loadData(page:)
        )
        // `Error` is not Equatable, so errors are mapped to `Bool` to allow an
        // equality assertion on the completion event.
        let foundErrors = testScheduler.createObserver(Bool.self)
        bag.insert(
            sink.isLoading.subscribe(isLoading),
            sink.elements.subscribe(elements),
            sink.error.map { _ in true }.subscribe(foundErrors)
        )

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .completed(30)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .completed(30)])
        XCTAssertEqual(foundErrors.events, [.completed(30)])
        XCTAssertEqual(dataLoader.pages, [1])
    }

    func testRefresh() {
        let sink = makeSink(
            refresh: [.next(20, ())],
            loadNextPage: [],
            loadData: dataLoader.loadData(page:)
        )

        recordAndRun(sink)

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 1])
    }

    func testRefreshNewLoadEmpty() {
        let sink = makeSink(
            refresh: [.next(20, ())],
            loadNextPage: [],
            loadData: dataLoader.loadPageThenEmpty(page:)
        )

        recordAndRun(sink)

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 1])
    }

    func testNextPage() {
        let sink = makeSink(
            refresh: [],
            loadNextPage: [.next(20, ())],
            loadData: dataLoader.loadData(page:)
        )

        recordAndRun(sink)

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    func testNextPageThenRefresh() {
        let sink = makeSink(
            refresh: [.next(40, ())],
            loadNextPage: [.next(20, ())],
            loadData: dataLoader.loadData(page:)
        )

        recordAndRun(sink)

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6]), .next(50, [1, 2, 3])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2, 1])
    }

    func testNextPageBeforeInitialPage() {
        let sink = makeSink(
            refresh: [],
            loadNextPage: [.next(5, ())],
            loadData: dataLoader.loadData(page:)
        )

        recordAndRun(sink)

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(15, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(15, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    func testNextPageComesBeforeInitialPage() {
        let sink = makeSink(
            refresh: [],
            loadNextPage: [.next(3, ())],
            loadData: dataLoader.reverseLoadData(page:)
        )

        recordAndRun(sink)

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(8, [4, 5, 6]), .next(10, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    func testLoadError() {
        let sink = makeSink(
            refresh: [],
            loadNextPage: [],
            loadData: dataLoader.errorLoadData(page:)
        )

        recordAndRun(sink)

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]())])
        XCTAssertEqual(error.events.count, 1)
        XCTAssertEqual(dataLoader.pages, [1])
    }

    func testErrorThenRetry() {
        let sink = makeSink(
            refresh: [.next(40, ())],
            loadNextPage: [.next(20, ())],
            loadData: dataLoader.errorThenSuccessLoadData(page:)
        )

        recordAndRun(sink)

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(50, [1, 2, 3])])
        XCTAssertEqual(error.events.map { $0.time }, [30])
        XCTAssertEqual(dataLoader.pages, [1, 2, 1])
    }
}
/// Test double for a paged data source: records every requested page number
/// and answers with cold observables scheduled on the injected `TestScheduler`.
class DataLoader {
    /// Page numbers requested so far, in call order.
    private(set) var pages: [Int] = []
    private let testScheduler: TestScheduler

    init(testScheduler: TestScheduler) {
        self.testScheduler = testScheduler
    }

    /// Records `page`, then emits an empty array and completes at relative time 10.
    func loadEmpty(page: Int) -> Observable<[Int]> {
        pages.append(page)
        return testScheduler.createColdObservable([.next(10, []), .completed(10)])
            .asObservable()
    }

    /// Records `page`, then emits that page's three consecutive integers
    /// (page 1 -> [1, 2, 3], page 2 -> [4, 5, 6], ...) and completes at relative time 10.
    func loadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let result = (0..<3).map { page * 3 - (2 - $0) }
        return testScheduler.createColdObservable([.next(10, result), .completed(10)])
            .asObservable()
    }

    /// Like `loadData(page:)`, but the delay is `15 - page * 5`, so page 2
    /// (delay 5) answers sooner than page 1 (delay 10).
    func reverseLoadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let result = (0..<3).map { page * 3 - (2 - $0) }
        let time = 15 - page * 5
        return testScheduler.createColdObservable([.next(time, result), .completed(time)])
            .asObservable()
    }

    /// Records `page`, then fails with an `NSError` at relative time 10.
    func errorLoadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let error = NSError(domain: "testing", code: -1, userInfo: nil)
        return testScheduler.createColdObservable([Recorded<Event<[Int]>>.error(10, error)])
            .asObservable()
    }

    /// The first request returns real data; every later request returns an empty page.
    func loadPageThenEmpty(page: Int) -> Observable<[Int]> {
        // The delegate does the recording, so `pages` is still empty on the first call.
        return pages.isEmpty ? loadData(page: page) : loadEmpty(page: page)
    }

    /// Only the second request fails; every other request returns real data.
    func errorThenSuccessLoadData(page: Int) -> Observable<[Int]> {
        // Exactly one page recorded so far means this call is the second request.
        return pages.count == 1 ? errorLoadData(page: page) : loadData(page: page)
    }
}
@Shadester
Copy link

Would this work for a pagination with cursors?

@markst
Copy link

markst commented Aug 23, 2020

@danielt1263 Thanks again for a great sink! Any chance of this being made into a pod?

I'm having trouble whereby `pagination.isLoading` stops emitting events.
I've simplified the code that reproduces this state into an example:

// Repro: `loadData` takes one of two paths depending on the filter value that
// is current at the moment a page is requested.
let pagination = PaginationSink<Show>(ui: source, loadData: { [unowned showsFilter] page in
  if let filter = showsFilter.value, filter.isEmpty == false {
    // Non-empty filter: the Single succeeds later, from a global dispatch queue.
    return Single<[Show]>
      .create(subscribe: { (single) -> Disposable in
        DispatchQueue.global(qos: .userInitiated).async {
          single(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
        }
        return Disposables.create()
      })
      .asObservable()
  } else {
    // Nil or empty filter: the Single succeeds synchronously, inside `subscribe`.
    return Single<[Show]>
      .create(subscribe: { (single) -> Disposable in
        single(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
        return Disposables.create()
      })
      .asObservable()
  }
})

After the filter Single has been returned, no 'loading' events are ever emitted again, regardless of the filter value.
What resolves the issue in the above chunk of code is starting with an empty array: `.asObservable().startWith([])`


I've tried to create a test that reflects my issue, but I'm not familiar enough with the RxTest framework to produce the appropriate events.
This is the best I can do for now:

/// Attempted reproduction of `isLoading` going silent: a filter change and an
/// explicit refresh each trigger a reload whose result arrives asynchronously.
func testLoadingEventsError() {
  
  let showsFilter = BehaviorRelay<String?>(value: nil)
  let refreshRelay = PublishRelay<Void>.init()

  // A refresh fires on every filter change (the relay's initial value is skipped)
  // and on every explicit `refreshRelay` event.
  let refreshTrigger = Observable.merge(showsFilter.skip(1).map({ _ in }), refreshRelay.asObservable())
  let loadNextPageTrigger = Observable<Void>.empty()
  let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable())

  let expectationOne = XCTestExpectation(description: "Fetch filter")
  let expectationTwo = XCTestExpectation(description: "Fetch reload")
  
  let sink = PaginationSink(ui: source) { [unowned showsFilter] (page) -> Observable<[Int]> in
    if let filter = showsFilter.value, filter.isEmpty == false {
      // Non-empty filter: succeed after 5 seconds with one Int per filter character.
      return Single<[Int]>
        .create(subscribe: { (single) -> Disposable in
          DispatchQueue.global(qos: .userInitiated)
            .asyncAfter(deadline: .now() + 5, execute: {
              expectationOne.fulfill()
              // The force-unwrap traps if the filter contains a character that
              // does not parse as an Int.
              single(.success(filter.map({ Int(String($0))! }) ))
            })
          return Disposables.create()
        })
        .asObservable()
    } else {
      // Nil or empty filter: succeed after 5 seconds with an empty page.
      return Single<[Int]>
        .create(subscribe: { (single) -> Disposable in
          DispatchQueue.global(qos: .userInitiated)
            .asyncAfter(deadline: .now() + 5, execute: {
              expectationTwo.fulfill()
              single(.success([]))
            })
          return Disposables.create()
        })
        .asObservable()
    }
  }
  
  bag.insert(
    sink.isLoading.subscribe(isLoading),
    sink.elements.subscribe(elements),
    sink.error.subscribe(error)
  )
  
  showsFilter.accept("123")
  refreshRelay.accept(())

  // Real-time wait: the loads above run on GCD, not on the test scheduler.
  wait(for: [expectationOne, expectationTwo], timeout: 15.0)
  // NOTE(review): `testScheduler.start()` is never called in this test, so the
  // events are presumably all recorded at virtual time 0 — confirm.
  XCTAssertEqual(isLoading.events, [.next(0, true), .next(0, false), .next(0, true), .next(0, false)])
  XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(0, [1, 2, 3]), .next(0, [1, 2, 3])])
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment