Skip to content

Instantly share code, notes, and snippets.

@yesleon
Created March 10, 2022 05:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yesleon/b0b5653bc8846bef18bf152730e1b47c to your computer and use it in GitHub Desktop.
Save yesleon/b0b5653bc8846bef18bf152730e1b47c to your computer and use it in GitHub Desktop.
import Combine
precedencegroup SinkPrecedence { }
precedencegroup FilterPrecedence {
associativity: left
higherThan: SinkPrecedence
}
infix operator | : FilterPrecedence
infix operator > : SinkPrecedence
infix operator &> : SinkPrecedence
extension Publisher {
static func | <U>(lhs: Self, rhs: @escaping (Output) async throws -> U) -> some Publisher {
lhs.mapError { $0 as Error }
.flatMap { value in
Future<U, Error> { promise in
Task {
do {
promise(.success(try await rhs(value)))
} catch {
promise(.failure(error))
}
}
}
}
}
static func > (lhs: Self, rhs: @escaping (Output) -> Void) -> some Cancellable {
var sub: AnyCancellable?
let pipeline = lhs.sink { _ in
sub = nil
} receiveValue: {
rhs($0)
}
sub = pipeline
return pipeline
}
static func &> (lhs: Self, rhs: @escaping (Result<Output, Error>) -> Void) -> some Cancellable {
var sub: AnyCancellable?
let pipeline = lhs.sink {
if case .failure(let error) = $0 {
rhs(.failure(error))
}
sub = nil
} receiveValue: {
rhs(.success($0))
}
sub = pipeline
return pipeline
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment