Skip to content

Instantly share code, notes, and snippets.

@wotjd
Last active September 2, 2022 11:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wotjd/22e9488941838ca421a6899d19bfca24 to your computer and use it in GitHub Desktop.
Save wotjd/22e9488941838ca421a6899d19bfca24 to your computer and use it in GitHub Desktop.
some utility operators for AsyncStream inspired by RxSwift
import Foundation
extension AsyncStream {
init(
_ elementType: Element.Type = Element.self,
bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
_ build: @escaping (AsyncStream<Element>.Continuation) async -> Void
) {
self = AsyncStream(elementType, bufferingPolicy: limit) { continuation in
Task { await build(continuation) }
}
}
}
extension AsyncStream.Continuation {
public func finish(with value: Element) {
self.yield(value)
self.finish()
}
}
extension AsyncStream {
static func empty<Element>() -> AsyncStream<Element> {
.init { $0.finish() }
}
static func just<Element>(_ element: Element) -> AsyncStream<Element> {
.init { $0.finish(with: element) }
}
static func of<Element>(_ elements: Element...) -> AsyncStream<Element> {
.init { continuation in
elements.forEach { continuation.yield($0) }
continuation.finish()
}
}
static func merge<Element>(_ streams: AsyncStream<Element>...) -> AsyncStream<Element> {
.init { continuation in
let tasks = streams.reduce(into: [Task<Void, Never>]()) { tasks, stream in
tasks.append(
Task {
for await element in stream {
continuation.yield(element)
}
}
)
}
for task in tasks {
await task.value
}
continuation.finish()
}
}
static func concat<Element>(_ streams: AsyncStream<Element>...) -> AsyncStream<Element> {
.init { continuation in
for stream in streams {
for await element in stream {
continuation.yield(element)
}
}
continuation.finish()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment