@0xR
Last active November 13, 2019 14:50
Vert.X streams with Reactor
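import io.vertx.core.Vertx
import io.vertx.core.streams.ReadStream
import io.vertx.core.streams.WriteStream
import io.vertx.ext.reactivestreams.ReactiveReadStream
import io.vertx.ext.reactivestreams.ReactiveWriteStream
import reactor.core.publisher.Flux

// Runs a Vert.x ReadStream through a Reactor pipeline and pipes the result into a WriteStream.
fun <Input, Output> processWithReactor(
    vertx: Vertx,
    inputStream: ReadStream<Input>,
    outputStream: WriteStream<Output>,
    processor: (Flux<Input>) -> Flux<Output>
) {
    // Inbound bridge: a Vert.x WriteStream that is also a Reactive Streams Publisher
    val publisher = ReactiveWriteStream.writeStream<Input>(vertx)
    inputStream.pipeTo(publisher)
    val writeStreamFlux = Flux.from(publisher)
    val processedFlux = processor(writeStreamFlux)
    // Outbound bridge: a Reactive Streams Subscriber that is also a Vert.x ReadStream
    val subscriber = ReactiveReadStream.readStream<Output>()
    processedFlux.subscribe(subscriber)
    subscriber.pipeTo(outputStream)
}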
// Use the helper in a Vert.X handler
processWithReactor(
    vertx,
    inputStream = websocketStream,
    outputStream = eventBusPublisher
) { socketFlux ->
    socketFlux
        .flatMap { /* do things */ }
        .buffer(10)
}