Skip to content

Instantly share code, notes, and snippets.

@pcmanus
Last active August 29, 2015 14:01
Show Gist options
  • Save pcmanus/95f1106185e99fb27b73 to your computer and use it in GitHub Desktop.
Save pcmanus/95f1106185e99fb27b73 to your computer and use it in GitHub Desktop.
/*
* Copyright (C) 2012 DataStax Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.driver.core;
import java.util.*;
import com.google.common.util.concurrent.*;
import org.testng.annotations.Test;
public class QueryInterceptionTest {
@Test(groups = "short")
public void queryInterceptionTest() throws Exception {
Cluster cluster = new ClusterWrapper(Cluster.builder().addContactPoints("127.0.0.1"));
Session s = cluster.connect();
s.execute("CREATE KEYSPACE IF NOT EXISTS ks WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
s.execute("USE ks");
s.execute("CREATE TABLE IF NOT EXISTS test (k int, v int, PRIMARY KEY (k, v))");
List<ResultSetFuture> futures = new ArrayList<ResultSetFuture>();
for (int i = 0; i < 3; i++)
futures.add(s.executeAsync(String.format("INSERT INTO test(k, v) VALUES(%d, %d)", i, i)));
Futures.allAsList(futures).get();
System.out.println("Result: " + s.execute("SELECT * FROM test"));
s.close();
}
public static class ClusterWrapper extends Cluster {
public ClusterWrapper(Cluster.Initializer initializer) {
super(initializer);
}
public Session newSession() {
return new InterceptingSession(this, super.newSession());
}
// Overriding this is not necessary in practice, but it doesn't hurt to be on the safe side
public Session connect() {
return new InterceptingSession(this, super.connect());
}
public Session connect(String keyspace) {
return new InterceptingSession(this, super.connect(keyspace));
}
}
public static class InterceptingSession extends AbstractSession {
private final ClusterWrapper cluster;
private final Session wrapped;
public InterceptingSession(ClusterWrapper cluster, Session wrapped) {
this.cluster = cluster;
this.wrapped = wrapped;
}
public void beforeQueryStarted(Statement statement) {
System.out.println("Executing " + statement);
}
public void afterQueryCompleted(long startTimeNanos, long stopTimeNanos, Statement statement, ResultSet results, Throwable exception) {
System.out.println("Executed " + statement + " in " + (stopTimeNanos - startTimeNanos) + "ns");
}
public String getLoggedKeyspace() {
return wrapped.getLoggedKeyspace();
}
public Session init() {
wrapped.init();
return this;
}
public ResultSetFuture executeAsync(final Statement statement) {
beforeQueryStarted(statement);
final long startTime = System.nanoTime();
ResultSetFuture future = wrapped.executeAsync(statement);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
public void onFailure(Throwable t) {
afterQueryCompleted(startTime, System.nanoTime(), statement, null, t);
}
public void onSuccess(ResultSet result) {
afterQueryCompleted(startTime, System.nanoTime(), statement, result, null);
}
});
return future;
}
public ListenableFuture<PreparedStatement> prepareAsync(String query) {
return wrapped.prepareAsync(query);
}
public CloseFuture closeAsync() {
return wrapped.closeAsync();
}
public boolean isClosed() {
return wrapped.isClosed();
}
public Cluster getCluster() {
return cluster;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment