Skip to content

Instantly share code, notes, and snippets.

@unixunion
Last active March 4, 2016 09:21
Show Gist options
  • Save unixunion/fbf075e22b2e1d32a47d to your computer and use it in GitHub Desktop.
Save unixunion/fbf075e22b2e1d32a47d to your computer and use it in GitHub Desktop.
package com.deblox.clustering;
import com.deblox.Boot;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.SharedData;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;
import static com.deblox.Util.sendError;
import static com.deblox.Util.sendOk;
/**
* Created by keghol on 09/11/15.
*
* Listens on topic: deblox.TransponderService
* Listens on private topic: deblox.TransponderService.{HOSTNAME} and deblox.TransponderService.{UUID}
*
* maintains a clusterwide shared data object called nodes, which contains a key: nodes with a JsonObject representing
* all nodes in the cluster as a JsonArray.
*
* Periodically announce self to the cluster
* Periodically iterate over nodes in the cluster and "ping", remove unackknowledged nodes
* Keep track of clusterSize
*
*/
public class TransponderService extends AbstractVerticle implements Handler<Message<JsonObject>> {
private static final Logger logger = LoggerFactory.getLogger(TransponderService.class);
private static String nodeSharedData = "nodes"; // clusterwide map and key to store list of nodes in.
private static String nodeListKey = "nodes"; // the key we will store ALL the nodes in
private String transponderTopicPrefix = "greatsnipe.TransponderService"; // default prefix for the eventbus topics
private static int clusterSize = 1; // set the clusterSize to 1, this variable is updated by "register" handler
// holders
DeliveryOptions eventbusOptions;
private static String hostname;
private String transponderAddress;
private static String localHashAddress;
private long announceTimer; // will hold the instance of the Timer
private long houseKeepingTimer; // will hold the instance of the Timer
// timers / timeouts
private int announceInterval = 1000; // the interval between announcing
private int housekeepingInterval = 10000; // interval at which housekeeping function fires
private int eventbusTimeout = 3000; // timeout for events on the eventbus
/**
* Put any graceful shutdown stuff here.
*
* @param stopFuture
* @throws Exception
*/
@Override
public void stop(Future<Void> stopFuture) throws Exception{
logger.info("Shutting Down");
vertx.eventBus().publish(transponderAddress, new JsonObject()
.put("action", "unregister")
.put("hostname", hostname), eventbusOptions);
try {
vertx.setTimer(1000, event -> {
stopFuture.complete();
});
} catch (Exception e) {
e.printStackTrace();
logger.error("Unable to shutdown");
}
}
/**
* Startup the service, load config, register handlers, start timers...
*
* @param startFuture
* @throws Exception
*/
public void start(Future<Void> startFuture) throws Exception {
logger.info("deploying with config: " + config().toString());
// Read hostname from config, or get from OS
try {
hostname = config().getString("hostname", InetAddress.getLocalHost().getHostName());
logger.info("hostname: " + hostname);
} catch (UnknownHostException e) {
e.printStackTrace();
logger.error("unable to determine hostname, please add a hostname config for this verticle");
startFuture.fail("unable to determine hostname, please add a hostname config for this verticle");
}
// set the timers / intervals / timeouts
announceInterval = config().getInteger("announceInterval", announceInterval);
housekeepingInterval = config().getInteger("housekeepingInterval", housekeepingInterval);
eventbusTimeout = config().getInteger("eventbusTimeout", eventbusTimeout);
// queues and topics addresses
transponderAddress = config().getString("transponderAddress", transponderTopicPrefix);
localHashAddress = transponderAddress + "." + UUID.randomUUID().toString();
// messaging options for when sending message from this verticle.
eventbusOptions = new DeliveryOptions()
.addHeader("hostname", hostname)
.addHeader("localHashAddress", localHashAddress)
.setSendTimeout(eventbusTimeout);
logger.info("Starting consumers");
MessageConsumer<JsonObject> transponderAddressConsumer = vertx.eventBus().consumer(transponderAddress, this);
transponderAddressConsumer.completionHandler(res -> {
logger.info("transponderAddress: " + transponderAddress);
});
MessageConsumer<JsonObject> localHashAddressConsumer = vertx.eventBus().consumer(localHashAddress, this);
localHashAddressConsumer.completionHandler(res -> {
logger.info("localHashAddress: " + localHashAddress);
});
// Publish a notification to the cluster to register myself periodically
announceTimer = vertx.setPeriodic(announceInterval, timerID -> {
JsonObject request = new JsonObject()
.put("action", "register")
.put("config", Boot.config);
logger.debug("Sending request: " + request + " to " + transponderAddress);
vertx.eventBus().send(transponderAddress, request, eventbusOptions, event -> {
if (event.succeeded()) {
JsonObject response = new JsonObject(event.result().body().toString());
if (response.getString("status").equals("ok")) {
logger.debug("Successfully announced to cluster");
} else {
logger.warn("Response does not contain a valid status");
logger.warn(response.toString());
}
}
});
});
logger.info("announceTimer id: " + announceTimer);
/**
* go through nodes in the cluster and check if they are alive, clean up cluster map if they are dead.
*/
houseKeepingTimer = vertx.setPeriodic(housekeepingInterval, timerID ->{
logger.debug("Checking nodes in the cluster");
try {
SharedData sd = vertx.sharedData();
// get the map called "nodes"
sd.<String, JsonObject>getClusterWideMap(nodeSharedData, result -> {
if (result.succeeded()) {
try {
// get the "nodes" key from the map
result.result().get(nodeListKey, r -> {
try {
logger.debug(nodeListKey + " key value: " + r.result().toString());
JsonObject document = r.result();
logger.debug("Cluster size: " + document.size());
// update this node's clusterSize variable to match the document
if (clusterSize != document.size()) {
clusterSize = document.size();
logger.info("Cluster resized to " + clusterSize + " nodes");
}
document.forEach(node -> {
pingNode(node.getKey());
});
} catch (NullPointerException e) {
logger.warn("No nodes in cluster yet");
}
});
} catch (NullPointerException e) {
logger.warn("Cluster does not contain any data yet");
}
} else {
// unable to get the clusterwidemap
logger.warn("Cluster lookup request failed");
}
});
} catch (NullPointerException e) {
logger.warn("Cluster does not seem to be available");
}
});
logger.info("houseKeepingTimer id: " + houseKeepingTimer);
}
/**
* register a node with the cluster, accepts JSON with a "hostname" field
*
* @param event
*/
private void doRegisterNode(Message<JsonObject> event) {
logger.debug("Node: " + event.replyAddress() + " requesting registration of: " + event.body());
// the host's unique address is used as a identifier
String hostAddress = event.headers().get("localHashAddress");
// we will store the host's config in the map
JsonObject hostConfig = event.body().getJsonObject("config", new JsonObject());
logger.debug("Getting nodeShared Data map: " + nodeSharedData);
SharedData sd = vertx.sharedData();
sd.<String, JsonObject>getClusterWideMap(nodeSharedData, req -> {
req.result().putIfAbsent(nodeListKey, new JsonObject(), r2 -> {
if (r2.succeeded()) {
req.result().get(nodeListKey, nodesList -> {
JsonObject nodes = nodesList.result();
nodes.put(hostAddress, hostConfig);
req.result().put(nodeListKey, nodes, updateNodes -> {
if (updateNodes.succeeded()) {
logger.debug("Updated nodes document");
sendOk(event);
} else {
logger.error("Unable to update nodes document");
sendError(event);
}
});
});
} else {
logger.warn("Unable to create / get the node shared data object");
sendError(event);
}
});
});
}
/**
* unregister a node from the cluster, accepts JSON with a "localHashAddress" header matching the
* "key" in nodes document on the clusterwide shared storage.
* @param event
*/
private void doUnregisterNode(Message<JsonObject> event, String hashAddress) {
logger.debug("Unregister node: " + event.body());
String host = hashAddress;
logger.info("Removing node: " + host);
SharedData sd = vertx.sharedData();
sd.<String, JsonObject>getClusterWideMap(nodeSharedData, req -> {
if (req.succeeded()) {
req.result().get(nodeListKey, nodesList -> {
JsonObject nodes = nodesList.result();
nodes.remove(host);
req.result().put(nodeListKey, nodes, updateNodes -> {
if (updateNodes.succeeded()) {
logger.debug("Updated nodes document");
sendOk(event);
} else {
logger.error("Unable to update nodes document");
sendError(event);
}
});
});
} else {
logger.warn("Cluster unavailable");
sendError(event);
}
});
}
/**
* Main event handlers for this verticle
* @param event
*/
@Override
public void handle(Message<JsonObject> event) {
final String action = event.body().getString("action");
switch (action) {
case "register":
doRegisterNode(event);
break;
case "unregister":
doUnregisterNode(event, event.body().getString("hostname"));
break;
case "ping":
doPing(event);
break;
default:
logger.warn("Undefined action: " + action);
sendError(event, "Undefined action: " + action);
break;
}
}
/**
* ping a node on its private address, if its not answering, remove it from the nodes map
*/
public void pingNode(String nodeAddress) {
logger.debug("Checking host at address: " + nodeAddress);
vertx.eventBus().send(nodeAddress, new JsonObject().put("action", "ping"), eventbusOptions, ping -> {
if (ping.succeeded()) {
JsonObject response = new JsonObject(ping.result().body().toString());
if (response.getString("status").equals("ok")) {
logger.debug("host " + nodeAddress + " is alive");
} else {
logger.warn("host " + nodeAddress + " has responded invalidly: " + ping.result().toString());
}
} else {
logger.warn("Node " + nodeAddress + " unreachable, removing, no such address registered?");
JsonObject request = new JsonObject()
.put("action", "unregister")
.put("hostname", nodeAddress);
logger.info("Sending unregister request for node: " + nodeAddress);
vertx.eventBus().send(transponderAddress, request, eventbusOptions, removeNode -> {
if (removeNode.succeeded()) {
logger.debug("Node removal requested");
} else {
logger.warn("Failed to remove node, will retry later.");
}
});
}
});
}
// basically sends back a OK message
public void doPing(Message<JsonObject> event) {
sendOk(event, eventbusOptions);
}
// accessor for other verticles to determine the cluster size
public static int getClusterSize() {
return clusterSize;
}
// accessor for other verticles to deterine the configured hostname
public static String getHostname() {
return hostname;
}
// accessor for other verticles to deterine the configured localAddress
public static String getLocalHashAddress() {
return localHashAddress;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment