Created
April 6, 2020 22:02
-
-
Save brianmhess/4ba3b22dc97833d11b45b21065c08183 to your computer and use it in GitHub Desktop.
Cassandra Health Checks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hessian.ambien; | |
import com.datastax.oss.driver.api.core.ConsistencyLevel; | |
import com.datastax.oss.driver.api.core.CqlIdentifier; | |
import com.datastax.oss.driver.api.core.CqlSession; | |
import com.datastax.oss.driver.api.core.config.DefaultDriverOption; | |
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; | |
import com.datastax.oss.driver.api.core.metadata.*; | |
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; | |
import com.datastax.oss.driver.api.core.metadata.token.TokenRange; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.actuate.health.Health; | |
import org.springframework.boot.actuate.health.HealthIndicator; | |
import javax.annotation.PostConstruct; | |
import java.util.*; | |
import java.util.function.BiPredicate; | |
import java.util.stream.Collectors; | |
public class CLOneHealthCheck implements HealthIndicator { | |
private Health unknownHealth = Health.unknown().build(); | |
@Autowired | |
private CqlSession session; | |
private Metadata metadata; | |
private Set<String> datacenters = null; | |
private Map<String,String> dcToKeyspaceMap = new HashMap<String,String>(); | |
private String simpleStrategyKeyspace = null; | |
private BiPredicate<Long, Long> enoughReplicas = this::oneReplica; | |
@PostConstruct | |
private void init() { | |
metadata = session.getMetadata(); | |
String clStr = session.getContext().getConfig().getDefaultProfile().getString(DefaultDriverOption.REQUEST_CONSISTENCY); | |
if (clStr.equals(ConsistencyLevel.LOCAL_QUORUM.name()) || clStr.equals(ConsistencyLevel.QUORUM.name())) | |
enoughReplicas = this::quorumReplicas; | |
} | |
/* | |
The cluster is defined as being healthy if there is one replica per token range in each data center. | |
Additoinally, if there is a keyspace replicated with SimpleStrategy, then there needs to be one replica | |
per token range in a SimpleStrategy way. | |
The returned Health will give DC-level information. If there is a keyspace with SimpleStrategy, then | |
there will also be information in a SimpleStrategy context. The information returned will be: | |
- Total number of nodes in the cluster (in any state) | |
- Total number of nodes reporting in an UP state | |
- List of all nodes and their state | |
- List of all datacenters | |
- Name of keyspace used for each datacenter | |
- List of good token ranges per datacenter | |
- List of bad token ranges per datacenter | |
- List of good nodes per datacenter | |
- List of bad nodes per datacenter | |
- List of good token ranges for SimpleStrategy (if present) | |
- List of bad token ranges for SimpleStrategy (if present) | |
- List of good nodes for SimpleStrategy (if present) | |
- List of bad nodes for SimpleStrategy (if present) | |
*/ | |
@Override | |
public Health health() { | |
setupKeyspacesAndDatacenters(metadata); | |
if (!metadata.getTokenMap().isPresent()) | |
return Health.unknown().withDetail("Error", "Could not get TokenMap").build(); | |
TokenMap tokenMap = metadata.getTokenMap().get(); | |
// For each DC - see if there is at least one replica per token range (plus SimpleStrategy, if needed) | |
Map<String,List<TokenRange>> dcBadRanges = new HashMap<String,List<TokenRange>>(); | |
Map<String,List<String>> dcBadNodes = new HashMap<String,List<String>>(); | |
Map<String,List<TokenRange>> dcGoodRanges = new HashMap<String,List<TokenRange>>(); | |
Map<String,List<String>> dcGoodNodes = new HashMap<String,List<String>>(); | |
Health.Builder hbuilder = Health.up(); | |
for (Map.Entry<String,String> entry : dcToKeyspaceMap.entrySet()) { | |
String datacenter = entry.getKey(); | |
String keyspace = entry.getValue(); | |
List<TokenRange> badTokenRanges = new ArrayList<TokenRange>(); | |
List<TokenRange> goodTokenRanges = new ArrayList<TokenRange>(); | |
for (TokenRange tr : tokenMap.getTokenRanges()) { | |
tokenMap.getReplicas(keyspace, tr); | |
long numReplicas = tokenMap.getReplicas(keyspace, tr) | |
.stream() | |
.filter(h -> (0 == h.getDatacenter().compareTo(datacenter))) | |
.count(); | |
long numReplicasUp = tokenMap.getReplicas(keyspace, tr) | |
.stream() | |
.filter(h -> (0 == h.getDatacenter().compareTo(datacenter))) | |
.filter(h -> (h.getState() == NodeState.UP)) | |
.count(); | |
if (enoughReplicas.test(numReplicas, numReplicasUp)) | |
badTokenRanges.add(tr); | |
else | |
goodTokenRanges.add(tr); | |
} | |
if (badTokenRanges.size() > 0) | |
hbuilder = Health.down(); | |
dcBadRanges.put(datacenter, badTokenRanges); | |
dcGoodRanges.put(datacenter, goodTokenRanges); | |
List<String> badNodes = metadata.getNodes().values() | |
.stream() | |
.filter(h -> (0 == h.getDatacenter().compareTo(datacenter))) | |
.filter(h -> (h.getState() != NodeState.UP)) | |
.map(h -> h.getEndPoint().toString()) | |
.collect(Collectors.toList()); | |
dcBadNodes.put(datacenter, badNodes); | |
dcGoodNodes.put(datacenter, | |
metadata.getNodes().values().stream() | |
.filter(h -> (0 == h.getDatacenter().compareTo(datacenter))) | |
.filter(h -> h.getState() == NodeState.UP) | |
.map(n -> n.getEndPoint().toString()).collect(Collectors.toList())); | |
} | |
// SimpleStrategy | |
List<TokenRange> simpleStrategyBadTokenRanges = new ArrayList<TokenRange>(); | |
List<String> simpleStrategyBadNodes = new ArrayList<String>(); | |
List<TokenRange> simpleStrategyGoodTokenRanges = new ArrayList<TokenRange>(); | |
List<String> simpleStrategyGoodNodes = new ArrayList<String>(); | |
if (null != simpleStrategyKeyspace) { | |
for (TokenRange tr : tokenMap.getTokenRanges()) { | |
tokenMap.getReplicas(simpleStrategyKeyspace, tr); | |
long numReplicas = tokenMap.getReplicas(simpleStrategyKeyspace, tr) | |
.stream() | |
.count(); | |
long numReplicasUp = tokenMap.getReplicas(simpleStrategyKeyspace, tr) | |
.stream() | |
.filter(h -> (h.getState() == NodeState.UP)) | |
.count(); | |
if (enoughReplicas.test(numReplicas, numReplicasUp)) | |
simpleStrategyBadTokenRanges.add(tr); | |
else | |
simpleStrategyGoodTokenRanges.add(tr); | |
} | |
if (simpleStrategyBadTokenRanges.size() > 0) | |
hbuilder = Health.down(); | |
simpleStrategyBadNodes = metadata.getNodes().values() | |
.stream() | |
.filter(h -> (h.getState() != NodeState.UP)) | |
.map(h -> h.getEndPoint().toString()) | |
.collect(Collectors.toList()); | |
simpleStrategyGoodNodes = metadata.getNodes().values() | |
.stream() | |
.filter(h -> (h.getState() == NodeState.UP)) | |
.map(h -> h.getEndPoint().toString()) | |
.collect(Collectors.toList()); | |
} | |
// All nodes | |
Map<String,NodeState> nodes = new HashMap<String,NodeState>(); | |
metadata.getNodes().values().forEach(n -> { | |
nodes.put(n.getEndPoint().toString(), n.getState()); | |
}); | |
// Provide Deatils | |
hbuilder.withDetail("BadTokenRanges", dcBadRanges); | |
hbuilder.withDetail("GoodTokenRanges", dcGoodRanges); | |
hbuilder.withDetail("DownNodes", dcBadNodes); | |
hbuilder.withDetail("UpNodes", dcGoodNodes); | |
hbuilder.withDetail("NumHosts", metadata.getNodes().values().size()); | |
hbuilder.withDetail("NumUpHosts", metadata.getNodes().values().stream().filter(n -> n.getState().equals(NodeState.UP)).count()); | |
hbuilder.withDetail("DataCenters", datacenters); | |
hbuilder.withDetail("Nodes", nodes); | |
hbuilder.withDetail("Keyspace Checked", dcToKeyspaceMap); | |
if (null != simpleStrategyKeyspace) { | |
hbuilder.withDetail("SimpleStrategy_BadTokenRanges", simpleStrategyBadTokenRanges); | |
hbuilder.withDetail("SimpleStrategy_BadNodes", simpleStrategyBadNodes); | |
hbuilder.withDetail("SimpleStrategy_GoodTokenRanges", simpleStrategyGoodTokenRanges); | |
hbuilder.withDetail("SimpleStrategy_GoodNodes", simpleStrategyGoodNodes); | |
} | |
return hbuilder.build(); | |
} | |
/* | |
If we want to, we can replace this method with a simple one that populates the dcToKeyspaceMap with a single | |
keyspace and datacenter gleaned from the driver settings. | |
*/ | |
protected void setupKeyspacesAndDatacenters(Metadata metadata) { | |
setKeyspaceAndDatacenterFromDriverSettings(metadata); | |
} | |
protected void setKeyspaceAndDatacenterFromDriverSettings(Metadata metadata) { | |
DriverExecutionProfile profile = session.getContext().getConfig().getDefaultProfile(); | |
String datacenter = profile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, null); | |
String keyspace = profile.getString(DefaultDriverOption.SESSION_KEYSPACE, null); | |
if ((null == datacenter) || (null == keyspace)) | |
getKeyspacesAndDatacenters(metadata); | |
else { | |
datacenters = null; | |
dcToKeyspaceMap.clear(); | |
simpleStrategyKeyspace = null; | |
dcToKeyspaceMap.put(datacenter, keyspace); | |
} | |
} | |
protected void getKeyspacesAndDatacenters(Metadata metadata) { | |
datacenters = null; | |
dcToKeyspaceMap.clear(); | |
simpleStrategyKeyspace = null; | |
datacenters = metadata.getNodes().values().stream().map(Node::getDatacenter).collect(Collectors.toSet()); | |
Map<String,Integer> dcMaxRF = new HashMap<String,Integer>(); | |
final int[] maxSimpleRF = {0}; | |
metadata.getKeyspaces().entrySet().stream().forEach(e -> { | |
CqlIdentifier name = e.getKey(); | |
KeyspaceMetadata kmeta = e.getValue(); | |
Map<String,String> replication = kmeta.getReplication(); | |
String className = replication.remove("class"); | |
if (null != className) { | |
if (className.equals("org.apache.cassandra.locator.NetworkTopologyStrategy")) { | |
replication | |
.forEach((k,v) -> { | |
if ((!dcToKeyspaceMap.containsKey(k)) | |
|| (dcMaxRF.get(k) < Integer.valueOf(v))) { | |
dcToKeyspaceMap.put(k, name.asInternal()); | |
dcMaxRF.put(k, Integer.valueOf(v)); | |
} | |
}); | |
} | |
else if (className.equals("org.apache.cassandra.locator.SimpleStrategy")) { | |
int rf = Integer.valueOf(replication.get("replication_factor")); | |
if (maxSimpleRF[0] < rf) { | |
simpleStrategyKeyspace = name.asInternal(); | |
maxSimpleRF[0] = rf; | |
} | |
} | |
} | |
}); | |
} | |
private boolean quorumReplicas(long numReplicas, long numReplicasUp) { | |
return (numReplicasUp >= (numReplicas / 2) + 1); | |
} | |
private boolean oneReplica(long numReplicas, long numReplicasUp) { | |
return (numReplicasUp > 1); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment