Skip to content

Instantly share code, notes, and snippets.

@rhart
Created August 2, 2014 23:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rhart/b4316b43b8323e32cf9e to your computer and use it in GitHub Desktop.
Save rhart/b4316b43b8323e32cf9e to your computer and use it in GitHub Desktop.
Ratpack publishing to websocket
public class MetricsWebsocketBroadcastHandler implements Handler {
@Override
public void handle(final Context context) throws Exception {
final MetricsPublisher publisher = context.get(MetricsPublisher.class);
websocket(context, new AutoCloseWebSocketHandler<AutoCloseable>() {
@Override
public AutoCloseable onOpen(final WebSocket webSocket) throws Exception {
MetricsWebsocketSubscriber subscriber = new MetricsWebsocketSubscriber(webSocket);
context.stream(publisher, subscriber);
return subscriber;
}
});
}
}
public class MetricsWebsocketSubscriber implements Subscriber<String>, AutoCloseable {
private final WebSocket webSocket;
private Subscription subscription;
public MetricsWebsocketSubscriber(WebSocket webSocket) {
this.webSocket = webSocket;
}
@Override
public void close() {
if (subscription != null) {
this.subscription.cancel();
}
}
@Override
public void onSubscribe(Subscription s) {
if (this.subscription == null) {
this.subscription = s;
this.subscription.request(Integer.MAX_VALUE);
} else {
this.subscription.cancel();
}
}
@Override
public void onNext(String s) {
webSocket.send(s);
}
@Override
public void onError(Throwable t) {
webSocket.close();
}
@Override
public void onComplete() {
webSocket.close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment