Created
March 18, 2015 15:21
-
-
Save brianmhess/8baafcfaa605d4f78b97 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.lang.*; | |
import java.math.BigInteger; | |
import com.datastax.driver.core.*; | |
import com.datastax.driver.core.policies.*; | |
class CassReadMT { | |
class ThreadExecute extends Thread { | |
private Cluster cluster; | |
private Session session; | |
private String beginToken; | |
private String endToken; | |
private String cqlcmd; | |
private String partitionKey; | |
private String node; | |
private long numResults; | |
public ThreadExecute(String inNode, String inCqlcmd, String inPartitionKey, String inBeginToken, String inEndToken) { | |
super(); | |
node = inNode; | |
cqlcmd = inCqlcmd; | |
partitionKey = inPartitionKey; | |
beginToken = inBeginToken; | |
endToken = inEndToken; | |
numResults = 0; | |
} | |
public void connect() { | |
cluster = Cluster.builder() | |
.addContactPoint(node) | |
.withLoadBalancingPolicy(new TokenAwarePolicy( new DCAwareRoundRobinPolicy())) | |
.build(); | |
session = cluster.connect(); | |
} | |
public void close() { | |
cluster.close(); | |
} | |
public int evaluate() { | |
if(cqlcmd == null) return -1; | |
int nresults = 0; | |
String cmd = cqlcmd + " WHERE Token(" + partitionKey + ") > " + beginToken | |
+ " AND Token(" + partitionKey + ") <= " + endToken; | |
PreparedStatement statement = session.prepare(cmd).setConsistencyLevel(ConsistencyLevel.ONE); | |
BoundStatement bound = statement.bind(); | |
ResultSet rs = session.execute(bound); | |
for (Row row : rs) { | |
// Iterate over results | |
// row.getColumnObject(0); | |
nresults += 1; | |
} | |
return nresults; | |
} | |
public long getNumResults() { | |
return numResults; | |
} | |
public void run() { | |
connect(); | |
numResults = evaluate(); | |
close(); | |
} | |
} | |
public void run(String[] args) throws InterruptedException { | |
if (args.length != 6) { | |
System.err.println("Usage: CassRead <connection point> <cql cmd> <partition key> <begin> <end> <num threads>"); | |
return; | |
} | |
String connection_point = args[0]; | |
String cqlcmd = args[1]; | |
String partitionKey = args[2]; | |
String beginString = args[3]; | |
String endString = args[4]; | |
int numThreads = Integer.valueOf(args[5]); | |
BigInteger begin = new BigInteger(beginString); | |
BigInteger end = new BigInteger(endString); | |
BigInteger delta = end.subtract(begin).divide(new BigInteger(String.valueOf(numThreads))); | |
System.out.println("Running: " + cqlcmd + " with " + numThreads + " threads"); | |
ThreadExecute[] tlist = new ThreadExecute[numThreads]; | |
for (int i = 0; i < numThreads-1; i++) { | |
String tbegin = begin.add(delta.multiply(new BigInteger(String.valueOf(i)))).toString(); | |
String tend = begin.add(delta.multiply(new BigInteger(String.valueOf(i+1)))).toString(); | |
tlist[i] = new ThreadExecute(connection_point, cqlcmd, partitionKey, tbegin, tend); | |
tlist[i].start(); | |
} | |
String lbegin = begin.add(delta.multiply(new BigInteger(String.valueOf(numThreads-1)))).toString(); | |
tlist[numThreads-1] = new ThreadExecute(connection_point, cqlcmd, partitionKey, lbegin, end.toString()); | |
tlist[numThreads-1].start(); | |
int nresults = 0; | |
for (int i = 0; i < numThreads; i++) { | |
tlist[i].join(); | |
nresults += tlist[i].getNumResults(); | |
} | |
System.out.println("nresults: " + nresults); | |
} | |
public static void main(String[] args) throws InterruptedException { | |
CassReadMT crmt = new CassReadMT(); | |
crmt.run(args); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment