Skip to content

Instantly share code, notes, and snippets.

@NikolaiRuhe
Created December 28, 2021 20:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NikolaiRuhe/ab87c54616fb5a078d6a9134a5df67fe to your computer and use it in GitHub Desktop.
Save NikolaiRuhe/ab87c54616fb5a078d6a9134a5df67fe to your computer and use it in GitHub Desktop.
import Foundation
extension AsyncSequence {
    /// Wraps this sequence in an `AsyncBuffer`, which fetches elements from
    /// it in the background and keeps them ready for the consumer.
    ///
    /// - Parameter limit: Forwarded unchanged to `AsyncBuffer`'s initializer.
    ///   Defaults to `1`.
    /// - Returns: An `AsyncBuffer` whose base is this sequence.
    public func buffer(limit: Int = 1) -> AsyncBuffer<Self> {
        return AsyncBuffer(self, limit: limit)
    }
}
/// An `AsyncSequence` that fetches elements from a base sequence in a
/// background task and buffers them, so that a consumer applying back
/// pressure gets faster access to the next element.
public struct AsyncBuffer<Base>: AsyncSequence where Base: AsyncSequence {
    public typealias Element = Base.Element

    private let base: Base
    private let limit: Int

    /// Creates a buffering wrapper around `base`.
    ///
    /// - Parameters:
    ///   - base: The sequence whose elements are fetched in the background.
    ///   - limit: Handed to every iterator created by `makeAsyncIterator()`.
    public init(_ base: Base, limit: Int) {
        self.base = base
        self.limit = limit
    }

    /// Creates a new iterator over the base sequence. Creating the iterator
    /// also starts its background task.
    public func makeAsyncIterator() -> Iterator<Base.AsyncIterator> {
        .init(base.makeAsyncIterator(), limit: limit)
    }

    /// `AsyncBuffer.Iterator` implements the functionality of `AsyncBuffer` for
    /// a single pass through the base sequence.
    ///
    /// Two pipes connect the iterator to its background task: elements flow
    /// from the task to the iterator, and one request per `next()` call flows
    /// from the iterator to the task.
    public class Iterator<BaseIterator: AsyncIteratorProtocol>: AsyncIteratorProtocol where BaseIterator.Element == Element {
        internal typealias RequestPipe = AsyncStream<Void>.Pipe
        internal typealias ElementPipe = AsyncThrowingStream<Element, Error>.Pipe

        /// Receiving end of the element pipe; read by `next()`.
        private var bufferedElements: ElementPipe.Out
        /// Sending end of the request pipe; one value is yielded per `next()`.
        private var requestInput: RequestPipe.In
        /// The background task that drives the base iterator.
        private var task: Task<Void, Never>

        /// Creates the pipes and starts the background task.
        ///
        /// - Parameters:
        ///   - baseIterator: The iterator the background task pulls from.
        ///   - limit: Passed to the background buffer as its upper limit.
        public init(_ baseIterator: BaseIterator, limit: Int) {
            let elementPipe = ElementPipe()
            self.bufferedElements = elementPipe.out
            let requestPipe = RequestPipe()
            self.requestInput = requestPipe.in
            // The closure captures only the pipe ends, the base iterator and
            // `limit` -- not `self`.
            self.task = Task { [requests = requestPipe.out, bufferInput = elementPipe.in, baseIterator = baseIterator] in
                var backgroundBuffer = Buffer(
                    baseIterator: baseIterator,
                    upperLimit: limit,
                    bufferInput: bufferInput,
                    requests: requests)
                // An error thrown by the main loop is deliberately dropped;
                // errors of the base iterator travel through the element pipe.
                try? await backgroundBuffer.mainLoop()
            }
        }

        deinit {
            // Cancel before ending the request stream, so that a background
            // task woken by the end of that stream already observes the
            // cancellation at its next check instead of starting another
            // fetch from the base iterator.
            task.cancel()
            requestInput.finish()
        }

        /// Signals the background task that an element is being consumed and
        /// returns the next buffered element.
        ///
        /// - Returns: The next element, or `nil` once the element pipe has
        ///   finished.
        /// - Throws: Whatever error the element pipe was finished with.
        public func next() async throws -> Base.Element? {
            requestInput.yield()
            return try await bufferedElements.next()
        }
    }
}
private extension AsyncBuffer.Iterator {
    /// The state owned by the background task: it pulls elements from the
    /// base iterator and pushes them into the element pipe, pausing whenever
    /// `upperLimit` elements are outstanding.
    struct Buffer {
        var baseIterator: BaseIterator
        /// Set once the base iterator returned `nil` or threw.
        var baseIteratorIsExhausted = false
        /// Never negative; see `init`.
        let upperLimit: Int
        /// Elements yielded so far minus requests consumed so far. May drop
        /// below zero when a request is consumed before an element is yielded.
        var bufferedElementCount = 0
        var bufferInput: ElementPipe.In
        var requests: RequestPipe.Out

        /// - Parameters:
        ///   - baseIterator: The iterator to pull elements from.
        ///   - upperLimit: Number of outstanding elements at which fetching
        ///     pauses. Negative values are treated as `0`: with a negative
        ///     limit more than one request would have to arrive before the
        ///     first element is fetched, which a consumer issuing one request
        ///     per awaited element can never satisfy.
        ///   - bufferInput: Sending end of the element pipe.
        ///   - requests: Receiving end of the request pipe.
        init(baseIterator: BaseIterator, upperLimit: Int, bufferInput: ElementPipe.In, requests: RequestPipe.Out) {
            self.baseIterator = baseIterator
            self.upperLimit = max(upperLimit, 0)
            self.bufferInput = bufferInput
            self.requests = requests
        }

        /// Fetches and delivers elements until the base iterator is exhausted,
        /// the request stream ends, or the task is cancelled.
        ///
        /// - Throws: `CancellationError` from `Task.checkCancellation()`.
        mutating func mainLoop() async throws {
            while !baseIteratorIsExhausted {
                try Task.checkCancellation()
                let isElementNeeded = await nextElementNeeded()
                // Cancellation takes precedence over a plain end of requests.
                try Task.checkCancellation()
                guard isElementNeeded else {
                    // The request stream ended: nobody is left to consume
                    // further elements, so do not touch the base iterator.
                    return
                }
                await deliverNextElement()
            }
        }

        /// Suspends while `upperLimit` or more elements are outstanding,
        /// consuming one request per outstanding element that is accounted
        /// for.
        ///
        /// - Returns: `true` when another element should be fetched, `false`
        ///   when the request stream ended while waiting.
        @discardableResult
        mutating func nextElementNeeded() async -> Bool {
            while bufferedElementCount >= upperLimit {
                guard await requests.next() != nil else { return false }
                bufferedElementCount -= 1
            }
            return true
        }

        /// Pulls one element from the base iterator and forwards it to the
        /// element pipe. The end of the base sequence and any error it throws
        /// are forwarded as well and mark the base iterator as exhausted.
        mutating func deliverNextElement() async {
            do {
                if let element = try await baseIterator.next() {
                    bufferedElementCount += 1
                    bufferInput.yield(element)
                } else {
                    bufferInput.finish()
                    baseIteratorIsExhausted = true
                }
            } catch {
                bufferInput.finish(throwing: error)
                baseIteratorIsExhausted = true
            }
        }
    }
}
public extension AsyncThrowingStream where Failure == Error {
    /// Both ends of a freshly created `AsyncThrowingStream`: `in` is the
    /// stream's continuation, `out` is an iterator over the stream.
    struct Pipe {
        public typealias In = Continuation
        public typealias Out = Iterator

        /// The sending end.
        public let `in`: In
        /// The receiving end.
        public let out: Out

        /// Creates a new stream and stores its two ends.
        public init() {
            var capturedInput: In?
            let stream = AsyncThrowingStream { continuation in
                capturedInput = continuation
            }
            // The unwrap relies on the stream's initializer having invoked the
            // closure above before it returned.
            self.in = capturedInput!
            self.out = stream.makeAsyncIterator()
        }
    }
}
public extension AsyncStream {
    /// Both ends of a freshly created `AsyncStream`: `in` is the stream's
    /// continuation, `out` is an iterator over the stream.
    struct Pipe {
        public typealias In = Continuation
        public typealias Out = Iterator

        /// The sending end.
        public let `in`: In
        /// The receiving end.
        public let out: Out

        /// Creates a new stream and stores its two ends.
        public init() {
            var capturedInput: In?
            let stream = AsyncStream { continuation in
                capturedInput = continuation
            }
            // The unwrap relies on the stream's initializer having invoked the
            // closure above before it returned.
            self.in = capturedInput!
            self.out = stream.makeAsyncIterator()
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment