Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
//
// PaginationNetworkLogic.swift
//
// Created by Daniel Tartaglia on 4/9/17.
// Copyright © 2019 Daniel Tartaglia. MIT License
//
import RxSwift
/// The user-driven triggers consumed by `PaginationSink`.
struct PaginationUISource {
    /// Emits when the first page should be reloaded; all other cached pages are dumped.
    let refresh: Observable<Void>

    /// Emits when the next page should be loaded.
    let loadNextPage: Observable<Void>
}
/// The observable outputs of the pagination logic for elements of type `T`.
struct PaginationSink<T> {
    /// `true` while a network load is in progress.
    let isLoading: Observable<Bool>

    /// The elements from all loaded pages.
    let elements: Observable<[T]>

    /// Fires once for each error.
    let error: Observable<Error>
}
extension PaginationSink {
    /// Wires the UI triggers to the page loader and exposes the combined results.
    ///
    /// Page 1 is requested as soon as the sink is created. A refresh requests
    /// page 1 again and replaces every cached page with the result (even an
    /// empty one). A next-page trigger requests the page after the highest page
    /// number cached so far; empty next-page results are not cached.
    ///
    /// - Parameters:
    ///   - ui: The refresh and next-page triggers.
    ///   - loadData: Returns the items for the requested page number (the first
    ///     page is `1`). An error it emits is forwarded to `error` and does not
    ///     terminate the sink.
    ///
    /// - Note: The subscription that feeds the page cache is started here and is
    ///   deliberately not stored; it ends when both UI triggers have completed
    ///   and the loads in flight have finished.
    /// - Note: NOTE(review): `isLoading` is subscribed by the caller after this
    ///   initializer returns, so a `loadData` that emits synchronously can finish
    ///   before `isLoading` is observed and leave it stuck at `true` (reported in
    ///   the gist discussion) — confirm before relying on it with cached loaders.
    init(ui: PaginationUISource, loadData: @escaping (Int) -> Observable<[T]>) {
        // Marker page number used internally to mean "refresh".
        let refreshMarker = -1

        // Cache of every loaded page, keyed by page number.
        let loadResults = BehaviorSubject<[Int: [T]]>(value: [:])

        // Highest cached page number; 1 while the cache is still empty, so a
        // next-page request made before page 1 arrives asks for page 2.
        let maxPage = loadResults
            .map { $0.keys.max() ?? 1 }

        let reload = ui.refresh
            .map { _ in refreshMarker }

        let loadNext = ui.loadNextPage
            .withLatestFrom(maxPage)
            .map { $0 + 1 }

        // `Observable.just(1)` triggers the initial load of page 1.
        let start = Observable.merge(reload, loadNext, Observable.just(1))

        // One materialized event per load result. Errors are carried as values
        // so a failed load does not tear down the stream; per-load completions
        // are dropped.
        let page = start
            .flatMap { page in
                Observable.combineLatest(Observable.just(page), loadData(page == refreshMarker ? 1 : page)) { (pageNumber: $0, items: $1) }
                    .materialize()
                    .filter { !$0.isCompleted }
            }
            .share()

        // Fold each successful load into the cache.
        _ = page
            .compactMap { $0.element }
            .withLatestFrom(loadResults) { (pages: $1, newPage: $0) }
            .filter { $0.newPage.pageNumber == refreshMarker || !$0.newPage.items.isEmpty }
            .map { $0.newPage.pageNumber == refreshMarker ? [1: $0.newPage.items] : $0.pages.merging([$0.newPage], uniquingKeysWith: { $1 }) }
            .subscribe(loadResults)

        // +1 for every request started, -1 for every result received;
        // loading while the running total is positive.
        let _isLoading = Observable.merge(start.map { _ in 1 }, page.map { _ in -1 })
            .scan(0, accumulator: +)
            .map { $0 > 0 }
            .distinctUntilChanged()

        // Concatenate the cached pages in ascending page order.
        let _elements = loadResults
            .map { $0.sorted(by: { $0.key < $1.key }).flatMap { $0.value } }

        // `compactMap` keeps only error events, without a force-unwrap.
        let _error = page
            .compactMap { $0.error }

        isLoading = _isLoading
        elements = _elements
        error = _error
    }
}
/// Exercises `PaginationSink` against canned `DataLoader` responses on a `TestScheduler`.
class PaginationTests: XCTestCase {
    var testScheduler: TestScheduler!
    var bag: DisposeBag!
    var isLoading: TestableObserver<Bool>!
    var elements: TestableObserver<[Int]>!
    var error: TestableObserver<Error>!
    var dataLoader: DataLoader!

    override func setUp() {
        super.setUp()
        testScheduler = TestScheduler(initialClock: 0)
        bag = DisposeBag()
        isLoading = testScheduler.createObserver(Bool.self)
        elements = testScheduler.createObserver([Int].self)
        error = testScheduler.createObserver(Error.self)
        dataLoader = DataLoader(testScheduler: testScheduler)
    }

    // MARK: - Helpers

    /// Builds a sink whose triggers are cold observables replaying the given events.
    private func makeSink(
        refresh: [Recorded<Event<Void>>] = [],
        loadNextPage: [Recorded<Event<Void>>] = [],
        loadData: @escaping (Int) -> Observable<[Int]>
    ) -> PaginationSink<Int> {
        let refreshTrigger = testScheduler.createColdObservable(refresh)
        let loadNextPageTrigger = testScheduler.createColdObservable(loadNextPage)
        let source = PaginationUISource(
            refresh: refreshTrigger.asObservable(),
            loadNextPage: loadNextPageTrigger.asObservable()
        )
        return PaginationSink(ui: source, loadData: loadData)
    }

    /// Records all three sink outputs into the shared observers.
    private func observe(_ sink: PaginationSink<Int>) {
        bag.insert(
            sink.isLoading.subscribe(isLoading),
            sink.elements.subscribe(elements),
            sink.error.subscribe(error)
        )
    }

    // MARK: - Tests

    func testDefault() {
        let sink = makeSink(
            refresh: [Recorded<Event<Void>>.completed(30)],
            loadNextPage: [Recorded<Event<Void>>.completed(30)],
            loadData: dataLoader.loadData(page:)
        )
        // Errors are mapped to `Bool` here so the recorded events can be compared.
        let foundErrors = testScheduler.createObserver(Bool.self)
        bag.insert(
            sink.isLoading.subscribe(isLoading),
            sink.elements.subscribe(elements),
            sink.error.map { _ in true }.subscribe(foundErrors)
        )

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .completed(30)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .completed(30)])
        XCTAssertEqual(foundErrors.events, [.completed(30)])
        XCTAssertEqual(dataLoader.pages, [1])
    }

    func testRefresh() {
        let sink = makeSink(refresh: [.next(20, ())], loadData: dataLoader.loadData(page:))
        observe(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 1])
    }

    func testRefreshNewLoadEmpty() {
        let sink = makeSink(refresh: [.next(20, ())], loadData: dataLoader.loadPageThenEmpty(page:))
        observe(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 1])
    }

    func testNextPage() {
        let sink = makeSink(loadNextPage: [.next(20, ())], loadData: dataLoader.loadData(page:))
        observe(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    func testNextPageThenRefresh() {
        let sink = makeSink(
            refresh: [.next(40, ())],
            loadNextPage: [.next(20, ())],
            loadData: dataLoader.loadData(page:)
        )
        observe(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6]), .next(50, [1, 2, 3])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2, 1])
    }

    func testNextPageBeforeInitialPage() {
        let sink = makeSink(loadNextPage: [.next(5, ())], loadData: dataLoader.loadData(page:))
        observe(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(15, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(15, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    func testNextPageComesBeforeInitialPage() {
        let sink = makeSink(loadNextPage: [.next(3, ())], loadData: dataLoader.reverseLoadData(page:))
        observe(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(8, [4, 5, 6]), .next(10, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    func testLoadError() {
        let sink = makeSink(loadData: dataLoader.errorLoadData(page:))
        observe(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]())])
        XCTAssertEqual(error.events.count, 1)
        XCTAssertEqual(dataLoader.pages, [1])
    }

    func testErrorThenRetry() {
        let sink = makeSink(
            refresh: [.next(40, ())],
            loadNextPage: [.next(20, ())],
            loadData: dataLoader.errorThenSuccessLoadData(page:)
        )
        observe(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(50, [1, 2, 3])])
        XCTAssertEqual(error.events.map { $0.time }, [30])
        XCTAssertEqual(dataLoader.pages, [1, 2, 1])
    }
}
/// Test double for the page loader: records every requested page number and
/// answers with canned cold observables created on the test scheduler.
class DataLoader {
    /// Page numbers requested so far, in call order.
    private(set) var pages: [Int] = []
    private let testScheduler: TestScheduler

    init(testScheduler: TestScheduler) {
        self.testScheduler = testScheduler
    }

    /// Records the request and answers with an empty page at time 10.
    func loadEmpty(page: Int) -> Observable<[Int]> {
        pages.append(page)
        return testScheduler.createColdObservable([.next(10, []), .completed(10)])
            .asObservable()
    }

    /// Records the request and answers with the page's three items at time 10.
    func loadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let result = items(forPage: page)
        return testScheduler.createColdObservable([.next(10, result), .completed(10)])
            .asObservable()
    }

    /// Like `loadData(page:)`, but answers at time `15 - page * 5`, so page 2
    /// (time 5) arrives sooner than page 1 (time 10).
    func reverseLoadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let result = items(forPage: page)
        let time = 15 - page * 5
        return testScheduler.createColdObservable([.next(time, result), .completed(time)])
            .asObservable()
    }

    /// Records the request and fails with an error at time 10.
    func errorLoadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let error = NSError(domain: "testing", code: -1, userInfo: nil)
        return testScheduler.createColdObservable([Recorded<Event<[Int]>>.error(10, error)])
            .asObservable()
    }

    /// Answers the first request with data and every later request with an empty page.
    func loadPageThenEmpty(page: Int) -> Observable<[Int]> {
        if pages.isEmpty {
            return loadData(page: page)
        }
        else {
            return loadEmpty(page: page)
        }
    }

    /// Fails the second request only; every other request succeeds with data.
    func errorThenSuccessLoadData(page: Int) -> Observable<[Int]> {
        if pages.count == 1 {
            return errorLoadData(page: page)
        }
        else {
            return loadData(page: page)
        }
    }

    /// Returns the three consecutive integers for `page`: page 1 is `[1, 2, 3]`, page 2 is `[4, 5, 6]`.
    private func items(forPage page: Int) -> [Int] {
        return (0..<3).map { page * 3 - (2 - $0) }
    }
}
@Herakleis

This comment has been minimized.

Copy link

@Herakleis Herakleis commented May 7, 2017

Hey @DTartaglia, just so I understand: your PaginationUISource is a protocol that typically a VC would implement so that its DisposeBag is passed to the PaginationSink (which acts as a "viewModel"); this way, only one DisposeBag is active and disposals will be chained if the VC gets deallocated right?

@danielt1263

This comment has been minimized.

Copy link
Owner Author

@danielt1263 danielt1263 commented Sep 26, 2018

@Herakleis Yes, that is correct. When I wrote this code, my standard practice was to have the view controller conform to the source protocol.

@pakirby1

This comment has been minimized.

Copy link

@pakirby1 pakirby1 commented May 18, 2019

I attempted to add PaginationNetworkLogic.swift to the Sources folder of a playground (XCode 10.1) but it threw a compilation error on line 63: Value of type 'Observable<[Int : [T]]>' has no member 'bind'. I had to add

import RxCocoa

to get it to compile properly. Should the gist get updated?

Looks like bind() is defined as an extension on ObservableType in RxCocoa but bind() is not defined in RxSwift

https://github.com/ReactiveX/RxSwift/blob/master/RxCocoa/Common/Observable%2BBind.swift

@danielt1263

This comment has been minimized.

Copy link
Owner Author

@danielt1263 danielt1263 commented May 18, 2019

Thanks @pakirby1 I fixed it by using subscribe instead of bind and made some other edits to update the code to my current style.

@HugoSay

This comment has been minimized.

Copy link

@HugoSay HugoSay commented Oct 2, 2019

Thanks for this snippet 🙂 (had an issue with it but I found it came from my implementation :P)

@mesheilah

This comment has been minimized.

Copy link

@mesheilah mesheilah commented Dec 12, 2019

I created a view controller to test your gist with reqres public api, i just wanted to call their api without paging to make sure the gist is calling the first page correctly, but it emits an empty array first and then calls the service, here is what I've tried:

(1) I changed your pagination source UI struct to protocol so that I can re-use it easily in any view controller:

/// Protocol form of the pagination triggers, for adoption by a class such as a view controller.
///
/// Constrained with `AnyObject`; the `class` keyword in this position is deprecated.
protocol PaginationUISource: AnyObject {
    /// Reloads the first page and dumps all other cached pages.
    var refresh: Observable<Void> { get }
    /// Loads the next page.
    var loadNextPage: Observable<Void> { get }
}

(2) I used the following two methods in my view controller:

/// Loads users with a plain request.
/// The page number is deliberately ignored so that only a regular call is tested.
func getData(forPage: Int) -> Observable<[UserModel]> {
    return apiCalling.send(apiRequest: APIRequest())
}
    
    /// Creates a pagination sink driven by this object and traces its outputs to the console.
    @IBAction func getData(_ sender: Any) {
        let sink = PaginationSink(ui: self, loadData: getData(forPage:))

        // `elements` is subscribed before `isLoading`, as in the trace shown below.
        let elementsTrace = sink.elements.debug("ELEMENTS")
        elementsTrace
            .subscribe { _ in }
            .disposed(by: disposeBag)

        let loadingTrace = sink.isLoading.debug("LOADING")
        loadingTrace
            .subscribe { _ in }
            .disposed(by: disposeBag)
    }

output:

ELEMENTS -> subscribed
ELEMENTS -> Event next([]) <==== this empty array is emitted due to using a BehaviorSubject for loadResults
LOADING -> subscribed
LOADING -> Event next(true)
ELEMENTS -> Event next([ .... Array of data ... ])
LOADING -> Event next(false)

two concerns here:
(1) Using a BehaviorSubject for loadResults emits an empty array in the output before the service is called. I tried replacing it with a PublishSubject, but then no elements are emitted because a PublishSubject does not keep the last value. I don't have a clue how to get rid of this empty array, given that I don't want to use skip(1) in the view controller to skip it, because that is not a clean solution.

(2) is there a way to trigger calling the API using a Void observer (trigger) instead of calling it at init ? like a button.rx.tap trigger.

@danielt1263

This comment has been minimized.

Copy link
Owner Author

@danielt1263 danielt1263 commented Dec 12, 2019

There is no need to change the PaginationUISource to a protocol. All you need to do is create an instance of PaginationUISource and pass it in. If you like doing the extra work to conform to the protocol, you are welcome to do it, but it is not necessary. Just like it's not necessary to turn Int into a protocol in order to pass an integer into a function.

Regarding your concerns:
(1) You can remove the initial empty array emission by adding .skip(1) between lines 65 and 66.
(2) Yes there's a way to avoid calling the api service on init... Remove the Observable.just(1) bit from the merge on line 43.

@mesheilah

This comment has been minimized.

Copy link

@mesheilah mesheilah commented Dec 15, 2019

Thanks Daniel, I got a tiny improvement: replace lines 70-71 with compactMap{ $0 }.

at line 58, you have to dispose the subscription, why did you just assign it to an underscore ?

Anyway, If you want to make your pagination sink perfect, there is a scenario in which the isLoading observable emits wrong value, that happens when the loadData function returns immediate result without having to wait a period of time, say when retrieving data from a cached URLSession the result returns immediately as opposed to a network call, isLoading at first call emits true and doesn't emit a value for subsequent calls. Adding something like: .delay(.milliseconds(1), scheduler: MainScheduler.instance) for the URLSession's observable will make isLoading works as expected. I don't have a clue on how to change lines 60-63 to handle this scenario, but it's not a big issue as it's a tiny bug that won't happen in most scenarios.

@mesheilah

This comment has been minimized.

Copy link

@mesheilah mesheilah commented Jan 14, 2020

what's the best way to return the current page number to the subscriber ?

@danielt1263

This comment has been minimized.

Copy link
Owner Author

@danielt1263 danielt1263 commented Jan 14, 2020

@mesheilah I think if I actually wanted individual pages, I would have the let elements: Observable<[T]> be a let elements: Observable<[[T]]> instead and return each page as a separate sub-array. The number of pages would be elements.map { $0.count }.

@mesheilah

This comment has been minimized.

Copy link

@mesheilah mesheilah commented Jan 15, 2020

I meant by current page is the page that is currently loading, I thought it would be better to use isLoading as Observable<(Bool, Int)> to hold the current page number being loaded. but can't figure out where to glue the page number with isLoading.

@mesheilah

This comment has been minimized.

Copy link

@mesheilah mesheilah commented Jan 28, 2020

The following is a Combine version I've written for this gist today:

import Foundation
import Combine

/// The user-driven triggers for the Combine version of the pagination sink.
struct PaginationUISource {
    /// Emits when the first page should be reloaded; all other cached pages are dumped.
    let refresh: AnyPublisher<Void, Error>

    /// Emits when the next page should be loaded.
    let loadNextPage: AnyPublisher<Void, Error>
}

/// The published outputs of the Combine pagination logic for elements of type `T`.
struct PaginationSink<T> {
    /// Holds the cancellables for the sink's internal subscriptions.
    fileprivate var subscriptions: Set<AnyCancellable> = []

    /// `true` while a network load is in progress.
    let isLoading: AnyPublisher<Bool, Error>

    /// The elements from all loaded pages.
    let elements: AnyPublisher<[T], Error>
}

extension PaginationSink {
    /// Wires the UI triggers to the page loader and exposes the combined results.
    ///
    /// Nothing is loaded until one of the triggers fires. A refresh requests
    /// page 1 and replaces every cached page with the result, even when that
    /// result is empty. A next-page trigger requests the page after the highest
    /// page number cached so far; empty next-page results are not cached.
    ///
    /// - Parameters:
    ///   - ui: The refresh and next-page triggers.
    ///   - loadData: Returns a publisher of the items for the requested page number.
    ///
    /// - Note: `withLatestFrom` is not a built-in Combine operator; a custom
    ///   implementation must be in scope.
    init(ui: PaginationUISource, loadData: @escaping (Int) -> AnyPublisher<[T], Error>) {
        // Cache of every loaded page, keyed by page number.
        let loadResults = CurrentValueSubject<[Int: [T]], Error>([:])

        // Highest cached page number; 1 while the cache is still empty.
        let maxPage = loadResults
            .map { $0.keys.max() ?? 1 }

        // -1 is the marker page number meaning "refresh".
        let reload = ui.refresh
            .map{ -1 }

        let loadNext = ui.loadNextPage
            .withLatestFrom(maxPage)
            .map { $0 + 1 }

        let start = Publishers.Merge(reload, loadNext)

        let page = start
            .flatMap { page in
                Just(page)
                    .setFailureType(to: Error.self)
                    .combineLatest(loadData(page == -1 ? 1 : page)) { (pageNumber: $0, items: $1) }
            }
            .share()

        // Fold each load into the cache. Refresh results pass the filter even
        // when empty, so that a refresh returning no items clears the cache.
        page
            .filter { $0.pageNumber == -1 || !$0.items.isEmpty }
            .withLatestFrom(loadResults) { (pages: $1, newPage: $0) }
            .map { $0.newPage.pageNumber == -1 ? [1: $0.newPage.items] : $0.pages.merging([$0.newPage], uniquingKeysWith: { $1 }) }
            .print("LoadResults") // debug trace of every page dictionary pushed into loadResults
            .subscribe(loadResults)
            .store(in: &subscriptions)

        // +1 for every request started, -1 for every result received;
        // loading while the running total is positive.
        let _isLoading = Publishers.Merge(start.map { _ in 1 }, page.map { _ in -1 })
            .scan(0, +)
            .map { $0 > 0 }
            .removeDuplicates()

        // Concatenate the cached pages in ascending page order.
        let _elements = loadResults
            .map { $0.sorted(by: { $0.key < $1.key }).flatMap { $0.value } }

        isLoading = _isLoading.eraseToAnyPublisher()
        elements = _elements.eraseToAnyPublisher()
    }
}

I have used Shai's gist to use withLatestFrom custom operator with Combine which's supposed to be working fine after testing it with multiple scenarios. However, the issue I'm facing is that loadResults subject doesn't get filled with new pages, I have placed a print("LoadResults") operator as appears in my code above and the console prints the following output:

LoadResults: receive subscription: ((extension in TestSwiftUI):Combine.Publishers.WithLatestFrom<Combine.Publishers.Filter<Comb ... blah blah blah
LoadResults: request unlimited

as you see above, there is no LoadResults: receive value entry in the log, which means that loadResults subject is not filled with pages.
what's causing that weird issue ?

@mesheilah

This comment has been minimized.

Copy link

@mesheilah mesheilah commented Jan 29, 2020

Regarding Combine version above, it has no issues at all, the issue I was facing was because the refreshSubject didn't call send in the consumer class (or the view model) to trigger fetching the first page.

@alielsokary

This comment has been minimized.

Copy link

@alielsokary alielsokary commented Mar 16, 2020

Hi Daniel,
First, I want to personally thank you for this approach.
I have a question.
I'm trying to use the paginationSink like so:

let source = PaginationUISource(refresh: _refreshTrigger, loadNextPage: _nextPageTrigger)
let sink = PaginationSink(ui: source, loadData: service.getNextData(page:))

Where getNextData is
func getNextData(page: Int) -> Observable<[TheData]>

This works fine but I want to add a parameter to getNextData like so getNextData(category: String, page: Int) -> Observable<[TheData]>
and use it like so:
let sink = PaginationSink(ui: source, loadData: service.getNextData(section: "home", page:1))

I get Cannot convert value of type 'Observable<[TheData]>' to expected argument type '(Int) -> Observable<[_]>'

I understand the error is because of the PaginationSink init loadData: expects a closure of (Int) -> Observable<[T]>
I know that the solution for this maybe something silly but I'm not sure what is next step here.
Thank you

@danielt1263

This comment has been minimized.

Copy link
Owner Author

@danielt1263 danielt1263 commented Mar 16, 2020

@alielsokary Try something like:

/// Builds a page loader with `service` and `category` already bound, leaving
/// only the page number to be supplied by the caller.
func getNextData(service: Service, category: String) -> (_ page: Int) -> Observable<[TheData]> {
    let loadPage: (Int) -> Observable<[TheData]> = {
        service.getNextData(category: category, page: $0)
    }
    return loadPage
}

The function above will return a function you can pass to the PaginationSink.
let sink = PaginationSink(ui: source, loadData: getNextData(service: service, category: "home"))

@alielsokary

This comment has been minimized.

Copy link

@alielsokary alielsokary commented Mar 16, 2020

Hi Daniel,
Thank you for your quick response. Are you sure `service.getNextData(section: section, page: page)` is correct? `page` is not a parameter of `getNextData`, and when I tried this in my code I got the error `Extra argument 'page' in the call`. Also, `getNextData(section: section, page: page)` expects a `service` argument.

@danielt1263

This comment has been minimized.

Copy link
Owner Author

@danielt1263 danielt1263 commented Mar 16, 2020

@alielsokary I don't know. That's your function. I see that I used section where your function has category so I changed that in the above. The idea here is to make a factory function that takes the service and category as parameters and returns a function that takes the page number as a parameter, then you can use all three parameters to call your function.

@alielsokary

This comment has been minimized.

Copy link

@alielsokary alielsokary commented Mar 17, 2020

It worked!
Thank you so much Daniel for your quick response and support :)

@Shadester

This comment has been minimized.

Copy link

@Shadester Shadester commented Aug 4, 2020

Would this work for a pagination with cursors?

@markst

This comment has been minimized.

Copy link

@markst markst commented Aug 23, 2020

@danielt1263 thanks again for a great sink! any chance of this being made into a pod?

I'm having trouble whereby pagination.isLoading stops emitting events.
I've simplified the reproducible state code as example:

// Reproduction case: the loader answers asynchronously when a non-empty filter
// is set, and synchronously otherwise.
let pagination = PaginationSink<Show>(ui: source, loadData: { [unowned showsFilter] page in
  guard let filter = showsFilter.value, !filter.isEmpty else {
    // No filter: succeed immediately, inside the subscribe call.
    return Single<[Show]>
      .create(subscribe: { (observer) -> Disposable in
        observer(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
        return Disposables.create()
      })
      .asObservable()
  }

  // Filter present: succeed from a background queue.
  return Single<[Show]>
    .create(subscribe: { (observer) -> Disposable in
      DispatchQueue.global(qos: .userInitiated).async {
        observer(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
      }
      return Disposables.create()
    })
    .asObservable()
})

After returning filter Single, no 'loading' events are ever emitted again regardless of filter value.
What resolves the above chunk of code is starting with empty array .asObservable().startWith([])


I've tried to create a test to reflect my issue, but not so familiar with RxTest framework to produce appropriate events.
Best I can do for now:

/// Attempts to reproduce `isLoading` going silent after a filtered load.
/// Uses real five-second delays on a global queue, not the test scheduler.
func testLoadingEventsError() {

  // Refresh fires on every filter change after the initial value, and on demand.
  let showsFilter = BehaviorRelay<String?>(value: nil)
  let refreshRelay = PublishRelay<Void>()

  let refreshTrigger = Observable.merge(showsFilter.skip(1).map({ _ in }), refreshRelay.asObservable())
  let loadNextPageTrigger = Observable<Void>.empty()
  let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable())

  let expectationOne = XCTestExpectation(description: "Fetch filter")
  let expectationTwo = XCTestExpectation(description: "Fetch reload")

  let sink = PaginationSink(ui: source) { [unowned showsFilter] (page) -> Observable<[Int]> in
    guard let filter = showsFilter.value, !filter.isEmpty else {
      // No filter set: answer with an empty page after the delay.
      return Single<[Int]>
        .create(subscribe: { (observer) -> Disposable in
          DispatchQueue.global(qos: .userInitiated)
            .asyncAfter(deadline: .now() + 5, execute: {
              expectationTwo.fulfill()
              observer(.success([]))
            })
          return Disposables.create()
        })
        .asObservable()
    }

    // Filter set: answer with the filter's digits after the delay.
    return Single<[Int]>
      .create(subscribe: { (observer) -> Disposable in
        DispatchQueue.global(qos: .userInitiated)
          .asyncAfter(deadline: .now() + 5, execute: {
            expectationOne.fulfill()
            observer(.success(filter.map({ Int(String($0))! }) ))
          })
        return Disposables.create()
      })
      .asObservable()
  }

  bag.insert(
    sink.isLoading.subscribe(isLoading),
    sink.elements.subscribe(elements),
    sink.error.subscribe(error)
  )

  showsFilter.accept("123")
  refreshRelay.accept(())

  wait(for: [expectationOne, expectationTwo], timeout: 15.0)
  XCTAssertEqual(isLoading.events, [.next(0, true), .next(0, false), .next(0, true), .next(0, false)])
  XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(0, [1, 2, 3]), .next(0, [1, 2, 3])])
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment