-
-
Save michalsida/8366de8b003d71b373dd603118d9a264 to your computer and use it in GitHub Desktop.
Workaround snippet for a corrupted Vert.x cluster
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.cgi.mcsb.lib.commons.mcsb.vertx.consumer.MessagesConsumerBase; | |
import com.cgi.mcsb.lib.commons.mcsb.vertx.factory.SpringVerticleFactory; | |
import io.vertx.core.*; | |
import io.vertx.core.impl.VertxImpl; | |
import io.vertx.core.spi.cluster.AsyncMultiMap; | |
import io.vertx.core.spi.cluster.ChoosableIterable; | |
import io.vertx.core.spi.cluster.ClusterManager; | |
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager; | |
import io.vertx.spi.cluster.hazelcast.impl.HazelcastClusterNodeInfo; | |
import org.apache.commons.lang3.tuple.Pair; | |
import org.slf4j.Logger; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import javax.annotation.PostConstruct; | |
import javax.annotation.PreDestroy; | |
import java.net.UnknownHostException; | |
import java.util.*; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.stream.Collectors; | |
/** | |
* Implemented workarround for situation, where some consumers are lost from shared hazelcast multimap of topic consumers. | |
* It occurs during startup, where separated hazelcast miniclusters are created at first and following cluster merge | |
* will probably throw away consumers from the smaller minicluster. This mechanism detects this situation in Vertx.X and | |
* hazelcast data, close all consumer verticles than and start them again finally. | |
* <p/> | |
* See: <a href="https://groups.google.com/forum/?fromgroups#!topic/vertx/KpEtd6IG36Y">Stated discrussion in Vert.X group</a><br/> | |
* <a href="https://groups.google.com/forum/#!topic/hazelcast/fnqk9lSSr8Y">Started discussion in Hazelcast group</a> | |
*/ | |
public abstract class VertxConfigurerSupport { | |
public static final String HAZELCAST_MULTIMAP_OF_VERTX_SUBSRIBERS = "__vertx.subs"; | |
// Some custom factory for providing spring beans - io.vertx.core.Verticle implementations | |
@Autowired | |
private SpringVerticleFactory springVerticleFactory; | |
@Autowired | |
private Set<Verticle> allVerticles; | |
private List<Pair<Verticle, String>> verticleWithTopics; | |
private Vertx vertx; | |
private Long consumerSelfTestTimer; | |
private Map<String, List<String>> deploymentIDs = new HashMap<>(); | |
private AsyncMultiMap<Object, Object> clusterSubscriberMap; | |
private Set<String> nodeIdHistory = new HashSet<>(); | |
@PostConstruct | |
public void deployVerticles() throws UnknownHostException { | |
// We have spring beans extending io.vertx.core.AbstractVerticle for consuming message on specified topic | |
verticleWithTopics = allVerticles.stream() | |
.map(verticle -> Pair.of(verticle, verticle instanceof MessagesConsumerBase ? ((MessagesConsumerBase)verticle).getTopic() : null)) | |
.collect(Collectors.toList()); | |
final VertxOptions clusteredVertxOptions = null; // Add some configuration | |
Vertx.clusteredVertx(clusteredVertxOptions, res -> { | |
if (res.succeeded()) { | |
vertx = res.result(); | |
vertx.registerVerticleFactory(springVerticleFactory); | |
startVerticles(); | |
// Can by disabled by explicitly property define to 0 | |
consumerSelfTestTimer = vertx.setPeriodic(30 * 1000, this::checkConsumersMap); | |
} else { | |
getLogger().error("Vert.x cluster creation error", res.cause()); | |
res.cause().printStackTrace(); | |
System.exit(1); | |
} | |
}); | |
} | |
@PreDestroy | |
public void undeployVerticles() { | |
if (vertx != null) { | |
if (consumerSelfTestTimer != null) { | |
vertx.cancelTimer(consumerSelfTestTimer); | |
consumerSelfTestTimer = null; | |
} | |
undeploySelectedVerticles(null); | |
} | |
} | |
protected abstract Logger getLogger(); | |
private void startVerticles() { | |
// Scale the verticles on cores: create instances per processor during the deployment | |
verticleWithTopics.stream() | |
.map(Pair::getLeft) | |
.forEach(verticle -> { | |
final String verticleName = verticle.getClass().getName(); | |
getLogger().debug("Vert.x starting verticle " + verticleName); | |
// Scale the verticles on cores: create instances per processor during the deployment | |
vertx.deployVerticle(springVerticleFactory.prefix() + ":" + verticleName, new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors()).setWorker(true).setWorkerPoolSize(5), dres -> { | |
if (dres.succeeded()) { | |
deploymentIDs.putIfAbsent(verticleName, new ArrayList<>()); | |
deploymentIDs.get(verticleName).add(dres.result()); | |
getLogger().debug("Vert.x deployed verticle " + dres.result() + " for verticle " + verticleName); | |
} else { | |
getLogger().error("Vert.x deployed verticle " + verticleName + " error", dres.cause()); | |
dres.cause().printStackTrace(); | |
System.exit(1); | |
} | |
}); | |
}); | |
} | |
private void undeploySelectedVerticles(Runnable finalizationHandler) { | |
final AtomicLong remainingVerticleCounter = new AtomicLong((long) verticleWithTopics.size()); | |
verticleWithTopics.stream() | |
.map(Pair::getLeft) | |
.forEach(verticle -> { | |
final String verticleName = verticle.getClass().getName(); | |
getLogger().debug("Vert.x stopping verticle " + verticleName); | |
final List<String> deploymentIdsToUndeploy = deploymentIDs.getOrDefault(verticleName, new ArrayList<>()); | |
final AtomicLong remainingDeploymentsCounter = new AtomicLong(deploymentIdsToUndeploy.size()); | |
deploymentIdsToUndeploy.forEach(deploymentID -> { | |
getLogger().debug("Vert.x undeploy " + deploymentID + " for verticle " + verticleName + " started"); | |
vertx.undeploy(deploymentID, res -> { | |
if (res.succeeded()) { | |
getLogger().debug("Vert.x undeploy " + deploymentID + " for verticle " + verticleName + " end"); | |
} else { | |
getLogger().error("Vert.x undeployd " + deploymentID + " error", res.cause()); | |
res.cause().printStackTrace(); | |
} | |
if (remainingDeploymentsCounter.decrementAndGet() == 0) { | |
if (remainingVerticleCounter.decrementAndGet() == 0 && finalizationHandler != null) { | |
// Last undeploy processed, call finalization handler | |
finalizationHandler.run(); | |
} | |
} | |
}); | |
} | |
); | |
deploymentIDs.remove(verticleName); | |
}); | |
} | |
private void checkConsumersMap(Long timer) { | |
if (vertx instanceof VertxImpl) { | |
final ClusterManager clusterManager = ((VertxImpl) vertx).getClusterManager(); | |
final String currentNodeID = ((VertxImpl) vertx).getNodeID(); | |
if (clusterManager instanceof HazelcastClusterManager) { | |
String currentHazelcastNodeID = ((HazelcastClusterManager) clusterManager).getHazelcastInstance().getLocalEndpoint().getUuid(); | |
if (!currentNodeID.equals(currentHazelcastNodeID)) { | |
if (!nodeIdHistory.contains(currentHazelcastNodeID)) { | |
getLogger().error("Hazelcast local endpoint {} UUID {} differs from Vertx NodeId {}", | |
((HazelcastClusterManager) clusterManager).getHazelcastInstance().getLocalEndpoint().getSocketAddress().toString(), | |
currentHazelcastNodeID, currentNodeID); | |
} else { | |
if (getLogger().isTraceEnabled()) { | |
getLogger().trace("Hazelcast local endpoint {} UUID {} still differs from Vertx NodeId {}", | |
((HazelcastClusterManager) clusterManager).getHazelcastInstance().getLocalEndpoint().getSocketAddress().toString(), | |
currentHazelcastNodeID, currentNodeID); | |
} | |
} | |
} | |
nodeIdHistory.add(currentHazelcastNodeID); | |
} | |
if (clusterSubscriberMap != null) { | |
controlPresenceInConsumersMap(currentNodeID); | |
} else { | |
// Retrieve multimap only once, I think that there is memory leak in ClusterManager#getAsyncMultiMap | |
// It stores result in WeakMap, but it is register in some concurrent map of listeners in HazelCast too | |
clusterManager.getAsyncMultiMap(HAZELCAST_MULTIMAP_OF_VERTX_SUBSRIBERS, subscriberMapRes -> { | |
if (subscriberMapRes.succeeded()) { | |
clusterSubscriberMap = subscriberMapRes.result(); | |
controlPresenceInConsumersMap(currentNodeID); | |
} else { | |
getLogger().warn("Hazelcast consumers map is not present, can not test registration"); | |
} | |
}); | |
} | |
} else { | |
getLogger().error("Unexpected type of Vertx: " + vertx.getClass().getName()); | |
} | |
} | |
private void controlPresenceInConsumersMap(String nodeID) { | |
if (getLogger().isTraceEnabled()) { | |
getLogger().trace("Testing registration of all {} topic consumers for node {}", | |
(long) verticleWithTopics.size(), nodeID); | |
} | |
AtomicInteger errorCount = new AtomicInteger(0); | |
verticleWithTopics | |
.forEach(verticlePair -> { | |
if (errorCount.get() > 0) { | |
return; // Break on the first error | |
} | |
final MessagesConsumerBase consumerVerticle = (MessagesConsumerBase) verticlePair.getLeft(); | |
final String verticleTopic = verticlePair.getRight(); | |
clusterSubscriberMap.get(verticleTopic, topicSubscribersRes -> controlPresenceOfThisNodeInTopicConsumersList( | |
consumerVerticle, verticleTopic, nodeID, topicSubscribersRes, errorCount)); | |
}); | |
} | |
private void controlPresenceOfThisNodeInTopicConsumersList(MessagesConsumerBase consumerVerticle, String verticleTopic, String thisConsumerNodeID, AsyncResult<ChoosableIterable<Object>> consumersList, AtomicInteger errorCount) { | |
if (errorCount.get() > 0) { | |
return; // Break on the first error | |
} | |
HazelcastClusterNodeInfo thisSubscriberNodeInfo = null; | |
if (consumersList.succeeded()) { | |
final ChoosableIterable<Object> topicSubscribers = consumersList.result(); | |
for (Object topicSubscriber : topicSubscribers) { | |
if (topicSubscriber instanceof HazelcastClusterNodeInfo && | |
thisConsumerNodeID.equals((((HazelcastClusterNodeInfo) topicSubscriber).nodeId))) { | |
// This consumer node ID is found in shared Hazelcast multimap of topic subscribers = OK | |
thisSubscriberNodeInfo = (HazelcastClusterNodeInfo) topicSubscriber; | |
break; | |
} | |
} | |
} | |
if (thisSubscriberNodeInfo != null) { | |
getLogger().trace("Consumer for topic {} is present in cluster map", verticleTopic); | |
} else { | |
if (errorCount.getAndIncrement() == 0) { | |
getLogger().error("No consumer for topic {} is present in cluster map, all consumers will be redeployed", verticleTopic); | |
undeploySelectedVerticles(() -> vertx.setTimer(5_000, timer -> { // Wait "a little" for all unregister actions | |
getLogger().error("All consumers were undeployed and will be deployed again", verticleTopic); | |
startVerticles(); | |
})); | |
} else { | |
getLogger().error("No consumer for topic {} is present in cluster map, redeployment was detected already", verticleTopic); | |
} | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment