Skip to content

Instantly share code, notes, and snippets.

@rsumbaly
Created June 28, 2011 20:46
Show Gist options
  • Save rsumbaly/1052163 to your computer and use it in GitHub Desktop.
Save rsumbaly/1052163 to your computer and use it in GitHub Desktop.
package voldemort.utils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.serialization.DefaultSerializerFactory;
import voldemort.serialization.Serializer;
import voldemort.server.RequestRoutingType;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
import voldemort.store.socket.SocketStoreFactory;
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
import voldemort.versioning.Versioned;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
/**
 * Command line tool that samples keys from a cluster and later checks whether
 * those keys are still present on every node responsible for them.
 * <p>
 * It works in two phases, selected by the operation type:
 * <ul>
 * <li><b>save keys</b> ( op-type false ) - fetches entries from one or all
 * nodes and writes the keys that are present on all of their responsible nodes
 * to one file per store inside the output directory</li>
 * <li><b>entropy calculator</b> ( op-type true ) - reads the key files back and
 * reports how many of the keys can still be found on all responsible nodes</li>
 * </ul>
 * <p>
 * Key file format: a sequence of records, each being a single length byte
 * followed by that many key bytes. Because the length is stored in one byte,
 * only keys of 1 to 255 bytes can be recorded.
 */
public class Entropy {

    /** Largest key length representable by the one byte length prefix */
    private static final int MAX_KEY_LENGTH = 255;

    private final int nodeId;
    private final long numKeys;
    public static long DEFAULT_NUM_KEYS = 10000;

    /**
     * Entropy constructor. Uses DEFAULT_NUM_KEYS number of keys
     * 
     * @param nodeId Node id. If -1, goes over all of them
     */
    public Entropy(int nodeId) {
        this(nodeId, DEFAULT_NUM_KEYS);
    }

    /**
     * Entropy constructor
     * 
     * @param nodeId Node id. If -1, goes over all of them
     * @param numKeys Number of keys
     */
    public Entropy(int nodeId, long numKeys) {
        this.nodeId = nodeId;
        this.numKeys = numKeys;
    }

    /**
     * Command line entry point. Parses the options, validates the output
     * directory and metadata files and then runs
     * {@link #generateEntropy(Cluster, List, File, boolean)}.
     * 
     * @param args Command line arguments
     * @throws IOException If the help text, metadata or key files cannot be
     *         read / written
     */
    public static void main(String[] args) throws IOException {
        OptionParser parser = new OptionParser();
        parser.accepts("help", "print help information");
        parser.accepts("node", "Node id")
              .withRequiredArg()
              .describedAs("node-id")
              .ofType(Integer.class);
        parser.accepts("cluster-xml", "[REQUIRED] Path to cluster-xml")
              .withRequiredArg()
              .describedAs("xml")
              .ofType(String.class);
        parser.accepts("stores-xml", "[REQUIRED] Path to stores-xml")
              .withRequiredArg()
              .describedAs("xml")
              .ofType(String.class);
        parser.accepts("output-dir",
                       "[REQUIRED] The output directory where we'll store / retrieve the keys. ")
              .withRequiredArg()
              .describedAs("output-dir")
              .ofType(String.class);
        parser.accepts("op-type",
                       "Operation type - false ( save keys ) [ default ], true ( run entropy calculator ) ")
              .withRequiredArg()
              .ofType(Boolean.class);
        parser.accepts("num-keys",
                       "Number of keys per store [ Default: " + Entropy.DEFAULT_NUM_KEYS + " ]")
              .withRequiredArg()
              .describedAs("keys")
              .ofType(Long.class);

        OptionSet options = parser.parse(args);

        if(options.has("help")) {
            parser.printHelpOn(System.out);
            System.exit(0);
        }

        Set<String> missing = CmdUtils.missing(options, "cluster-xml", "stores-xml", "output-dir");
        if(missing.size() > 0) {
            System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing));
            parser.printHelpOn(System.err);
            System.exit(1);
        }

        // compulsory params
        String clusterXml = (String) options.valueOf("cluster-xml");
        String storesXml = (String) options.valueOf("stores-xml");
        String outputDirPath = (String) options.valueOf("output-dir");

        // optional params
        long numKeys = CmdUtils.valueOf(options, "num-keys", Entropy.DEFAULT_NUM_KEYS);
        int nodeId = CmdUtils.valueOf(options, "node", 0);
        boolean opType = CmdUtils.valueOf(options, "op-type", false);

        File outputDir = new File(outputDirPath);
        if(!outputDir.exists()) {
            // mkdirs() reports failure through its return value only; the
            // isDirectory() re-check tolerates a concurrent creation
            if(!outputDir.mkdirs() && !outputDir.isDirectory()) {
                System.err.println("Cannot create output directory " + outputDirPath);
                System.exit(1);
            }
        } else if(!(outputDir.isDirectory() && outputDir.canWrite())) {
            System.err.println("Cannot write to output directory " + outputDirPath);
            parser.printHelpOn(System.err);
            System.exit(1);
        }

        if(!Utils.isReadableFile(clusterXml) || !Utils.isReadableFile(storesXml)) {
            System.err.println("Cannot read metadata file ");
            System.exit(1);
        }

        // Parse the metadata
        Cluster cluster = new ClusterMapper().readCluster(new File(clusterXml));
        List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new File(storesXml));

        Entropy detector = new Entropy(nodeId, numKeys);
        detector.generateEntropy(cluster, storeDefs, outputDir, opType);
    }

    /**
     * Run the actual entropy calculation tool
     * 
     * @param cluster The cluster metadata
     * @param storeDefs The list of stores
     * @param storeDir The directory holding one key file per store, named
     *        after the store
     * @param opType Operation type - true ( run entropy calculator ), false (
     *        save keys )
     * @throws IOException If a key file cannot be read or written
     */
    public void generateEntropy(Cluster cluster,
                                List<StoreDefinition> storeDefs,
                                File storeDir,
                                boolean opType) throws IOException {
        AdminClient adminClient = null;
        try {
            adminClient = new AdminClient(cluster,
                                          new AdminClientConfig().setMaxConnectionsPerNode(storeDefs.size()));

            if(opType) {
                System.out.println("Running entropy calculator");
            } else {
                System.out.println("Generating keys for future entropy calculation");
                Utils.mkdirs(storeDir);
            }

            for(StoreDefinition storeDef: storeDefs) {
                if(AdminClient.restoreStoreEngineBlackList.contains(storeDef.getType())) {
                    System.out.println("Ignoring store " + storeDef.getName());
                    continue;
                }
                System.out.println("Working on store " + storeDef.getName());

                // Decide whether the store can be processed before opening
                // any connection, so skipped stores do not leak sockets
                File storesKeyFile = new File(storeDir, storeDef.getName());
                if(!opType && storesKeyFile.exists()) {
                    System.err.println("Key files for " + storeDef.getName()
                                       + " already exists");
                    continue;
                }
                if(opType && !(storesKeyFile.exists() && storesKeyFile.canRead())) {
                    System.err.println("Could not find " + storeDef.getName()
                                       + " file to check");
                    continue;
                }

                RoutingStrategy strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef,
                                                                                              cluster);
                SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2,
                                                                                      10000,
                                                                                      100000,
                                                                                      32 * 1024);
                HashMap<Integer, Store<ByteArray, byte[], byte[]>> socketStoresPerNode = Maps.newHashMap();
                try {
                    // Cache connections to all nodes for this store in advance
                    for(Node node: cluster.getNodes()) {
                        socketStoresPerNode.put(node.getId(),
                                                socketStoreFactory.create(storeDef.getName(),
                                                                          node.getHost(),
                                                                          node.getSocketPort(),
                                                                          RequestFormatType.PROTOCOL_BUFFERS,
                                                                          RequestRoutingType.IGNORE_CHECKS));
                    }

                    if(opType) {
                        checkKeys(storeDef, strategy, socketStoresPerNode, storesKeyFile);
                    } else {
                        saveKeys(adminClient,
                                 cluster,
                                 storeDef,
                                 strategy,
                                 socketStoresPerNode,
                                 storesKeyFile);
                    }
                } finally {
                    // Release the connections in both modes and on failures
                    try {
                        for(Store<ByteArray, byte[], byte[]> store: socketStoresPerNode.values()) {
                            store.close();
                        }
                    } finally {
                        socketStoreFactory.close();
                    }
                }
            }
        } finally {
            if(adminClient != null)
                adminClient.stop();
        }
    }

    /**
     * Writes the key file of a single store, sampling either the configured
     * node or, when the node id is -1, every node of the cluster with the key
     * budget split evenly between them.
     */
    private void saveKeys(AdminClient adminClient,
                          Cluster cluster,
                          StoreDefinition storeDef,
                          RoutingStrategy strategy,
                          HashMap<Integer, Store<ByteArray, byte[], byte[]>> socketStoresPerNode,
                          File storesKeyFile) throws IOException {
        FileOutputStream writer = null;
        try {
            writer = new FileOutputStream(storesKeyFile);
            if(nodeId == -1) {
                int numNodes = cluster.getNumberOfNodes();
                // Integer division already rounds down; guard the empty cluster
                long numKeysPerNode = numNodes > 0 ? numKeys / numNodes : 0L;
                for(Node node: cluster.getNodes()) {
                    saveKeysFromNode(adminClient,
                                     storeDef,
                                     strategy,
                                     socketStoresPerNode,
                                     node,
                                     numKeysPerNode,
                                     writer);
                }
            } else {
                saveKeysFromNode(adminClient,
                                 storeDef,
                                 strategy,
                                 socketStoresPerNode,
                                 cluster.getNodeById(nodeId),
                                 numKeys,
                                 writer);
            }
        } finally {
            if(writer != null)
                writer.close();
        }
    }

    /**
     * Fetches at most maxKeys entries from the given node and appends to the
     * key file every key which has a non-empty value, is mastered by this node
     * and is present on all of its responsible nodes.
     */
    private void saveKeysFromNode(AdminClient adminClient,
                                  StoreDefinition storeDef,
                                  RoutingStrategy strategy,
                                  HashMap<Integer, Store<ByteArray, byte[], byte[]>> socketStoresPerNode,
                                  Node node,
                                  long maxKeys,
                                  FileOutputStream writer) throws IOException {
        Iterator<Pair<ByteArray, Versioned<byte[]>>> entriesIterator = adminClient.fetchEntries(node.getId(),
                                                                                                storeDef.getName(),
                                                                                                node.getPartitionIds(),
                                                                                                null,
                                                                                                true);
        // Every fetched entry counts against the budget, written or not
        for(long keyId = 0; keyId < maxKeys && entriesIterator.hasNext(); keyId++) {
            Pair<ByteArray, Versioned<byte[]>> pair = entriesIterator.next();
            ByteArray key = pair.getFirst();
            Versioned<byte[]> value = pair.getSecond();

            if(value == null || value.getValue() == null || value.getValue().length == 0) {
                continue;
            }

            // Only the first node of the preference list masters the key
            List<Node> responsibleNodes = strategy.routeRequest(key.get());
            if(responsibleNodes.isEmpty() || responsibleNodes.get(0).getId() != node.getId()) {
                continue;
            }

            // write(int) stores just the low 8 bits, so a longer ( or empty )
            // key would corrupt every record that follows it in the file
            int keyLength = key.length();
            if(keyLength == 0 || keyLength > MAX_KEY_LENGTH) {
                System.err.println("Skipping key of unsupported length " + keyLength
                                   + " in store " + storeDef.getName());
                continue;
            }

            if(isPresentOnAllNodes(key, responsibleNodes, socketStoresPerNode)) {
                writer.write(keyLength);
                writer.write(key.get());
            }
        }
    }

    /**
     * Returns true only if every responsible node returns at least one version
     * for the key.
     */
    private static boolean isPresentOnAllNodes(ByteArray key,
                                               List<Node> responsibleNodes,
                                               HashMap<Integer, Store<ByteArray, byte[], byte[]>> socketStoresPerNode) {
        for(Node responsibleNode: responsibleNodes) {
            List<Versioned<byte[]>> values = socketStoresPerNode.get(responsibleNode.getId())
                                                                .get(key, null);
            if(values == null || values.size() == 0) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reads the key file of a single store and prints how many of its keys are
     * still present on all of their responsible nodes.
     */
    private void checkKeys(StoreDefinition storeDef,
                           RoutingStrategy strategy,
                           HashMap<Integer, Store<ByteArray, byte[], byte[]>> socketStoresPerNode,
                           File storesKeyFile) throws IOException {
        // Only used to print keys in a readable form
        @SuppressWarnings("unchecked")
        Serializer<Object> keySerializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(storeDef.getKeySerializer());

        FileInputStream reader = null;
        long foundKeys = 0L;
        long totalKeys = 0L;
        long nullValueCount = 0L;
        long zeroValueCount = 0L;
        try {
            reader = new FileInputStream(storesKeyFile);
            while(true) {
                // -1 marks the end of the file, 0 is never written as a length
                int size = reader.read();
                if(size <= 0) {
                    break;
                }

                // Read the key
                byte[] key = new byte[size];
                if(!readFully(reader, key)) {
                    System.err.println("Key file for " + storeDef.getName()
                                       + " is truncated, ignoring the incomplete last key");
                    break;
                }

                ByteArray wrappedKey = new ByteArray(key);
                List<Node> responsibleNodes = strategy.routeRequest(key);
                boolean missingKey = false;
                // No early exit: the counters are kept per node
                for(Node node: responsibleNodes) {
                    List<Versioned<byte[]>> value = socketStoresPerNode.get(node.getId())
                                                                       .get(wrappedKey, null);
                    if(value == null) {
                        missingKey = true;
                        nullValueCount++;
                    } else if(value.size() == 0) {
                        missingKey = true;
                        zeroValueCount++;
                        System.out.println("Zero value length key: "
                                           + keySerializer.toObject(key) + " on node "
                                           + node.getId());
                    }
                }
                if(!missingKey)
                    foundKeys++;
                totalKeys++;
            }

            System.out.println("Found = " + foundKeys + " Total = " + totalKeys
                               + " ZeroLengthKeys = " + zeroValueCount
                               + " NullValueKeys = " + nullValueCount);
            // Also report 0%, the case where nothing was found
            if(totalKeys > 0) {
                System.out.println("%age found - " + 100.0 * (double) foundKeys
                                   / totalKeys);
            }
        } finally {
            if(reader != null)
                reader.close();
        }
    }

    /**
     * Fills the whole buffer from the stream, since a single read call may
     * return fewer bytes than requested.
     * 
     * @return false if the stream ended before the buffer was full
     */
    private static boolean readFully(FileInputStream in, byte[] buffer) throws IOException {
        int offset = 0;
        while(offset < buffer.length) {
            int read = in.read(buffer, offset, buffer.length - offset);
            if(read < 0) {
                return false;
            }
            offset += read;
        }
        return true;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment