This class uses an async supplier to connect to cassandra.
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.time.Duration;
import io.advantageous.reakt.AsyncSupplier;
import io.advantageous.reakt.Ref;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.promise.Promises;
import org.slf4j.Logger;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static io.advantageous.reakt.guava.Guava.*;
import static org.slf4j.LoggerFactory.getLogger;
public class CassandraImprintStorageService implements ImprintStorageService {
/** Table to store impressions. */
public static final String IMPRESSIONS_TABLE = "imprints";
/** key space to do the storage. */
public static final String KEY_SPACE = "keyspace_imprints";
/** Logger. */
private static final Logger logger = getLogger(CassandraImprintStorageService.class);
/** Cassandra Session supplier. */
private final AsyncSupplier<Session> sessionAsyncSupplier;
/** QBit reactor for repeating tasks and callbacks that excecute on the caller's thread. */
private final Reactor reactor;
/** Reference to the cassandra session which get connected to async. */
private Ref<Session> sessionRef = Ref.empty();
/** Error counts from Cassandra driver for the last time period. */
private AtomicLong errorCount = new AtomicLong();
/**
*
* @param sessionAsyncSupplier supplier to supply Cassandra session.
* @param reactor reactor to manage callbacks and repeating tasks.
*/
public CassandraImprintStorageService(final AsyncSupplier<Session> sessionAsyncSupplier,
final Reactor reactor) {
this.sessionAsyncSupplier = sessionAsyncSupplier;
this.reactor = reactor;
/* Connect the Cassandra session. */
connectSession();
/* This makes sure we are connected.
* Provide circuit breaker if sessionRef is down to auto reconnect.
*/
reactor.addRepeatingTask(Duration.SECONDS.units(5), this::cassandraCircuitBreaker);
}
/**
* Store imprints into cassandra.
* @param callback callback
* @param imprints imprints
*/
@Override
public void store(final Callback<Boolean> callback,
final List<Imprint> imprints) {
sessionRef()
/* if we are not connected, fail fast. */
.ifEmpty(() -> callback.fail("Not connected to Cassandra"))
/* If we are connected then call cassandra. */
.ifPresent(session -> doStoreImprints(session, callback, imprints));
}
private void cassandraCircuitBreaker() {
/** If the sessionRef had more errors than allowed in the last time duration
* then close the sessionRef and reconnect.
*/
if (errorCount.get() > 10) {
final Ref<Session> oldRef = sessionRef(); //Get the old session.
setSessionRef(null); //shut it down.
try {
oldRef.ifPresent(Session::close); //Close the old session.
} catch (Exception ex) {
logger.error("Shutting down cassandra and it failed", ex);
}
connectSession();
return;
}
/** If the cassandra sessionRef is not connected or present, then connect the
* cassandra sessionRef. */
sessionRef()
.ifPresent(session -> {
/* If the session is closed then reconnect. */
if (session.isClosed()) {
setSessionRef(null);
connectSession();
}
})
.ifEmpty(this::connectSession);
}
/**
* Connects the cassandra connection.
*/
private void connectSession() {
sessionAsyncSupplier.get(
Promises.<Session>promise()
.then(session -> {
logger.info("Cassandra sessionRef is open");
setSessionRef(session);
})
.catchError(error -> {
logger.error("Error connecting to Cassandra", error);
setSessionRef(null);
})
);
}
/** Does the low level cassandra storage. */
private void doStoreImprints(final Session session,
final Callback<Boolean> callback,
final List<Imprint> imprints) {
/* Make many calls to cassandra using its async lib to store
each imprint. */
final List<Promise<Boolean>> promises = imprints.stream().map(imprint
-> doStoreImprint(session, imprint)).collect(Collectors.toList());
/*
* Create a parent promise to contain all of the promises we
* just created for each imprint.
*/
final Promise<Void> all = Promises.all(promises);
/*
* Store them all.
*/
all.then(nil -> callback.accept(true))
.catchError(callback::fail);
}
/**
* This gets called one time for each imprint passed to the <code>store(callback, imprints)</code> method.
* @param session cassandra session
* @param imprint imprint to store
* @return promise
*/
private Promise<Boolean> doStoreImprint(final Session session,
final Imprint imprint) {
final ResultSetFuture resultSetFuture = session.executeAsync(QueryBuilder.insertInto(IMPRESSIONS_TABLE)
.value("id", imprint.getId())
.value("metricType", imprint.getMetricType().name().toLowerCase())
.value("metricName", imprint.getMetricName())
.value("provider", imprint.getProvider().toString())
.value("externalId", imprint.getExternalId())
.value("value", imprint.getValue())
.value("created_at", imprint.getTimestamp())
);
final Promise<Boolean> returnedPromise = Promises.promise();
final Promise<ResultSet> promise = Promises.<ResultSet>promise()
.then(resultSet -> returnedPromise.reply(resultSet.wasApplied()))
.catchError((error) -> {
returnedPromise.fail(error);
if (error instanceof DriverException) {
logger.error("Error storing imprint", error);
errorCount.incrementAndGet();
}
});
registerCallback(resultSetFuture, promise);
return returnedPromise;
}
private synchronized void setSessionRef(Session session) {
this.sessionRef = Ref.ofNullable(session);
}
private synchronized Ref<Session> sessionRef() {
return sessionRef;
}
@QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
public void process() {
reactor.process();
}
}
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import io.advantageous.qbit.service.discovery.EndpointDefinition;
import io.advantageous.reakt.AsyncSupplier;
import io.advantageous.reakt.Callback;
import io.advantageous.reakt.promise.Promises;
import org.slf4j.Logger;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static io.advantageous.reakt.guava.Guava.registerCallback;
import static org.slf4j.LoggerFactory.getLogger;
public class CassandraSessionSupplier implements AsyncSupplier<Session> {
private final static AtomicInteger index = new AtomicInteger();
private static final Logger logger = getLogger(CassandraSessionSupplier.class);
private final AsyncSupplier<List<EndpointDefinition>> endpointDefinitionsAsyncSupplier;
private final int replicationFactor;
private final String keyspace;
private final String tableName;
private final ExecutorService executorService = Executors.newFixedThreadPool(3);
public CassandraSessionSupplier(final AsyncSupplier<List<EndpointDefinition>> endpointDefinitionAsyncSupplier,
final int replicationFactor,
final String keyspace,
final String tableName) {
this.endpointDefinitionsAsyncSupplier = endpointDefinitionAsyncSupplier;
this.replicationFactor = replicationFactor;
this.keyspace = keyspace;
this.tableName = tableName;
}
@Override
public void get(final Callback<Session> callback) {
logger.info("Loading Cassandra Session {} {}", keyspace, tableName);
endpointDefinitionsAsyncSupplier.get(
Promises.<List<EndpointDefinition>>promise()
.thenRef(listRef ->
listRef.filter(endpointDefinitions -> endpointDefinitions.size() > 0)
.map(this::getEndPointDef)
.ifEmpty(() -> callback.fail("Cassandra was not found"))
.ifPresent(endpointDefinition ->
createCassandraSessionWithEndpoint(callback, endpointDefinition))
)
.catchError((e) -> callback.fail("Unable to lookup cassandra", e)));
}
private void createCassandraSessionWithEndpoint(final Callback<Session> callback,
final EndpointDefinition endpointDefinition) {
registerCallback(
Cluster.builder()
.withPort(endpointDefinition.getPort())
.addContactPoints(endpointDefinition.getHost())
.build().connectAsync(),
Promises.<Session>promise()
.catchError(e -> callback.fail("Unable to load initial session", e))
.thenRef(sessionRef ->
sessionRef.ifEmpty(() -> callback.fail("Empty session returned from Cassandra Cluster"))
.ifPresent((Consumer<Session>) sessionWithoutKeyspace ->
buildDBIfNeeded(sessionWithoutKeyspace, callback, endpointDefinition))
));
}
private EndpointDefinition getEndPointDef(List<EndpointDefinition> endpointDefinitions) {
if (index.get() >= endpointDefinitions.size()) {
index.set(0);
}
return endpointDefinitions.get(index.getAndIncrement());
}
private void buildDBIfNeeded(final Session sessionWithoutKeyspace,
final Callback<Session> callback,
final EndpointDefinition endpointDefinition) {
executorService.execute(() -> {
try {
doBuildDatabase(sessionWithoutKeyspace);
} catch (Exception ex) {
callback.fail("Unable to create database", ex);
return;
}
loadSession(callback, endpointDefinition);
});
}
private void doBuildDatabase(Session sessionWithoutKeyspace) {
logger.info("Initializing Cassandra Tables if needed {} {}", keyspace, tableName);
sessionWithoutKeyspace.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH REPLICATION "
+ "= {'class':'SimpleStrategy', 'replication_factor':" + replicationFactor + "};");
sessionWithoutKeyspace.execute("USE " + keyspace);
sessionWithoutKeyspace.execute(
"CREATE TABLE IF NOT EXISTS " + tableName +
" (id bigint,\n" +
" metricType text,\n" +
" metricName text,\n" +
" provider text,\n" +
" externalId text,\n" +
" value bigint,\n" +
" created_at timestamp,\n" +
" primary key (artistId, created_at))\n" +
"WITH CLUSTERING ORDER BY (created_at desc);");
sessionWithoutKeyspace.close();
}
private void loadSession(final Callback<Session> callback,
final EndpointDefinition endpointDefinition) {
logger.info("Loading session with keyspace {} {}", keyspace, tableName);
registerCallback(
Cluster.builder()
.withPort(endpointDefinition.getPort())
.addContactPoints(endpointDefinition.getHost())
.build().connectAsync(keyspace),
Promises.<Session>promise()
.catchError(e -> callback.fail("Unable to load session", e))
.then(callback::reply)
);
}
}