Skip to content

Instantly share code, notes, and snippets.

@bgromov
Last active March 30, 2021 06:50
Show Gist options
  • Save bgromov/1cc84a62e5ac2363c4583e8b69ecac0a to your computer and use it in GitHub Desktop.
Save bgromov/1cc84a62e5ac2363c4583e8b69ecac0a to your computer and use it in GitHub Desktop.
RingBuffer publisher for Apple's Combine framework

RingBuffer Publisher

Implementation of a ring buffer publisher for Apple's Combine framework.

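The publisher decides when to produce its first output based on the specified strategy: .always (the default) emits starting with the first upstream element, while .whenFull waits until the buffer is full. See examples below.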

Examples

import Foundation
import Combine

let pub = (0..<10).publisher
  .ringBuffer(size: 3)
  .sink { print($0) }

The above snippet produces the following output:

[0]
[0, 1]
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7, 8]
[7, 8, 9]

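Alternatively, when strategy is set to .whenFull, nothing is emitted until the buffer is full (the snippet below is followed directly by its output):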

import Foundation
import Combine

let pub = (0..<10).publisher
    .ringBuffer(size: 3, strategy: .whenFull)
    .sink { print($0) }
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7, 8]
[7, 8, 9]

Credits

The underlying RingBuffer structure used in this implementation is taken from Ray Wenderlich's Swift Algorithm Club and is written by Matthijs Hollemans.

Useful Links

/*
Source: https://github.com/raywenderlich/swift-algorithm-club/tree/dd1ed39fca150d4fa2905b902736f12a49f3efb1/Ring%20Buffer
Fixed-length ring buffer
In this implementation, the read and write pointers always increment and
never wrap around. On a 64-bit platform that should not get you into trouble
any time soon.
Not thread-safe, so don't read and write from different threads at the same
time! To make this thread-safe for one reader and one writer, it should be
enough to change read/writeIndex += 1 to OSAtomicIncrement64(), but I haven't
tested this...
*/
/// A fixed-capacity first-in, first-out buffer backed by an array of optional slots.
///
/// The read and write cursors only ever increase; every slot access goes
/// through the array's `wrapped` subscript to find the storage position.
public struct RingBuffer<T> {
  /// Backing storage. A slot is `nil` until it is written and again after it is read.
  private var array: [T?]
  /// Cursor of the next element to read; incremented on every successful read.
  private var readIndex = 0
  /// Cursor of the next slot to write; incremented on every successful write.
  private var writeIndex = 0
  /// The `count` value the buffer was created with.
  public let size: Int

  /// Creates an empty buffer with `count` storage slots.
  /// - Parameter count: Number of slots to allocate.
  public init(count: Int) {
    size = count
    array = Array(repeating: nil, count: count)
  }

  /// Stores `element` in the next free slot.
  /// - Parameter element: The value to append.
  /// - Returns: `true` if the element was stored, `false` if the buffer was full.
  @discardableResult
  public mutating func write(_ element: T) -> Bool {
    if isFull {
      return false
    }
    array[wrapped: writeIndex] = element
    writeIndex += 1
    return true
  }

  /// Removes and returns the oldest stored element.
  /// - Returns: The oldest element, or `nil` if the buffer is empty.
  public mutating func read() -> T? {
    if isEmpty {
      return nil
    }
    let oldest = array[wrapped: readIndex]
    // Clear the slot so the buffer does not keep the element alive.
    array[wrapped: readIndex] = nil
    readIndex += 1
    return oldest
  }

  /// Number of elements that have been written but not yet read.
  public var availableSpaceForReading: Int {
    writeIndex - readIndex
  }

  /// `true` when there is nothing left to read.
  public var isEmpty: Bool {
    availableSpaceForReading == 0
  }

  /// Number of further elements that can be written before the buffer is full.
  public var availableSpaceForWriting: Int {
    array.count - availableSpaceForReading
  }

  /// `true` when no further element can be written.
  public var isFull: Bool {
    availableSpaceForWriting == 0
  }
}
extension RingBuffer: Sequence {
  /// Returns an iterator over the unread elements, oldest first.
  ///
  /// The iterator works on a snapshot of the buffer taken when it is created
  /// and does not consume any elements.
  public func makeIterator() -> AnyIterator<T> {
    let storage = array
    let end = writeIndex
    var cursor = readIndex
    return AnyIterator {
      if cursor >= end {
        return nil
      }
      let element = storage[wrapped: cursor]
      cursor += 1
      return element
    }
  }
}
private extension Array {
  /// Accesses the element at the position obtained by taking `index` modulo `count`.
  /// - Parameter index: An index that may be equal to or larger than `count`.
  subscript(wrapped index: Int) -> Element {
    get {
      let position = index % count
      return self[position]
    }
    set(replacement) {
      let position = index % count
      self[position] = replacement
    }
  }
}
import Foundation
import Combine
public extension Publisher {
  /// Wraps this publisher in a `Publishers.RingBuffer` that buffers upstream elements in a ring buffer.
  /// - Parameters:
  ///   - size: Buffer size.
  ///   - strategy: `.always` (the default) generates output starting with the first upstream
  ///     element; `.whenFull` generates output only after the buffer is full.
  /// - Returns: A `Publishers.RingBuffer` configured with this publisher as its upstream.
  func ringBuffer(
    size: Int,
    strategy: Publishers.RingBuffer<Self>.OutputStrategy = .always
  ) -> Publishers.RingBuffer<Self> {
    .init(upstream: self, size: size, strategy: strategy)
  }
}
public extension Publishers {
  /// A publisher that collects elements from an upstream publisher in a ring buffer
  /// and publishes arrays of the buffered elements.
  struct RingBuffer<Upstream: Publisher>: Publisher {
    /// Each output is an array of upstream elements.
    public typealias Output = [Upstream.Output]

    /// Failures are the upstream publisher's failures, unchanged.
    public typealias Failure = Upstream.Failure

    /// The publisher whose elements are buffered.
    public let upstream: Upstream

    /// The maximum number of elements to store.
    public let size: Int

    /// Determines when buffered elements are sent downstream.
    public let strategy: OutputStrategy

    /// Creates a ring buffer publisher.
    /// - Parameters:
    ///   - upstream: The publisher to receive elements from.
    ///   - size: The maximum number of elements to store.
    ///   - strategy: When to send output downstream.
    public init(upstream: Upstream, size: Int, strategy: OutputStrategy) {
      self.upstream = upstream
      self.size = size
      self.strategy = strategy
    }

    /// Subscribes an intermediate `Inner` subscriber to `upstream` on behalf of `subscriber`.
    /// - Parameter subscriber: The downstream subscriber that will receive the arrays.
    public func receive<Downstream: Subscriber>(subscriber: Downstream)
    where Downstream.Input == Output, Downstream.Failure == Failure {
      let inner = Inner(downstream: subscriber, size: size, strategy: strategy)
      upstream.subscribe(inner)
    }
  }
}
public extension Publishers.RingBuffer {
  /// Selects when the ring buffer publisher sends its contents downstream.
  enum OutputStrategy {
    /// Send output for every upstream element, starting with the first one.
    case always

    /// Send output only when the buffer is full.
    case whenFull
  }
}
extension Publishers.RingBuffer {
  /// Subscriber placed between the upstream publisher and the downstream subscriber.
  ///
  /// It keeps the most recent upstream elements in a ring buffer and forwards
  /// array snapshots of that buffer downstream according to `strategy`.
  private final class Inner<Downstream: Subscriber>: Subscriber
  where Downstream.Input == Output, Downstream.Failure == Upstream.Failure {
    typealias Input = Upstream.Output
    typealias Failure = Upstream.Failure

    /// The subscriber that receives the buffered arrays.
    let downstream: Downstream
    /// Determines whether output is sent for every element or only when the buffer is full.
    let strategy: OutputStrategy
    /// Storage for the most recent upstream elements.
    var buf: RingBuffer<Input>

    /// - Parameters:
    ///   - downstream: The subscriber to forward output, completion and the subscription to.
    ///   - size: Capacity of the ring buffer.
    ///   - strategy: When to send output downstream.
    init(downstream: Downstream, size: Int, strategy: OutputStrategy) {
      self.downstream = downstream
      self.strategy = strategy
      self.buf = RingBuffer<Input>(count: size)
    }

    /// Hands the upstream subscription straight to the downstream subscriber,
    /// so downstream requests demand from upstream directly.
    func receive(subscription: Subscription) {
      downstream.receive(subscription: subscription)
    }

    /// Buffers `input`, forwards a snapshot downstream when the strategy calls
    /// for it, and drops the oldest element once the buffer is full.
    ///
    /// - Returns: The additional demand to signal upstream. When a snapshot is
    ///   forwarded this is exactly what downstream returned. When the element is
    ///   only buffered, one replacement element is requested, because the
    ///   element consumed a unit of downstream's demand without producing output.
    func receive(_ input: Upstream.Output) -> Subscribers.Demand {
      buf.write(input)
      let isFull = buf.isFull

      let shouldEmit: Bool
      switch strategy {
      case .always:
        shouldEmit = true
      case .whenFull:
        shouldEmit = isFull
      }

      // Demand is additive in Combine: returning the buffer's free space on
      // every element would keep inflating upstream demand beyond what
      // downstream asked for, so downstream's own answer is propagated instead.
      let additionalDemand: Subscribers.Demand
      if shouldEmit {
        additionalDemand = downstream.receive(Array(buf))
      } else {
        additionalDemand = .max(1)
      }

      if isFull {
        // Drop the oldest element to make room for the next one.
        _ = buf.read()
      }
      return additionalDemand
    }

    /// Forwards the upstream completion (finished or failure) downstream unchanged.
    func receive(completion: Subscribers.Completion<Upstream.Failure>) {
      downstream.receive(completion: completion)
    }
  }
}
@richard-clements
Copy link

richard-clements commented Mar 30, 2021

Hey, I came across this and just wondered if the same could be achieved using just Scan?

extension Publisher {

    /// Publishes a sliding window over the most recent upstream elements, built with `scan`.
    /// - Parameters:
    ///   - capacity: Maximum number of elements in each published array.
    ///   - fireOnlyWhenFull: When `true`, arrays with fewer than `capacity` elements are filtered out.
    /// - Returns: A type-erased publisher of the window's contents, oldest element first.
    func ringBuffer(capacity: Int, fireOnlyWhenFull: Bool) -> AnyPublisher<[Output], Failure> {
        scan([Output]()) { window, element in
            // Append first and trim afterwards: trimming before the append
            // lets the window grow to `capacity + 1` elements.
            Array((window + [element]).suffix(capacity))
        }
        .filter { !fireOnlyWhenFull || $0.count >= capacity }
        .eraseToAnyPublisher()
    }

}

Just wondered if there are any downsides to this approach that you can see?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment