Skip to content

Instantly share code, notes, and snippets.

@RichardHightower
Created April 9, 2016 01:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RichardHightower/f79fe5fcce88acef8782f299edc52e52 to your computer and use it in GitHub Desktop.
Save RichardHightower/f79fe5fcce88acef8782f299edc52e52 to your computer and use it in GitHub Desktop.
Example of using QBit circuit breaker logic with Cassandra using Reakt

This class uses an async supplier to connect to cassandra.

Example of circuit breaker for Cassandra.

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.time.Duration;
import io.advantageous.reakt.AsyncSupplier;
import io.advantageous.reakt.Ref;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.promise.Promises;
import org.slf4j.Logger;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static io.advantageous.reakt.guava.Guava.*;
import static org.slf4j.LoggerFactory.getLogger;

public class CassandraImprintStorageService implements ImprintStorageService {

    /** Table to store impressions. */
    public static final String IMPRESSIONS_TABLE = "imprints";
    /** key space to do the storage. */
    public static final String KEY_SPACE = "keyspace_imprints";
    /** Logger. */
    private static final Logger logger = getLogger(CassandraImprintStorageService.class);
    /** Cassandra Session supplier. */
    private final AsyncSupplier<Session> sessionAsyncSupplier;
    /** QBit reactor for repeating tasks and callbacks that excecute on the caller's thread. */
    private final Reactor reactor;
    /** Reference to the cassandra session which get connected to async. */
    private Ref<Session> sessionRef = Ref.empty();
    /** Error counts from Cassandra driver for the last time period. */
    private AtomicLong errorCount = new AtomicLong();

    /**
     *
     * @param sessionAsyncSupplier supplier to supply Cassandra session.
     * @param reactor reactor to manage callbacks and repeating tasks.
     */
    public CassandraImprintStorageService(final AsyncSupplier<Session> sessionAsyncSupplier,
                                          final Reactor reactor) {
        this.sessionAsyncSupplier = sessionAsyncSupplier;
        this.reactor = reactor;

        /* Connect the Cassandra session. */
        connectSession();

        /* This makes sure we are connected.
        *  Provide circuit breaker if sessionRef is down to auto reconnect.
        */
        reactor.addRepeatingTask(Duration.SECONDS.units(5), this::cassandraCircuitBreaker);
    }


    /**
     * Store imprints into cassandra.
     * @param callback callback
     * @param imprints imprints
     */
    @Override
    public void store(final Callback<Boolean> callback,
                      final List<Imprint> imprints) {

        sessionRef()
                /* if we are not connected, fail fast. */
                .ifEmpty(() -> callback.fail("Not connected to Cassandra"))
                /* If we are connected then call cassandra. */
                .ifPresent(session -> doStoreImprints(session, callback, imprints));

    }

    private void cassandraCircuitBreaker() {

        /** If the sessionRef had more errors than allowed in the last time duration
         * then close the sessionRef and reconnect.
         */
        if (errorCount.get() > 10) {

            final Ref<Session> oldRef = sessionRef(); //Get the old session.
            setSessionRef(null); //shut it down.

            try {
                oldRef.ifPresent(Session::close); //Close the old session.
            } catch (Exception ex) {
                logger.error("Shutting down cassandra and it failed", ex);
            }
            connectSession();
            return;
        }

        /** If the cassandra sessionRef is not connected or present, then connect the
         * cassandra sessionRef. */
        sessionRef()
                .ifPresent(session -> {
                    /* If the session is closed then reconnect. */
                    if (session.isClosed()) {
                        setSessionRef(null);
                        connectSession();
                    }
                })
                .ifEmpty(this::connectSession);

    }

    /**
     * Connects the cassandra connection.
     */
    private void connectSession() {

        sessionAsyncSupplier.get(
                Promises.<Session>promise()
                        .then(session -> {
                            logger.info("Cassandra sessionRef is open");
                            setSessionRef(session);
                        })
                        .catchError(error -> {
                            logger.error("Error connecting to Cassandra", error);
                            setSessionRef(null);
                        })
        );
    }


    /** Does the low level cassandra storage. */
    private void doStoreImprints(final Session session,
                                 final Callback<Boolean> callback,
                                 final List<Imprint> imprints) {


        /* Make many calls to cassandra using its async lib to store
        each imprint. */
        final List<Promise<Boolean>> promises = imprints.stream().map(imprint
                -> doStoreImprint(session, imprint)).collect(Collectors.toList());


        /*
         * Create a parent promise to contain all of the promises we
         * just created for each imprint.
         */
        final Promise<Void> all = Promises.all(promises);

        /*
         * Store them all.
         */
        all.then(nil -> callback.accept(true))
                .catchError(callback::fail);

    }


    /**
     * This gets called one time for each imprint passed to the <code>store(callback, imprints)</code> method.
     * @param session cassandra session
     * @param imprint imprint to store
     * @return promise
     */
    private Promise<Boolean> doStoreImprint(final Session session,
                                            final Imprint imprint) {


        final ResultSetFuture resultSetFuture = session.executeAsync(QueryBuilder.insertInto(IMPRESSIONS_TABLE)
                .value("id", imprint.getId())
                .value("metricType", imprint.getMetricType().name().toLowerCase())
                .value("metricName", imprint.getMetricName())
                .value("provider", imprint.getProvider().toString())
                .value("externalId", imprint.getExternalId())
                .value("value", imprint.getValue())
                .value("created_at", imprint.getTimestamp())

        );


        final Promise<Boolean> returnedPromise = Promises.promise();
        final Promise<ResultSet> promise = Promises.<ResultSet>promise()
                .then(resultSet -> returnedPromise.reply(resultSet.wasApplied()))
                .catchError((error) -> {
                    returnedPromise.fail(error);
                    if (error instanceof DriverException) {
                        logger.error("Error storing imprint", error);
                        errorCount.incrementAndGet();
                    }
                });

        registerCallback(resultSetFuture, promise);

        return returnedPromise;
    }

    private synchronized void setSessionRef(Session session) {
        this.sessionRef = Ref.ofNullable(session);
    }

    private synchronized Ref<Session> sessionRef() {
        return sessionRef;
    }

    @QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    public void process() {
        reactor.process();
    }
}

Example of Async supplier that finds Cassandra.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import io.advantageous.qbit.service.discovery.EndpointDefinition;
import io.advantageous.reakt.AsyncSupplier;
import io.advantageous.reakt.Callback;
import io.advantageous.reakt.promise.Promises;
import org.slf4j.Logger;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static io.advantageous.reakt.guava.Guava.registerCallback;
import static org.slf4j.LoggerFactory.getLogger;

public class CassandraSessionSupplier implements AsyncSupplier<Session> {

    private final static AtomicInteger index = new AtomicInteger();
    private static final Logger logger = getLogger(CassandraSessionSupplier.class);
    private final AsyncSupplier<List<EndpointDefinition>> endpointDefinitionsAsyncSupplier;
    private final int replicationFactor;
    private final String keyspace;
    private final String tableName;
    private final ExecutorService executorService = Executors.newFixedThreadPool(3);

    public CassandraSessionSupplier(final AsyncSupplier<List<EndpointDefinition>> endpointDefinitionAsyncSupplier,
                                    final int replicationFactor,
                                    final String keyspace,
                                    final String tableName) {
        this.endpointDefinitionsAsyncSupplier = endpointDefinitionAsyncSupplier;
        this.replicationFactor = replicationFactor;
        this.keyspace = keyspace;
        this.tableName = tableName;
    }


    @Override
    public void get(final Callback<Session> callback) {

        logger.info("Loading Cassandra Session {} {}", keyspace, tableName);
        endpointDefinitionsAsyncSupplier.get(
                Promises.<List<EndpointDefinition>>promise()
                        .thenRef(listRef ->
                                listRef.filter(endpointDefinitions -> endpointDefinitions.size() > 0)
                                        .map(this::getEndPointDef)
                                        .ifEmpty(() -> callback.fail("Cassandra was not found"))
                                        .ifPresent(endpointDefinition ->
                                                createCassandraSessionWithEndpoint(callback, endpointDefinition))
                        )
                        .catchError((e) -> callback.fail("Unable to lookup cassandra", e)));

    }

    private void createCassandraSessionWithEndpoint(final Callback<Session> callback,
                                                    final EndpointDefinition endpointDefinition) {

        registerCallback(
                Cluster.builder()
                        .withPort(endpointDefinition.getPort())
                        .addContactPoints(endpointDefinition.getHost())
                        .build().connectAsync(),
                Promises.<Session>promise()
                        .catchError(e -> callback.fail("Unable to load initial session", e))
                        .thenRef(sessionRef ->
                                sessionRef.ifEmpty(() -> callback.fail("Empty session returned from Cassandra Cluster"))
                                        .ifPresent((Consumer<Session>) sessionWithoutKeyspace ->
                                                buildDBIfNeeded(sessionWithoutKeyspace, callback, endpointDefinition))
                        ));

    }

    private EndpointDefinition getEndPointDef(List<EndpointDefinition> endpointDefinitions) {
        if (index.get() >= endpointDefinitions.size()) {
            index.set(0);
        }
        return endpointDefinitions.get(index.getAndIncrement());
    }

    private void buildDBIfNeeded(final Session sessionWithoutKeyspace,
                                 final Callback<Session> callback,
                                 final EndpointDefinition endpointDefinition) {

        executorService.execute(() -> {

            try {
                doBuildDatabase(sessionWithoutKeyspace);
            } catch (Exception ex) {
                callback.fail("Unable to create database", ex);
                return;
            }
            loadSession(callback, endpointDefinition);
        });



    }

    private void doBuildDatabase(Session sessionWithoutKeyspace) {

        logger.info("Initializing Cassandra Tables if needed {} {}", keyspace, tableName);
        sessionWithoutKeyspace.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH REPLICATION "
                + "= {'class':'SimpleStrategy', 'replication_factor':" + replicationFactor + "};");
        sessionWithoutKeyspace.execute("USE " + keyspace);
        sessionWithoutKeyspace.execute(
                "CREATE TABLE IF NOT EXISTS " + tableName +
                        " (id bigint,\n" +
                        " metricType text,\n" +
                        " metricName text,\n" +
                        " provider text,\n" +
                        " externalId text,\n" +
                        " value bigint,\n" +
                        " created_at timestamp,\n" +
                        " primary key (artistId, created_at))\n" +
                        "WITH CLUSTERING ORDER BY (created_at desc);");

        sessionWithoutKeyspace.close();
    }

    private void loadSession(final Callback<Session> callback,
                             final EndpointDefinition endpointDefinition) {


        logger.info("Loading session with keyspace {} {}", keyspace, tableName);


        registerCallback(
                Cluster.builder()
                        .withPort(endpointDefinition.getPort())
                        .addContactPoints(endpointDefinition.getHost())
                        .build().connectAsync(keyspace),
                Promises.<Session>promise()
                        .catchError(e -> callback.fail("Unable to load session", e))
                        .then(callback::reply)
        );

    }


}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment