Skip to content

Instantly share code, notes, and snippets.

@cdjones32
Last active July 5, 2019 19:06
Show Gist options
  • Save cdjones32/8f4fd6247d6f98ff24a3 to your computer and use it in GitHub Desktop.
Save cdjones32/8f4fd6247d6f98ff24a3 to your computer and use it in GitHub Desktop.
Example VertX Cluster manager to join a cluster as a client instead of a cluster node.
package demo;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.config.Config;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.*;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.impl.ExtendedClusterManager;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.Lock;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.spi.cluster.hazelcast.impl.HazelcastAsyncMap;
import io.vertx.spi.cluster.hazelcast.impl.HazelcastAsyncMultiMap;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
* Proof of concept for a Hazelcast Cluster Manager that only joins the cluster as a client and not a full cluster member.
*
* This code is a verbatim cut/paste of the io.vertx.spi.cluster.hazelcast.HazelcastClusterManager class with the only changes being:
* - Changes to the 'join' method to join as a client instead of a cluster member
* - The constructors/log entry to match up to the class name...
*/
public class HazelcastClientClusterManager implements ExtendedClusterManager, MembershipListener {
/**
* NOTE: This is the only method that changed from the source: io.vertx.spi.cluster.hazelcast.HazelcastClusterManager
*/
public synchronized void join(Handler<AsyncResult<Void>> resultHandler) {
vertx.executeBlocking(fut -> {
if (!active) {
active = true;
// Simple example of using the Client Config instead of the Node Config.
ClientConfig clientConfig = new ClientConfig();
clientConfig.getGroupConfig().setName("dev").setPassword("hazelcast-cluster-password");
clientConfig.getNetworkConfig().addAddress("192.168.0.7");
// replaced this: hazelcast = Hazelcast.new.newHazelcastInstance(conf);
hazelcast = HazelcastClient.newHazelcastClient(clientConfig);
// setting node ID Manually for the demo, as the instance is not a cluster member and the LocalMember call is invalid.
nodeID = "client"; //hazelcast.getCluster().getLocalMember().getUuid();
membershipListenerId = hazelcast.getCluster().addMembershipListener(this);
fut.complete();
}
}, resultHandler);
}
/*** NOTE: THERE ARE NO SIGNIFICANT CHANGES BELOW THIS POINT ***/
/*** COPY AND PASTE FROM: io.vertx.spi.cluster.hazelcast.HazelcastClusterManager ***/
private static final Logger log = LoggerFactory.getLogger(HazelcastClientClusterManager.class);
private static final String LOCK_SEMAPHORE_PREFIX = "__vertx.";
// Hazelcast config file
private static final String DEFAULT_CONFIG_FILE = "default-cluster.xml";
private static final String CONFIG_FILE = "cluster.xml";
private Vertx vertx;
private HazelcastInstance hazelcast;
private String nodeID;
private String membershipListenerId;
private NodeListener nodeListener;
private volatile boolean active;
private Config conf;
/**
* Constructor - gets config from classpath
*/
public HazelcastClientClusterManager() {
// We have our own shutdown hook and need to ensure ours runs before Hazelcast is shutdown
System.setProperty("hazelcast.shutdownhook.enabled", "false");
}
/**
* Constructor - config supplied
* @param conf
*/
public HazelcastClientClusterManager(Config conf) {
this.conf = conf;
}
public void setVertx(Vertx vertx) {
this.vertx = vertx;
}
/**
* Every eventbus handler has an ID. SubsMap (subscriber map) is a MultiMap which
* maps handler-IDs with server-IDs and thus allows the eventbus to determine where
* to send messages.
*
* @param name A unique name by which the the MultiMap can be identified within the cluster.
* See the cluster config file (e.g. cluster.xml in case of HazelcastClusterManager) for
* additional MultiMap config parameters.
* @return subscription map
*/
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> resultHandler) {
vertx.executeBlocking(fut -> {
com.hazelcast.core.MultiMap<K, V> multiMap = hazelcast.getMultiMap(name);
fut.complete(new HazelcastAsyncMultiMap<>(vertx, multiMap));
}, resultHandler);
}
@Override
public String getNodeID() {
return nodeID;
}
@Override
public List<String> getNodes() {
Set<Member> members = hazelcast.getCluster().getMembers();
List<String> lMembers = new ArrayList<>();
for (Member member: members) {
lMembers.add(member.getUuid());
}
return lMembers;
}
@Override
public void nodeListener(NodeListener listener) {
this.nodeListener = listener;
}
@Override
public <K, V> void getAsyncMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> resultHandler) {
vertx.executeBlocking(fut -> {
IMap<K, V> map = hazelcast.getMap(name);
fut.complete(new HazelcastAsyncMap<>(vertx, map));
}, resultHandler);
}
@Override
public <K, V> Map<K, V> getSyncMap(String name) {
IMap<K, V> map = hazelcast.getMap(name);
return map;
}
@Override
public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) {
vertx.executeBlocking(fut -> {
ISemaphore iSemaphore = hazelcast.getSemaphore(LOCK_SEMAPHORE_PREFIX + name);
boolean locked = false;
try {
locked = iSemaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// OK continue
}
if (locked) {
fut.complete(new HazelcastLock(iSemaphore));
} else {
throw new VertxException("Timed out waiting to get lock " + name);
}
}, resultHandler);
}
@Override
public void getCounter(String name, Handler<AsyncResult<Counter>> resultHandler) {
vertx.executeBlocking(fut -> fut.complete(new HazelcastCounter(hazelcast.getAtomicLong(name))), resultHandler);
}
public synchronized void leave(Handler<AsyncResult<Void>> resultHandler) {
vertx.executeBlocking(fut -> {
if (active) {
try {
active = false;
boolean left = hazelcast.getCluster().removeMembershipListener(membershipListenerId);
if (!left) {
log.warn("No membership listener");
}
while (hazelcast.getLifecycleService().isRunning()) {
try {
// This can sometimes throw java.util.concurrent.RejectedExecutionException so we retry.
hazelcast.getLifecycleService().shutdown();
} catch (RejectedExecutionException ignore) {
ignore.printStackTrace();
}
Thread.sleep(1);
}
} catch (Throwable t) {
throw new RuntimeException(t.getMessage());
}
}
fut.complete();
}, resultHandler);
}
@Override
public synchronized void memberAdded(MembershipEvent membershipEvent) {
if (!active) {
return;
}
try {
if (nodeListener != null) {
Member member = membershipEvent.getMember();
nodeListener.nodeAdded(member.getUuid());
}
} catch (Throwable t) {
log.error("Failed to handle memberAdded", t);
}
}
@Override
public synchronized void memberRemoved(MembershipEvent membershipEvent) {
if (!active) {
return;
}
try {
if (nodeListener != null) {
Member member = membershipEvent.getMember();
nodeListener.nodeLeft(member.getUuid());
}
} catch (Throwable t) {
log.error("Failed to handle memberRemoved", t);
}
}
@Override
public boolean isActive() {
return active;
}
@Override
public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
}
private InputStream getConfigStream() {
ClassLoader ctxClsLoader = Thread.currentThread().getContextClassLoader();
InputStream is = null;
if (ctxClsLoader != null) {
is = ctxClsLoader.getResourceAsStream(CONFIG_FILE);
}
if (is == null) {
is = getClass().getClassLoader().getResourceAsStream(CONFIG_FILE);
if (is == null) {
is = getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_FILE);
}
}
return is;
}
/**
* Get the Hazelcast config
* @return a config object
*/
public Config getConfig() {
return conf;
}
/**
* Set the hazelcast config
* @param config
*/
public void setConfig(Config config) {
this.conf = config;
}
public Config loadConfigFromClasspath() {
Config cfg = null;
try (InputStream is = getConfigStream();
InputStream bis = new BufferedInputStream(is)) {
if (is != null) {
cfg = new XmlConfigBuilder(bis).build();
}
} catch (IOException ex) {
log.error("Failed to read config", ex);
}
return cfg;
}
public void beforeLeave() {
if (isActive()) {
ILock lock = hazelcast.getLock("vertx.shutdownlock");
try {
lock.tryLock(30, TimeUnit.SECONDS);
} catch (Exception ignore) {
}
// The lock should be automatically released when the node is shutdown
}
}
public HazelcastInstance getHazelcastInstance() {
return hazelcast;
}
private class HazelcastCounter implements Counter {
private IAtomicLong atomicLong;
private HazelcastCounter(IAtomicLong atomicLong) {
this.atomicLong = atomicLong;
}
@Override
public void get(Handler<AsyncResult<Long>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.get()), resultHandler);
}
@Override
public void incrementAndGet(Handler<AsyncResult<Long>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.incrementAndGet()), resultHandler);
}
@Override
public void getAndIncrement(Handler<AsyncResult<Long>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.getAndIncrement()), resultHandler);
}
@Override
public void decrementAndGet(Handler<AsyncResult<Long>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.decrementAndGet()), resultHandler);
}
@Override
public void addAndGet(long value, Handler<AsyncResult<Long>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.addAndGet(value)), resultHandler);
}
@Override
public void getAndAdd(long value, Handler<AsyncResult<Long>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.getAndAdd(value)), resultHandler);
}
@Override
public void compareAndSet(long expected, long value, Handler<AsyncResult<Boolean>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.compareAndSet(expected, value)), resultHandler);
}
}
private class HazelcastLock implements Lock {
private ISemaphore semaphore;
private HazelcastLock(ISemaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void release() {
semaphore.release();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment