
@michalsida
Created October 3, 2018 19:21
Workaround snippet for a corrupted Vert.x cluster
import com.cgi.mcsb.lib.commons.mcsb.vertx.consumer.MessagesConsumerBase;
import com.cgi.mcsb.lib.commons.mcsb.vertx.factory.SpringVerticleFactory;
import io.vertx.core.*;
import io.vertx.core.impl.VertxImpl;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
import io.vertx.spi.cluster.hazelcast.impl.HazelcastClusterNodeInfo;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
 * Workaround for the situation where some consumers are lost from the shared Hazelcast multimap of topic consumers.
 * It occurs during startup, when separate Hazelcast mini-clusters are created first and the subsequent cluster merge
 * may throw away the consumers registered by the smaller mini-cluster. This mechanism detects the situation in the
 * Vert.x and Hazelcast data, then closes all consumer verticles and finally starts them again.
 * <p/>
 * See: <a href="https://groups.google.com/forum/?fromgroups#!topic/vertx/KpEtd6IG36Y">Started discussion in the Vert.x group</a><br/>
 * <a href="https://groups.google.com/forum/#!topic/hazelcast/fnqk9lSSr8Y">Started discussion in the Hazelcast group</a>
 */
public abstract class VertxConfigurerSupport {

    public static final String HAZELCAST_MULTIMAP_OF_VERTX_SUBSCRIBERS = "__vertx.subs";

    // Custom factory that provides Spring beans implementing io.vertx.core.Verticle
    @Autowired
    private SpringVerticleFactory springVerticleFactory;
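    // A rough sketch of what the SpringVerticleFactory injected above might look like (an assumption,
    // not part of the original gist; method names follow the Vert.x 3.x io.vertx.core.spi.VerticleFactory SPI):
    //
    //     public class SpringVerticleFactory implements VerticleFactory, ApplicationContextAware {
    //         private ApplicationContext applicationContext;
    //
    //         @Override
    //         public String prefix() {
    //             return "spring";
    //         }
    //
    //         @Override
    //         public Verticle createVerticle(String verticleName, ClassLoader classLoader) throws Exception {
    //             // Strip the "spring:" prefix and look the verticle up as a (prototype-scoped) Spring bean
    //             String className = verticleName.substring(prefix().length() + 1);
    //             return (Verticle) applicationContext.getBean(Class.forName(className));
    //         }
    //
    //         @Override
    //         public void setApplicationContext(ApplicationContext applicationContext) {
    //             this.applicationContext = applicationContext;
    //         }
    //     }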

    @Autowired
    private Set<Verticle> allVerticles;

    private List<Pair<Verticle, String>> verticleWithTopics;
    private Vertx vertx;
    private Long consumerSelfTestTimer;
    private Map<String, List<String>> deploymentIDs = new HashMap<>();
    private AsyncMultiMap<Object, Object> clusterSubscriberMap;
    private Set<String> nodeIdHistory = new HashSet<>();

    @PostConstruct
    public void deployVerticles() throws UnknownHostException {
        // We have Spring beans extending io.vertx.core.AbstractVerticle that consume messages on a specified topic
        verticleWithTopics = allVerticles.stream()
                .map(verticle -> Pair.of(verticle, verticle instanceof MessagesConsumerBase ? ((MessagesConsumerBase) verticle).getTopic() : null))
                .collect(Collectors.toList());
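        // MessagesConsumerBase is an internal base class of this project; a rough sketch of the contract
        // assumed here (an assumption, not part of the original gist): a verticle that knows its topic and
        // registers an event-bus consumer on it, so the topic ends up in the shared "__vertx.subs" multimap.
        //
        //     public abstract class MessagesConsumerBase extends AbstractVerticle {
        //         public abstract String getTopic();
        //
        //         @Override
        //         public void start() {
        //             vertx.eventBus().consumer(getTopic(), this::handle);
        //         }
        //
        //         protected abstract void handle(Message<Object> message);
        //     }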
        final VertxOptions clusteredVertxOptions = null; // Add some configuration
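        // A minimal sketch of the kind of configuration that could go here (an assumption, not part of the
        // original gist): clustering over Hazelcast, with an explicit cluster host as Vert.x 3.x allows, e.g.
        //
        //     ClusterManager clusterManager = new HazelcastClusterManager();
        //     VertxOptions clusteredVertxOptions = new VertxOptions()
        //             .setClusterManager(clusterManager)
        //             .setClusterHost(InetAddress.getLocalHost().getHostAddress());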
        Vertx.clusteredVertx(clusteredVertxOptions, res -> {
            if (res.succeeded()) {
                vertx = res.result();
                vertx.registerVerticleFactory(springVerticleFactory);
                startVerticles();
                // The self test can be disabled by explicitly setting the property to 0
                consumerSelfTestTimer = vertx.setPeriodic(30 * 1000, this::checkConsumersMap);
            } else {
                getLogger().error("Vert.x cluster creation error", res.cause());
                res.cause().printStackTrace();
                System.exit(1);
            }
        });
    }

    @PreDestroy
    public void undeployVerticles() {
        if (vertx != null) {
            if (consumerSelfTestTimer != null) {
                vertx.cancelTimer(consumerSelfTestTimer);
                consumerSelfTestTimer = null;
            }
            undeploySelectedVerticles(null);
        }
    }

    protected abstract Logger getLogger();

    private void startVerticles() {
        verticleWithTopics.stream()
                .map(Pair::getLeft)
                .forEach(verticle -> {
                    final String verticleName = verticle.getClass().getName();
                    getLogger().debug("Vert.x starting verticle " + verticleName);
                    // Scale the verticles on cores: create one instance per processor during the deployment
                    vertx.deployVerticle(springVerticleFactory.prefix() + ":" + verticleName,
                            new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors()).setWorker(true).setWorkerPoolSize(5),
                            dres -> {
                                if (dres.succeeded()) {
                                    deploymentIDs.putIfAbsent(verticleName, new ArrayList<>());
                                    deploymentIDs.get(verticleName).add(dres.result());
                                    getLogger().debug("Vert.x deployment " + dres.result() + " created for verticle " + verticleName);
                                } else {
                                    getLogger().error("Vert.x deployment of verticle " + verticleName + " failed", dres.cause());
                                    dres.cause().printStackTrace();
                                    System.exit(1);
                                }
                            });
                });
    }

    private void undeploySelectedVerticles(Runnable finalizationHandler) {
        final AtomicLong remainingVerticleCounter = new AtomicLong(verticleWithTopics.size());
        verticleWithTopics.stream()
                .map(Pair::getLeft)
                .forEach(verticle -> {
                    final String verticleName = verticle.getClass().getName();
                    getLogger().debug("Vert.x stopping verticle " + verticleName);
                    final List<String> deploymentIdsToUndeploy = deploymentIDs.getOrDefault(verticleName, new ArrayList<>());
                    final AtomicLong remainingDeploymentsCounter = new AtomicLong(deploymentIdsToUndeploy.size());
                    deploymentIdsToUndeploy.forEach(deploymentID -> {
                        getLogger().debug("Vert.x undeploy " + deploymentID + " for verticle " + verticleName + " started");
                        vertx.undeploy(deploymentID, res -> {
                            if (res.succeeded()) {
                                getLogger().debug("Vert.x undeploy " + deploymentID + " for verticle " + verticleName + " finished");
                            } else {
                                getLogger().error("Vert.x undeploy " + deploymentID + " error", res.cause());
                                res.cause().printStackTrace();
                            }
                            if (remainingDeploymentsCounter.decrementAndGet() == 0) {
                                if (remainingVerticleCounter.decrementAndGet() == 0 && finalizationHandler != null) {
                                    // Last undeploy processed, call the finalization handler
                                    finalizationHandler.run();
                                }
                            }
                        });
                    });
                    deploymentIDs.remove(verticleName);
                });
    }

    private void checkConsumersMap(Long timer) {
        if (vertx instanceof VertxImpl) {
            final ClusterManager clusterManager = ((VertxImpl) vertx).getClusterManager();
            final String currentNodeID = ((VertxImpl) vertx).getNodeID();
            if (clusterManager instanceof HazelcastClusterManager) {
                String currentHazelcastNodeID = ((HazelcastClusterManager) clusterManager).getHazelcastInstance().getLocalEndpoint().getUuid();
                if (!currentNodeID.equals(currentHazelcastNodeID)) {
                    if (!nodeIdHistory.contains(currentHazelcastNodeID)) {
                        getLogger().error("Hazelcast local endpoint {} UUID {} differs from Vertx NodeId {}",
                                ((HazelcastClusterManager) clusterManager).getHazelcastInstance().getLocalEndpoint().getSocketAddress().toString(),
                                currentHazelcastNodeID, currentNodeID);
                    } else {
                        if (getLogger().isTraceEnabled()) {
                            getLogger().trace("Hazelcast local endpoint {} UUID {} still differs from Vertx NodeId {}",
                                    ((HazelcastClusterManager) clusterManager).getHazelcastInstance().getLocalEndpoint().getSocketAddress().toString(),
                                    currentHazelcastNodeID, currentNodeID);
                        }
                    }
                }
                nodeIdHistory.add(currentHazelcastNodeID);
            }
            if (clusterSubscriberMap != null) {
                controlPresenceInConsumersMap(currentNodeID);
            } else {
                // Retrieve the multimap only once; there seems to be a memory leak in ClusterManager#getAsyncMultiMap:
                // it stores the result in a weak map, but it is also registered in a concurrent map of listeners inside Hazelcast
                clusterManager.getAsyncMultiMap(HAZELCAST_MULTIMAP_OF_VERTX_SUBSCRIBERS, subscriberMapRes -> {
                    if (subscriberMapRes.succeeded()) {
                        clusterSubscriberMap = subscriberMapRes.result();
                        controlPresenceInConsumersMap(currentNodeID);
                    } else {
                        getLogger().warn("Hazelcast consumers map is not present, cannot test the registration");
                    }
                });
            }
        } else {
            getLogger().error("Unexpected type of Vertx: " + vertx.getClass().getName());
        }
    }

    private void controlPresenceInConsumersMap(String nodeID) {
        if (getLogger().isTraceEnabled()) {
            getLogger().trace("Testing registration of all {} topic consumers for node {}",
                    (long) verticleWithTopics.size(), nodeID);
        }
        AtomicInteger errorCount = new AtomicInteger(0);
        verticleWithTopics.stream()
                // Only verticles that really consume a topic can be checked against the subscriber multimap
                .filter(verticlePair -> verticlePair.getRight() != null)
                .forEach(verticlePair -> {
                    if (errorCount.get() > 0) {
                        return; // Break on the first error
                    }
                    final MessagesConsumerBase consumerVerticle = (MessagesConsumerBase) verticlePair.getLeft();
                    final String verticleTopic = verticlePair.getRight();
                    clusterSubscriberMap.get(verticleTopic, topicSubscribersRes -> controlPresenceOfThisNodeInTopicConsumersList(
                            consumerVerticle, verticleTopic, nodeID, topicSubscribersRes, errorCount));
                });
    }

    private void controlPresenceOfThisNodeInTopicConsumersList(MessagesConsumerBase consumerVerticle, String verticleTopic, String thisConsumerNodeID, AsyncResult<ChoosableIterable<Object>> consumersList, AtomicInteger errorCount) {
        if (errorCount.get() > 0) {
            return; // Break on the first error
        }
        HazelcastClusterNodeInfo thisSubscriberNodeInfo = null;
        if (consumersList.succeeded()) {
            final ChoosableIterable<Object> topicSubscribers = consumersList.result();
            for (Object topicSubscriber : topicSubscribers) {
                if (topicSubscriber instanceof HazelcastClusterNodeInfo &&
                        thisConsumerNodeID.equals(((HazelcastClusterNodeInfo) topicSubscriber).nodeId)) {
                    // This consumer node ID is found in the shared Hazelcast multimap of topic subscribers = OK
                    thisSubscriberNodeInfo = (HazelcastClusterNodeInfo) topicSubscriber;
                    break;
                }
            }
        }
        if (thisSubscriberNodeInfo != null) {
            getLogger().trace("Consumer for topic {} is present in cluster map", verticleTopic);
        } else {
            if (errorCount.getAndIncrement() == 0) {
                getLogger().error("No consumer for topic {} is present in cluster map, all consumers will be redeployed", verticleTopic);
                undeploySelectedVerticles(() -> vertx.setTimer(5_000, timer -> { // Wait "a little" for all unregister actions
                    getLogger().error("All consumers were undeployed and will be deployed again");
                    startVerticles();
                }));
            } else {
                getLogger().error("No consumer for topic {} is present in cluster map, redeployment was detected already", verticleTopic);
            }
        }
    }
}
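
// Usage sketch (an assumption, not part of the original gist): a concrete configurer only has to supply
// its Logger; the verticle beans and the SpringVerticleFactory are injected by Spring.
//
//     @Configuration
//     public class MyVertxConfigurer extends VertxConfigurerSupport {
//         private static final Logger LOG = LoggerFactory.getLogger(MyVertxConfigurer.class);
//
//         @Override
//         protected Logger getLogger() {
//             return LOG;
//         }
//     }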