Skip to content

Instantly share code, notes, and snippets.

@noboomu
Created April 2, 2015 15:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save noboomu/0e7ca2ec13941a845ec6 to your computer and use it in GitHub Desktop.
Save noboomu/0e7ca2ec13941a845ec6 to your computer and use it in GitHub Desktop.
Service class to access a Spark CassandraSQLContext from Java. Using DSE in this case.
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.cassandra.CassandraSQLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/* This is simply a wrapper around Typesafe's config */
import utilities.Configuration;
public class SparkService
{
private static Logger Logger = LoggerFactory.getLogger(SparkService.class.getCanonicalName());
protected final static AtomicBoolean isRunning = new AtomicBoolean(false);
/* ex: 192.168.137.100 */
private final static String CASSANDRA_HOST = Configuration.root().getString("spark.cassandra.connection.host");
/* ex: spark://sparkMaster:7077 */
private final static String SPARK_MASTER = Configuration.root().getString("spark.cassandra.connection.master");
private final static String KEYSPACE_NAME = Configuration.root().getString("spark.cassandra.keyspace");.
private static SparkConf SPARK_CONF = null;
private static JavaSparkContext SPARK_CONTEXT = null;
private static CassandraSQLContext CASSANDRA_SQL_CONTEXT = null;
/* Static initializer to setup spark context */
public static void initialize() {
try
{
SPARK_CONF = new SparkConf().setAppName("SparkOps").set("spark.cassandra.connection.host", CASSANDRA_HOST).setMaster(SPARK_MASTER);
SPARK_CONTEXT = new JavaSparkContext(SPARK_CONF);
CASSANDRA_SQL_CONTEXT = new CassandraSQLContext( JavaSparkContext.toSparkContext(SPARK_CONTEXT)) ;
CASSANDRA_SQL_CONTEXT.setKeyspace(KEYSPACE_NAME);
} catch (Exception e)
{
Logger.error(e.getMessage(),e);
}
}
public static void start() {
if(!isRunning.get())
{
isRunning.set(true);
Logger.info(SparkService.class.getSimpleName() + " is starting...");
initialize();
Logger.info(SparkService.class.getSimpleName() + " started...");
}
}
public static void stop() {
if (isRunning.get()) {
Logger.info(SparkService.class.getSimpleName() + " is stopping...");
if (SPARK_CONTEXT != null) {
SPARK_CONTEXT.stop();
}
isRunning.set(false);
Logger.info(SparkService.class.getSimpleName() + " is stopped.");
}
}
public static CassandraSQLContext getContext() {
if (CASSANDRA_SQL_CONTEXT == null) {
start();
}
return CASSANDRA_SQL_CONTEXT;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment