Skip to content

Instantly share code, notes, and snippets.

@seregasheypak
Last active August 29, 2015 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seregasheypak/6ddf5b24cd1c195f5355 to your computer and use it in GitHub Desktop.
Save seregasheypak/6ddf5b24cd1c195f5355 to your computer and use it in GitHub Desktop.
// Create the Cassandra connection; this is how I connect to the cluster.
/**
 * Builds the {@link Cluster} used to talk to Cassandra.
 *
 * <p>Every address returned by {@code connectionSettings.getCassandraAddresses()} is
 * registered as a contact point. Custom pooling options are applied only when a minimum
 * pool size is configured. The cluster uses a {@link RoundRobinPolicy} and applies
 * {@code DEFAULT_TIMEOUT} to both the connect and the read socket timeouts.
 *
 * @return the configured cluster instance
 */
private Cluster buildCluster() {
    final Cluster.Builder clusterBuilder = Cluster.builder();
    connectionSettings.getCassandraAddresses()
            .forEach(address -> clusterBuilder.addContactPoint(address));

    // Pooling is customised only when a minimum pool size has been configured.
    final boolean poolSizeConfigured = connectionSettings.getMinPoolSize() != null;
    if (poolSizeConfigured) {
        clusterBuilder.withPoolingOptions(createPoolingOptions());
    }

    clusterBuilder.withLoadBalancingPolicy(new RoundRobinPolicy());

    // The same timeout is used for establishing a connection and for reading a response.
    final SocketOptions socketOptions = new SocketOptions();
    socketOptions.setConnectTimeoutMillis(DEFAULT_TIMEOUT);
    socketOptions.setReadTimeoutMillis(DEFAULT_TIMEOUT);
    clusterBuilder.withSocketOptions(socketOptions);

    return clusterBuilder.build();
}
/**
 * Assembles the connection-pool settings applied to the cluster.
 *
 * <p>LOCAL hosts get 8 core and 8 max connections. REMOTE hosts get 2 core connections
 * and {@code connectionSettings.getMaxPoolSize()} max connections. The simultaneous-requests
 * threshold is 128 for both distances.
 *
 * <p>NOTE(review): the original author recorded driver errors such as
 * "Core connections for LOCAL hosts must be less than max (54 &gt; 8)" next to the
 * connection setters, and "Max simultaneous requests per connection ... must be in the
 * range (0, 128)" next to the threshold setters. The literals and the call order below
 * are presumably what avoided those errors, so both are kept as-is. Confirm against the
 * driver version in use before changing them.
 *
 * @return the pooling options to pass to the cluster builder
 */
private PoolingOptions createPoolingOptions() {
    final int localConnections = 8;
    final int remoteCoreConnections = 2;
    final int requestsThreshold = 128;

    // LOCAL hosts: core first, then max (both 8).
    final PoolingOptions withLocalPool = new PoolingOptions()
            .setCoreConnectionsPerHost(HostDistance.LOCAL, localConnections)
            .setMaxConnectionsPerHost(HostDistance.LOCAL, localConnections);

    // REMOTE hosts: fixed core size, max taken from the connection settings.
    final PoolingOptions withRemotePool = withLocalPool
            .setCoreConnectionsPerHost(HostDistance.REMOTE, remoteCoreConnections)
            .setMaxConnectionsPerHost(HostDistance.REMOTE, connectionSettings.getMaxPoolSize());

    // Same simultaneous-requests threshold for both host distances.
    return withRemotePool
            .setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, requestsThreshold)
            .setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, requestsThreshold);
}
// Here is the method that inserts data.
// Drains inputQueue and inserts every polled batch through a fixed-size worker pool.
// All workers share one session and one prepared statement.
try (final Session session = getCassandraConnector().getSession()) {
    final PreparedStatement preparedStatement = session.prepare(composeQuery());
    ExecutorService service = Executors.newFixedThreadPool(getStressConnectionSettings().getThreadCount());
    System.out.println("Create fixed threadPool for ["+getStressConnectionSettings().getThreadCount()+"] threads");
    // Numbers the submitted tasks for log output (one number per batch, not per pool thread).
    final AtomicInteger threadCounter = new AtomicInteger(0);
    try {
        while (inputQueue.peek() != null) {
            List<Object[]> values = inputQueue.poll();
            service.submit(() -> {
                final int threadNumber = threadCounter.incrementAndGet();
                System.out.println(" ===>>> Starting ["+threadNumber+"] thread");
                System.out.println("thread ["+threadNumber+"] polled values["+values.size()+"]");
                int insertCounter = 0;
                for (Object[] value : values) {
                    BoundStatement boundStatement = new BoundStatement(preparedStatement);
                    boundStatement.bind(value);
                    // In async mode this timer covers only the executeAsync call itself,
                    // because the context is stopped before the future completes.
                    final Timer.Context context = insertTimer.time();
                    if (getStressConnectionSettings().isExecuteAsync()) {
                        ResultSetFuture future = session.executeAsync(boundStatement);
                        Futures.addCallback(future,
                                new FutureCallback<ResultSet>() {
                                    @Override
                                    public void onSuccess(ResultSet result) {
                                        metrics.meter("onSuccess").mark();
                                    }
                                    @Override
                                    public void onFailure(Throwable t) {
                                        metrics.meter("onFailure").mark();
                                    }
                                },
                                MoreExecutors.sameThreadExecutor()
                        );
                    } else {
                        session.execute(boundStatement);
                    }
                    context.stop();
                    insertCounter++;
                    // Progress line every 100 inserted rows.
                    if (insertCounter % 100 == 0) {
                        System.out.println("thread ["+threadNumber+"] inserted["+insertCounter+"] rows");
                    }
                }
                System.out.println("### thread["+threadNumber+"] done");
            });
        }
    } finally {
        // Stop accepting new tasks so the pool threads can exit once the submitted batches
        // finish; without this the non-daemon workers are leaked.
        service.shutdown();
    }
    // Wait for every submitted batch before leaving the try-with-resources block; otherwise
    // the session would be closed while workers are still executing statements on it.
    service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    // NOTE(review): in async mode some futures may still be in flight when the session is
    // closed below - confirm how Session.close() treats pending requests in the driver
    // version in use.
}
System.out.println("sleeping");
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment