Skip to content

Instantly share code, notes, and snippets.

@danielt1263
Last active March 2, 2020 01:10
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danielt1263/f5b041facfdcdd64630e0cb8cfc2cc5b to your computer and use it in GitHub Desktop.
Save danielt1263/f5b041facfdcdd64630e0cb8cfc2cc5b to your computer and use it in GitHub Desktop.
//
// ThrottleUnlessChanged.swift
//
// Created by Daniel Tartaglia on 6 Jan 2019.
// Copyright © 2020 Daniel Tartaglia. MIT License.
//
import RxSwift
public extension ObservableType where Element: Equatable {
    /**
     Emits an element only if it differs from the last emitted element, or if at least `dueTime` has elapsed since the last emission.

     The first element of the sequence is always emitted. Suppressed elements do not reset the time of the last emission.

     - parameter dueTime: Minimum interval between allowed emissions of the same value.
     - parameter scheduler: Scheduler whose `now` is used to track the time.
     - returns: An observable sequence containing the elements that were not suppressed.
     */
    func throttleUnlessChanged(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
        return self
            // State carried between elements:
            //   value     - the element to emit for this step, or nil when it is suppressed
            //   lastValue - the most recently emitted element
            //   lastTime  - the scheduler time at which `lastValue` was emitted
            .scan((value: nil as Element?, lastValue: nil as Element?, lastTime: nil as RxTime?)) { current, element in
                let now = scheduler.now
                // Suppress only when something was emitted before, the element repeats it,
                // and the throttle window is still open. The interval is converted last so
                // it is only evaluated when the first two conditions already hold.
                if let lastTime = current.lastTime,
                    element == current.lastValue,
                    now.timeIntervalSince(lastTime) < Double(dueTime)
                {
                    return (nil, current.lastValue, current.lastTime)
                }
                return (element, element, now)
            }
            // Drop the suppressed steps and unwrap the rest without force-unwrapping.
            .compactMap { $0.value }
    }
}
private extension Double {
    /// Creates a duration expressed in seconds from an `RxTimeInterval`.
    ///
    /// `.never` is converted to `Double.infinity`, so comparing any finite elapsed time
    /// against it with `>=` is never satisfied, instead of crashing at runtime.
    ///
    /// - Parameter interval: The interval to convert.
    init(_ interval: RxTimeInterval) {
        switch interval {
        case .seconds(let value):
            self = Double(value)
        case .milliseconds(let value):
            self = Double(value) / 1_000
        case .microseconds(let value):
            self = Double(value) / 1_000_000
        case .nanoseconds(let value):
            self = Double(value) / 1_000_000_000
        case .never:
            // "Never" is an interval that does not elapse; model it as an infinite duration.
            self = .infinity
        @unknown default:
            // A case added by a future SDK has no known conversion.
            fatalError("unknown time increment")
        }
    }
}
//
// ThrottleUnlessChangedTests.swift
//
// Created by Daniel Tartaglia on 6 Jan 2019.
// Copyright © 2020 Daniel Tartaglia. MIT License.
//
import XCTest
import RxSwift
import RxTest
/// Tests for `throttleUnlessChanged(_:scheduler:)` driven by a virtual-time `TestScheduler`.
class ThrottleUnlessChangedTests: XCTestCase {
    var scheduler: TestScheduler!
    var sink: TestableObserver<Int>!
    var bag: DisposeBag!

    override func setUp() {
        super.setUp()
        scheduler = TestScheduler(initialClock: 0)
        sink = scheduler.createObserver(Int.self)
        bag = DisposeBag()
    }

    override func tearDown() {
        // Release per-test state so subscriptions do not outlive the test.
        bag = nil
        sink = nil
        scheduler = nil
        super.tearDown()
    }

    /// A different value passes through even inside the throttle window.
    func testDifferentValue() {
        let recorded = emissions(from: [
            .next(10, 1),
            .next(20, 2)
        ])
        XCTAssertEqual(recorded, [
            .next(10, 1),
            .next(20, 2)
        ])
    }

    /// A repeated value inside the throttle window is suppressed.
    func testSameValue() {
        let recorded = emissions(from: [
            .next(10, 1),
            .next(20, 1)
        ])
        XCTAssertEqual(recorded, [
            .next(10, 1)
        ])
    }

    /// A repeated value one tick before the window closes is still suppressed.
    func testBorder() {
        let recorded = emissions(from: [
            .next(10, 1),
            .next(39, 1)
        ])
        XCTAssertEqual(recorded, [
            .next(10, 1)
        ])
    }

    /// A repeated value arriving exactly when the window closes is emitted again.
    func testStaggered() {
        let recorded = emissions(from: [
            .next(10, 1),
            .next(40, 1)
        ])
        XCTAssertEqual(recorded, [
            .next(10, 1),
            .next(40, 1)
        ])
    }

    /// Plays `events` as a cold observable through a 30-second `throttleUnlessChanged`,
    /// runs the scheduler, and returns everything the sink recorded.
    private func emissions(from events: [Recorded<Event<Int>>]) -> [Recorded<Event<Int>>] {
        scheduler.createColdObservable(events)
            .throttleUnlessChanged(.seconds(30), scheduler: scheduler)
            // `subscribe(_:)` is part of RxSwift itself, so no RxCocoa import is needed.
            .subscribe(sink)
            .disposed(by: bag)
        scheduler.start()
        return sink.events
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment