Skip to content

Instantly share code, notes, and snippets.

@brianmhess
Created March 18, 2015 15:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brianmhess/8baafcfaa605d4f78b97 to your computer and use it in GitHub Desktop.
Save brianmhess/8baafcfaa605d4f78b97 to your computer and use it in GitHub Desktop.
import java.util.*;
import java.lang.*;
import java.math.BigInteger;
import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.*;
class CassReadMT {
class ThreadExecute extends Thread {
private Cluster cluster;
private Session session;
private String beginToken;
private String endToken;
private String cqlcmd;
private String partitionKey;
private String node;
private long numResults;
public ThreadExecute(String inNode, String inCqlcmd, String inPartitionKey, String inBeginToken, String inEndToken) {
super();
node = inNode;
cqlcmd = inCqlcmd;
partitionKey = inPartitionKey;
beginToken = inBeginToken;
endToken = inEndToken;
numResults = 0;
}
public void connect() {
cluster = Cluster.builder()
.addContactPoint(node)
.withLoadBalancingPolicy(new TokenAwarePolicy( new DCAwareRoundRobinPolicy()))
.build();
session = cluster.connect();
}
public void close() {
cluster.close();
}
public int evaluate() {
if(cqlcmd == null) return -1;
int nresults = 0;
String cmd = cqlcmd + " WHERE Token(" + partitionKey + ") > " + beginToken
+ " AND Token(" + partitionKey + ") <= " + endToken;
PreparedStatement statement = session.prepare(cmd).setConsistencyLevel(ConsistencyLevel.ONE);
BoundStatement bound = statement.bind();
ResultSet rs = session.execute(bound);
for (Row row : rs) {
// Iterate over results
// row.getColumnObject(0);
nresults += 1;
}
return nresults;
}
public long getNumResults() {
return numResults;
}
public void run() {
connect();
numResults = evaluate();
close();
}
}
public void run(String[] args) throws InterruptedException {
if (args.length != 6) {
System.err.println("Usage: CassRead <connection point> <cql cmd> <partition key> <begin> <end> <num threads>");
return;
}
String connection_point = args[0];
String cqlcmd = args[1];
String partitionKey = args[2];
String beginString = args[3];
String endString = args[4];
int numThreads = Integer.valueOf(args[5]);
BigInteger begin = new BigInteger(beginString);
BigInteger end = new BigInteger(endString);
BigInteger delta = end.subtract(begin).divide(new BigInteger(String.valueOf(numThreads)));
System.out.println("Running: " + cqlcmd + " with " + numThreads + " threads");
ThreadExecute[] tlist = new ThreadExecute[numThreads];
for (int i = 0; i < numThreads-1; i++) {
String tbegin = begin.add(delta.multiply(new BigInteger(String.valueOf(i)))).toString();
String tend = begin.add(delta.multiply(new BigInteger(String.valueOf(i+1)))).toString();
tlist[i] = new ThreadExecute(connection_point, cqlcmd, partitionKey, tbegin, tend);
tlist[i].start();
}
String lbegin = begin.add(delta.multiply(new BigInteger(String.valueOf(numThreads-1)))).toString();
tlist[numThreads-1] = new ThreadExecute(connection_point, cqlcmd, partitionKey, lbegin, end.toString());
tlist[numThreads-1].start();
int nresults = 0;
for (int i = 0; i < numThreads; i++) {
tlist[i].join();
nresults += tlist[i].getNumResults();
}
System.out.println("nresults: " + nresults);
}
public static void main(String[] args) throws InterruptedException {
CassReadMT crmt = new CassReadMT();
crmt.run(args);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment