Skip to content

Instantly share code, notes, and snippets.

@beders
Created June 25, 2015 15:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save beders/03698bd891aa60ff69e8 to your computer and use it in GitHub Desktop.
Save beders/03698bd891aa60ff69e8 to your computer and use it in GitHub Desktop.
Simple EventBusBridge
package com.saffrontech.vertx;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.WebSocket;
import io.vertx.core.json.JsonObject;
import java.net.URI;
import java.util.*;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
/**
 * Simple event bus bridge client using Vert.x websockets.
 *
 * <p>The bridge opens a websocket to the endpoint's path with {@code /websocket} appended and
 * exchanges JSON text frames whose {@code "type"} is one of {@code ping}, {@code send},
 * {@code publish}, {@code register} or {@code unregister}. Incoming frames of type {@code err}
 * are printed to {@code System.err}; all other frames are dispatched by their {@code "address"}.
 *
 * <p>Note: only {@code Message<JsonObject>} and {@code Message<String>} are supported.
 *
 * <p>Each bridge creates its own {@link Vertx} instance and closes it again once the websocket
 * has been closed.
 *
 * Created by beders on 6/23/15.
 */
public class EventBusBridge {
    /**
     * Marker for "no ping timer scheduled". 0 is not a safe marker: Vert.x hands out timer ids
     * from a counter, and the first timer of a fresh instance can receive id 0.
     */
    private static final long NO_TIMER = -1L;

    /** Port used when the endpoint URI does not name one (the client is not configured for TLS). */
    private static final int DEFAULT_PORT = 80;

    /** Interval between keep-alive pings, in milliseconds. */
    private static final long PING_INTERVAL_MS = 5000L;

    final Vertx vertx;
    /** Assigned once the connection is open; read by whichever thread calls send/publish/register. */
    volatile WebSocket webSocket;
    long pingTimerID = NO_TIMER;
    /** Handlers registered per event bus address. */
    final ConcurrentHashMap<String, List<Handler<Message<?>>>> handlers = new ConcurrentHashMap<>();
    /** One-shot reply handlers keyed by the generated reply address. */
    final ConcurrentHashMap<String, Handler<Message<?>>> replyHandlers = new ConcurrentHashMap<>();

    /**
     * Opens a bridge to the given endpoint.
     *
     * @param endPoint      endpoint URI; host, port and path are used, and {@code /websocket} is
     *                      appended to the path. A missing port defaults to 80.
     * @param onOpenHandler invoked with the connected bridge once the websocket is open
     * @throws NullPointerException if either argument is {@code null}
     */
    public static void connect(URI endPoint, Handler<EventBusBridge> onOpenHandler) {
        new EventBusBridge(endPoint, onOpenHandler);
    }

    private EventBusBridge(URI endPoint, Handler<EventBusBridge> onOpenHandler) {
        Objects.requireNonNull(endPoint, "endPoint");
        Objects.requireNonNull(onOpenHandler, "onOpenHandler");
        vertx = Vertx.vertx();
        int port = endPoint.getPort() == -1 ? DEFAULT_PORT : endPoint.getPort();
        String path = endPoint.getPath() + "/websocket";
        vertx.createHttpClient().websocket(port, endPoint.getHost(), path, ws -> {
            webSocket = ws;
            // Install the socket handlers and the keep-alive before handing the bridge to user
            // code, so an exception thrown by the open handler cannot leave the bridge half set up.
            ws.handler(this::bufferReceived);
            ws.closeHandler(closed -> shutdown());
            sendPing();
            pingTimerID = vertx.setPeriodic(PING_INTERVAL_MS, timerId -> sendPing());
            onOpenHandler.handle(this);
        });
    }

    /**
     * Sends a {@code ping} frame. If the socket rejects the write with an
     * {@link IllegalStateException}, the periodic ping timer is cancelled.
     */
    protected void sendPing() {
        //System.out.println("Sending ping");
        JsonObject msg = new JsonObject().put("type", "ping");
        try {
            webSocket.writeMessage(Buffer.buffer(msg.toString()));
        } catch (IllegalStateException ise) {
            // The socket is no longer writable, so further pings are pointless.
            cancelPingTimer();
        }
    }

    /**
     * Sends a string message to a single consumer of {@code address}.
     *
     * @param address      target event bus address
     * @param message      message body
     * @param replyHandler invoked with the reply, or {@code null} if no reply is expected
     * @return this bridge, for chaining
     * @throws NullPointerException if the websocket is not connected yet
     */
    public EventBusBridge send(String address, String message, Handler<Message<?>> replyHandler) {
        sendMessage("send", address, message, replyHandler);
        return this;
    }

    /**
     * Publishes a string message to {@code address}.
     *
     * @param address      target event bus address
     * @param message      message body
     * @param replyHandler invoked with a reply, or {@code null} if no reply is expected
     * @return this bridge, for chaining
     * @throws NullPointerException if the websocket is not connected yet
     */
    public EventBusBridge publish(String address, String message, Handler<Message<?>> replyHandler) {
        sendMessage("publish", address, message, replyHandler);
        return this;
    }

    /**
     * Sends a JSON message to a single consumer of {@code address}.
     *
     * @param address      target event bus address
     * @param message      message body
     * @param replyHandler invoked with the reply, or {@code null} if no reply is expected
     * @return this bridge, for chaining
     * @throws NullPointerException if the websocket is not connected yet
     */
    public EventBusBridge send(String address, JsonObject message, Handler<Message<?>> replyHandler) {
        sendMessage("send", address, message, replyHandler);
        return this;
    }

    /**
     * Publishes a JSON message to {@code address}.
     *
     * @param address      target event bus address
     * @param message      message body
     * @param replyHandler invoked with a reply, or {@code null} if no reply is expected
     * @return this bridge, for chaining
     * @throws NullPointerException if the websocket is not connected yet
     */
    public EventBusBridge publish(String address, JsonObject message, Handler<Message<?>> replyHandler) {
        sendMessage("publish", address, message, replyHandler);
        return this;
    }

    /**
     * Registers a handler for messages arriving on {@code address}. The first handler for an
     * address also sends a {@code register} frame to the server.
     *
     * @param address     event bus address to listen on
     * @param dataHandler handler invoked for every message received on that address
     * @return this bridge, for chaining
     * @throws NullPointerException if an argument is {@code null} or the websocket is not connected
     */
    public EventBusBridge registerHandler(String address, Handler<Message<?>> dataHandler) {
        Objects.requireNonNull(address, "address");
        Objects.requireNonNull(dataHandler, "dataHandler");
        // compute() keeps "create list + announce registration + add handler" atomic with respect
        // to a concurrent unregisterHandler on the same address.
        handlers.compute(address, (key, registered) -> {
            List<Handler<Message<?>>> list = registered;
            if (list == null) {
                write(new JsonObject().put("type", "register").put("address", key));
                // Copy-on-write so a handler may register/unregister while being dispatched to.
                list = new CopyOnWriteArrayList<>();
            }
            list.add(dataHandler);
            return list;
        });
        return this;
    }

    /**
     * Removes a previously registered handler. When the last handler of an address is removed,
     * an {@code unregister} frame is sent to the server. Unknown addresses are ignored.
     *
     * @param address     event bus address the handler was registered on
     * @param dataHandler the handler instance passed to {@link #registerHandler}
     * @return this bridge, for chaining
     */
    public EventBusBridge unregisterHandler(String address, Handler<? extends Message<?>> dataHandler) {
        handlers.computeIfPresent(address, (key, registered) -> {
            registered.remove(dataHandler);
            if (!registered.isEmpty()) {
                return registered;
            }
            write(new JsonObject().put("type", "unregister").put("address", key));
            return null; // drop the mapping so a later registerHandler announces itself again
        });
        return this;
    }

    /**
     * Closes the websocket. The ping timer and the bridge's Vert.x instance are released by the
     * socket's close handler.
     *
     * @throws NullPointerException if the websocket is not connected yet
     */
    public void close() {
        socket().close();
    }

    /** Builds a send/publish frame, records the reply handler (if any) and writes the frame. */
    private void sendMessage(String sendOrPublish, String address, Object message, Handler<Message<?>> replyHandler) {
        JsonObject msg = new JsonObject().put("type", sendOrPublish).put("address", address).put("body", message);
        if (replyHandler != null) {
            String replyAddress = UUID.randomUUID().toString();
            replyHandlers.put(replyAddress, replyHandler);
            msg.put("replyAddress", replyAddress);
        }
        write(msg);
    }

    /** Serialises a frame and writes it to the websocket. */
    private void write(JsonObject frame) {
        socket().writeMessage(Buffer.buffer(frame.toString()));
    }

    /** Returns the websocket, failing fast when the connection has not been opened yet. */
    private WebSocket socket() {
        return Objects.requireNonNull(webSocket, "websocket is not connected");
    }

    /** Cancels the periodic ping, if one is scheduled. */
    private void cancelPingTimer() {
        long id = pingTimerID;
        if (id != NO_TIMER) {
            pingTimerID = NO_TIMER;
            vertx.cancelTimer(id);
        }
    }

    /** Releases everything the bridge owns once the socket is closed. */
    private void shutdown() {
        cancelPingTimer();
        // The Vertx instance was created by this bridge; without closing it, it is never released.
        vertx.close();
    }

    /**
     * Handles one incoming frame: reports {@code err} frames on {@code System.err}, otherwise
     * delivers the message to the handlers registered for its address and to a pending reply
     * handler for that address (which is removed before it is invoked).
     *
     * @param buffer raw frame content, expected to be a JSON object
     */
    protected void bufferReceived(Buffer buffer) {
        //System.out.println("Buffer Received");
        //System.out.println(buffer.toString());
        JsonObject msg = new JsonObject(buffer.toString());
        if ("err".equals(msg.getString("type"))) {
            // TODO invoke error handler
            System.err.println("Error message from the event bus bridge:" + msg.toString());
            return;
        }
        String address = msg.getString("address");
        if (address == null) {
            // Nothing to dispatch on, and ConcurrentHashMap rejects null keys.
            return;
        }
        EventBusMessage<Object> incoming = new EventBusMessage<>(msg);
        List<Handler<Message<?>>> subscribers = handlers.get(address);
        if (subscribers != null) {
            for (Handler<Message<?>> subscriber : subscribers) {
                subscriber.handle(incoming);
            }
        }
        // Single remove() instead of containsKey()+remove() so two threads cannot both claim it.
        Handler<Message<?>> replyHandler = replyHandlers.remove(address);
        if (replyHandler != null) {
            replyHandler.handle(incoming);
        }
    }

    /**
     * Message received through the bridge. Headers and {@link #fail(int, String)} are not
     * supported; replies are sent back through the enclosing bridge.
     *
     * @param <T> type of the message body
     */
    class EventBusMessage<T> implements Message<T> {
        final String address;
        final String replyAddress;
        final T body;

        /** Builds a message from a received frame's {@code address}, {@code replyAddress} and {@code body}. */
        @SuppressWarnings("unchecked") // the body type is whatever the peer sent; callers pick T
        EventBusMessage(JsonObject json) {
            address = json.getString("address");
            replyAddress = json.getString("replyAddress", null);
            body = (T) json.getValue("body");
        }

        /** Copies address, reply address and body from another message. */
        EventBusMessage(Message<T> result) {
            address = result.address();
            replyAddress = result.replyAddress();
            body = result.body();
        }

        @Override
        public String address() {
            return address;
        }

        @Override
        public MultiMap headers() {
            throw new UnsupportedOperationException("No headers supported");
        }

        @Override
        public T body() {
            return body;
        }

        @Override
        public String replyAddress() {
            return replyAddress;
        }

        @Override
        public void reply(Object message) {
            this.reply(message, new DeliveryOptions(), null);
        }

        @Override
        public <R> void reply(Object message, Handler<AsyncResult<Message<R>>> replyHandler) {
            this.reply(message, new DeliveryOptions(), replyHandler);
        }

        @Override
        public void reply(Object message, DeliveryOptions options) {
            this.reply(message, options, null);
        }

        /**
         * Sends {@code message} to this message's reply address; does nothing when the message
         * carries no reply address. {@link JsonObject} bodies are sent as JSON, anything else as
         * its {@code toString()} form. The delivery options are not used by the bridge.
         *
         * @param message         reply body, must not be {@code null}
         * @param deliveryOptions ignored
         * @param replyHandler    invoked with a succeeded result when the peer answers the reply,
         *                        or {@code null} if no answer is expected
         */
        @Override
        public <R> void reply(Object message, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<R>>> replyHandler) {
            if (replyAddress == null) {
                return;
            }
            Objects.requireNonNull(message, "message");
            // Only ask for an answer when the caller supplied a handler; otherwise a late answer
            // would hit a null handler and the reply address would linger in replyHandlers.
            Handler<Message<?>> onAnswer = null;
            if (replyHandler != null) {
                onAnswer = answer -> {
                    @SuppressWarnings("unchecked") // body type is chosen by the caller via R
                    Message<R> typed = new EventBusMessage<>((Message<R>) answer);
                    replyHandler.handle(succeeded(typed));
                };
            }
            Object body = (message instanceof JsonObject) ? message : message.toString();
            EventBusBridge.this.sendMessage("send", replyAddress, body, onAnswer);
        }

        /** Wraps a message in an always-successful {@link AsyncResult}. */
        private <R> AsyncResult<Message<R>> succeeded(Message<R> value) {
            return new AsyncResult<Message<R>>() {
                @Override
                public Message<R> result() {
                    return value;
                }

                @Override
                public Throwable cause() {
                    return null;
                }

                @Override
                public boolean succeeded() {
                    return true;
                }

                @Override
                public boolean failed() {
                    return false;
                }
            };
        }

        @Override
        public void fail(int i, String s) {
            throw new UnsupportedOperationException("Failure is not an option");
        }
    }
}
@beders
Copy link
Author

beders commented Jun 25, 2015

(only works with vert.x 3.0.0-milestone6 which had a writeBuffer method)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment