Created
December 28, 2021 20:22
-
-
Save NikolaiRuhe/ab87c54616fb5a078d6a9134a5df67fe to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
extension AsyncSequence {
    /// Wraps this sequence in an `AsyncBuffer`.
    ///
    /// - Parameter limit: Forwarded unchanged to `AsyncBuffer.init(_:limit:)`;
    ///   defaults to `1`.
    /// - Returns: An `AsyncBuffer` whose base sequence is `self`.
    public func buffer(limit: Int = 1) -> AsyncBuffer<Self> {
        return AsyncBuffer(self, limit: limit)
    }
}
/// An `AsyncSequence` wrapper that pulls elements from a base sequence in a
/// background task and hands them to the consumer through a stream.
public struct AsyncBuffer<Base>: AsyncSequence where Base: AsyncSequence {
    public typealias Element = Base.Element

    /// The wrapped sequence.
    private let base: Base
    /// Passed through to every iterator created by `makeAsyncIterator()`.
    private let limit: Int

    /// Stores the base sequence and the limit; no work starts until an
    /// iterator is created.
    public init(_ base: Base, limit: Int) {
        self.base = base
        self.limit = limit
    }

    /// Creates a fresh iterator over the base sequence, configured with this
    /// buffer's limit.
    public func makeAsyncIterator() -> Iterator<Base.AsyncIterator> {
        return Iterator<Base.AsyncIterator>(base.makeAsyncIterator(), limit: limit)
    }

    /// Consumer-facing half of the buffer for one pass through the base
    /// sequence.
    ///
    /// Creating an iterator starts a task that runs `Buffer.mainLoop()`. The
    /// two sides communicate over two pipes: the iterator signals each call to
    /// `next()` on the request pipe, and reads elements from the element pipe.
    public class Iterator<BaseIterator: AsyncIteratorProtocol>: AsyncIteratorProtocol where BaseIterator.Element == Element {
        internal typealias RequestPipe = AsyncStream<Void>.Pipe
        internal typealias ElementPipe = AsyncThrowingStream<Element, Error>.Pipe

        /// Reading end of the element pipe.
        private var bufferedElements: ElementPipe.Out
        /// Writing end of the request pipe.
        private var requestInput: RequestPipe.In
        /// The task running the producer loop.
        private var task: Task<Void, Never>

        /// Sets up both pipes and starts the producer task.
        public init(_ baseIterator: BaseIterator, limit: Int) {
            let requestPipe = RequestPipe()
            let elementPipe = ElementPipe()

            requestInput = requestPipe.in
            bufferedElements = elementPipe.out

            // The task receives the opposite pipe ends plus the base iterator
            // through its capture list; it does not capture `self`.
            task = Task { [baseIterator, requestOutput = requestPipe.out, elementInput = elementPipe.in] in
                var producer = Buffer(
                    baseIterator: baseIterator,
                    upperLimit: limit,
                    bufferInput: elementInput,
                    requests: requestOutput)
                // Errors thrown by the loop itself are discarded here.
                try? await producer.mainLoop()
            }
        }

        deinit {
            // Close the request pipe first, then cancel the producer task.
            requestInput.finish()
            task.cancel()
        }

        /// Signals one request to the producer, then returns whatever the
        /// element pipe delivers next (an element, `nil`, or a thrown error).
        public func next() async throws -> Base.Element? {
            requestInput.yield()
            return try await bufferedElements.next()
        }
    }
}
private extension AsyncBuffer.Iterator {
    /// Producer-side state of the buffer: pulls elements from the base
    /// iterator and pushes them into the element pipe, pausing whenever
    /// `upperLimit` elements have been produced without a matching request.
    struct Buffer {
        /// Iterator over the base sequence.
        var baseIterator: BaseIterator
        /// Set once the base iterator returned `nil` or threw.
        var baseIteratorIsExhausted = false
        /// Maximum number of produced-but-not-yet-requested elements.
        /// Never negative (see `init`).
        let upperLimit: Int
        /// Elements yielded so far minus requests received so far.
        var bufferedElementCount = 0
        /// Writing end of the element pipe.
        var bufferInput: ElementPipe.In
        /// Reading end of the request pipe; one value arrives per request.
        var requests: RequestPipe.Out

        /// - Parameters:
        ///   - baseIterator: Iterator to pull elements from.
        ///   - upperLimit: Requested limit. Negative values are treated as `0`.
        ///   - bufferInput: Where produced elements are sent.
        ///   - requests: Stream of consumer requests.
        init(baseIterator: BaseIterator, upperLimit: Int, bufferInput: ElementPipe.In, requests: RequestPipe.Out) {
            self.baseIterator = baseIterator
            // With a negative limit the wait loop below would consume more
            // than one request per produced element, so a consumer that sends
            // a single request and then awaits its element would never be
            // served. Clamping to zero yields strictly on-demand fetching.
            self.upperLimit = max(0, upperLimit)
            self.bufferInput = bufferInput
            self.requests = requests
        }

        /// Runs until the base iterator is exhausted, the request stream
        /// ends, or the task is cancelled.
        ///
        /// - Throws: `CancellationError` when the current task is cancelled.
        mutating func mainLoop() async throws {
            while !baseIteratorIsExhausted {
                try Task.checkCancellation()
                guard await nextElementNeeded() else {
                    // The request stream ended, so no further request can
                    // arrive. Close the element pipe instead of pulling more
                    // elements from the base iterator.
                    bufferInput.finish()
                    // Still report cancellation if that is what ended it.
                    try Task.checkCancellation()
                    return
                }
                try Task.checkCancellation()
                await deliverNextElement()
            }
        }

        /// Suspends while the buffer is at its limit, consuming one request
        /// per freed slot.
        ///
        /// - Returns: `true` when another element may be produced; `false`
        ///   when the request stream returned `nil` while waiting.
        mutating func nextElementNeeded() async -> Bool {
            while bufferedElementCount >= upperLimit {
                // A `nil` here means the request stream is over; it must not
                // be counted as a request.
                guard (await requests.next()) != nil else {
                    return false
                }
                bufferedElementCount -= 1
            }
            return true
        }

        /// Pulls one element from the base iterator and forwards the outcome
        /// (element, end of sequence, or error) to the element pipe.
        mutating func deliverNextElement() async {
            do {
                if let element = try await baseIterator.next() {
                    bufferedElementCount += 1
                    bufferInput.yield(element)
                } else {
                    bufferInput.finish()
                    baseIteratorIsExhausted = true
                }
            } catch {
                // Propagate the base sequence's error to the consumer.
                bufferInput.finish(throwing: error)
                baseIteratorIsExhausted = true
            }
        }
    }
}
public extension AsyncThrowingStream where Failure == Error {
    /// Bundles the two ends of a freshly created `AsyncThrowingStream`:
    /// the continuation used to feed it and the iterator used to drain it.
    struct Pipe {
        public typealias In = Continuation
        public typealias Out = Iterator

        /// Writing end: the stream's continuation.
        public let `in`: In
        /// Reading end: an iterator over the stream.
        public let out: Out

        /// Creates a new stream and keeps both of its ends.
        public init() {
            var writer: In?
            let stream = AsyncThrowingStream { continuation in
                writer = continuation
            }
            // The unwrap relies on the build closure above having already
            // run by the time the stream initializer returns.
            self.in = writer!
            self.out = stream.makeAsyncIterator()
        }
    }
}
public extension AsyncStream {
    /// Bundles the two ends of a freshly created `AsyncStream`: the
    /// continuation used to feed it and the iterator used to drain it.
    struct Pipe {
        public typealias In = Continuation
        public typealias Out = Iterator

        /// Writing end: the stream's continuation.
        public let `in`: In
        /// Reading end: an iterator over the stream.
        public let out: Out

        /// Creates a new stream and keeps both of its ends.
        public init() {
            var writer: In?
            let stream = AsyncStream { continuation in
                writer = continuation
            }
            // The unwrap relies on the build closure above having already
            // run by the time the stream initializer returns.
            self.in = writer!
            self.out = stream.makeAsyncIterator()
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment