Skip to content

Instantly share code, notes, and snippets.

@alexcmd
Created July 23, 2020 12:15
Show Gist options
  • Save alexcmd/a6acbe61b8d14c469b65af42b9910f60 to your computer and use it in GitHub Desktop.
Save alexcmd/a6acbe61b8d14c469b65af42b9910f60 to your computer and use it in GitHub Desktop.
Spring Reactive websocket example
public class StreamWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
session.getHandshakeInfo().getPrincipal().block();
var bridge = new Bridge<WebSocketMessage>();
Mono<Void> sendPings = session.send(
Flux.interval(Duration.ofMillis(1))
.map(aLong -> session.pingMessage(dataBufferFactory -> session.bufferFactory().allocateBuffer())));
Flux<WebSocketMessage> output = Flux.create(bridge::setSink, FluxSink.OverflowStrategy.DROP);
Flux<WebSocketMessage> income = session.receive().share();
Flux<String> input = income
.filter(webSocketMessage -> webSocketMessage.getType() == WebSocketMessage.Type.TEXT )
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(
s -> bridge.publish(session.textMessage("RECEIVED: "+s))
).doOnComplete(bridge::complete);
Flux<WebSocketMessage> pong = income.filter(m -> m.getType() == WebSocketMessage.Type.PONG)
.doOnNext(m -> bridge.publish(session.textMessage(LocalDateTime.now().toString())));
Mono<Void> send = session.send(output);
return Flux.merge(send, input, pong,sendPings).then();
}
private static class Bridge<T>{
private FluxSink<T> sink;
public void setSink(FluxSink<T> sink){
this.sink = sink;
}
public void publish(T message){
sink.next(message);
}
public void complete(){
sink.complete();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment