Last active
April 29, 2024 06:17
-
-
Save danielt1263/2b624d7c925d8b7910ef2f1e5afe177b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
//  StallUnless.swift
//
//  Created by Daniel Tartaglia on 1 Oct 2018.
//  Copyright © 2024 Daniel Tartaglia. MIT License.
//
import RxSwift | |
extension ObservableType {
    /**
     Gates the source sequence with a Boolean boundary sequence.

     While the gate is open (the boundary's most recent value was `true`, or `initial` was `true` and the
     boundary has not emitted yet) source elements are forwarded as they arrive. While the gate is closed
     they are collected, and the collected elements are emitted, in arrival order, when the boundary next
     emits `true` or completes.

     An error from either sequence is forwarded to the result. Completion of the source completes the
     result; elements still held back at that moment are not emitted.

     - parameter boundary: Sequence whose `true`/`false` values open and close the gate.
     - parameter initial: Whether the gate starts open, before `boundary` has emitted anything.
     - returns: An Observable sequence of the source's elements, delayed while the gate is closed.
     */
    func stall<O>(unless boundary: O, initial: Bool) -> Observable<Element> where O: ObservableType, O.Element == Bool {
        // Both inputs are materialized so that their terminal events reach the state machine as ordinary
        // values instead of tearing down the merge.
        let signals = materialize().map { StallUnlessInput<Element>.signal($0) }
        let triggers = boundary.materialize().map { StallUnlessInput<Element>.trigger($0) }
        let seed = StallUnlessState<Element>(isActive: initial)

        return Observable.merge(signals, triggers)
            .scan(seed) { state, input in state.reduce(input: input) }
            .map { state in state.output }
            // Turns the state's `Event<[Element]>` back into real next/error/completed events.
            .dematerialize()
            .filter { batch in !batch.isEmpty }
            .flatMap { batch in Observable.from(batch) }
    }
}
/// State machine behind `stall(unless:initial:)`.
private enum StallUnlessState<Element> {
    /// The gate is open (or the stream is terminating); the payload is the event to publish for this step.
    case active(Event<[Element]>)
    /// The gate is closed; the payload is the elements collected so far.
    case dormant([Element])

    /// Creates the starting state: open with nothing to publish, or closed with an empty buffer.
    init(isActive: Bool) {
        self = isActive ? .active(.next([])) : .dormant([])
    }

    /// The event to publish for the current state. A dormant state publishes an empty batch.
    var output: Event<[Element]> {
        switch self {
        case let .active(event):
            return event
        case .dormant:
            return .next([])
        }
    }

    /// Computes the state that follows `self` once `input` has been received.
    func reduce(input: StallUnlessInput<Element>) -> StallUnlessState {
        switch input {
        case let .signal(event):
            return handle(signal: event)
        case let .trigger(event):
            return handle(trigger: event)
        }
    }

    /// Transition for an event coming from the source sequence.
    private func handle(signal: Event<Element>) -> StallUnlessState {
        switch (signal, self) {
        case let (.next(element), .active):
            return .active(.next([element]))
        case let (.next(element), .dormant(pending)):
            return .dormant(pending + [element])
        case (.completed, _):
            // Completion wins regardless of state; a dormant buffer is dropped here.
            return .active(.completed)
        case let (.error(failure), _):
            return .active(.error(failure))
        }
    }

    /// Transition for an event coming from the boundary sequence.
    private func handle(trigger: Event<Bool>) -> StallUnlessState {
        switch (trigger, self) {
        case (.next(true), .active), (.completed, .active):
            // Already open: reset the published batch so the previous element is not re-emitted.
            return .active(.next([]))
        case let (.next(true), .dormant(pending)), let (.completed, .dormant(pending)):
            // Opening the gate (or the boundary finishing) flushes everything collected so far.
            return .active(.next(pending))
        case (.next(false), .active):
            return .dormant([])
        case (.next(false), .dormant):
            return self
        case let (.error(failure), _):
            return .active(.error(failure))
        }
    }
}
/// A single input to the `StallUnlessState` reducer, tagged by the sequence it came from.
private enum StallUnlessInput<Element> {
    /// A materialized event from the source sequence.
    case signal(Event<Element>)

    /// A materialized event from the Boolean boundary sequence.
    case trigger(Event<Bool>)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Marble-diagram tests for `stall(unless:initial:)`.
final class Tests: XCTestCase {
    /// The boundary toggles between false and true and then completes while the source keeps emitting.
    func test() {
        let testScheduler = TestScheduler(initialClock: 0)
        let boundary = testScheduler.createObservable(timeline: "-------F----T--F--|", values: ["T": true, "F": false])
        let source = testScheduler.createObservable(timeline: "----B---B---B---B")
        let expected = parseTimelineEvents( "----B-------X----B", values: { ["B": ["B"], "X": ["B", "B"]][$0]! })
            .offsetTime(by: 200)

        let observer = testScheduler.start {
            source.stall(unless: boundary, initial: true)
        }

        XCTAssertEqual(observer.events, expected[0])
    }

    /// Same toggling as `test()`, but the boundary terminates with an error instead of completing.
    func test_boundary_error() {
        let testScheduler = TestScheduler(initialClock: 0)
        let boundary = testScheduler.createObservable(timeline: "-------F----T--F--#", values: ["T": true, "F": false])
        let source = testScheduler.createObservable(timeline: "----B---B---B---B")
        let expected = parseTimelineEvents("----B-------X-----#", values: { ["B": ["B"], "X": ["B", "B"]][$0]! })
            .offsetTime(by: 200)

        let observer = testScheduler.start {
            source.stall(unless: boundary, initial: true)
        }

        XCTAssertEqual(observer.events, expected[0])
    }

    /// The source completes while the boundary stays silent.
    func test_complete() {
        let testScheduler = TestScheduler(initialClock: 0)
        let boundary = testScheduler.createObservable(timeline: "-", values: ["T": true, "F": false])
        let source = testScheduler.createObservable(timeline: "-|")
        let expected = parseTimeline("-|")
            .offsetTime(by: 200)

        let observer = testScheduler.start {
            source.stall(unless: boundary, initial: true)
        }

        XCTAssertEqual(observer.events, expected[0])
    }

    /// The source errors while the boundary stays silent.
    func test_error() {
        let testScheduler = TestScheduler(initialClock: 0)
        let boundary = testScheduler.createObservable(timeline: "-", values: ["T": true, "F": false])
        let source = testScheduler.createObservable(timeline: "-#")
        let expected = parseTimeline("-#")
            .offsetTime(by: 200)

        let observer = testScheduler.start {
            source.stall(unless: boundary, initial: true)
        }

        XCTAssertEqual(observer.events, expected[0])
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment