@suyash
Created March 20, 2016 18:53
Cassandra to STDOUT using Spark

Clone, import as a Gradle project called CassandraSparkMapReduce, and run make.

build.gradle
group 'in.suyash.tests'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'application'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.4.0'
    compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.10', version: '1.4.0-M1'
    compile group: 'com.datastax.spark', name: 'spark-cassandra-connector-java_2.10', version: '1.4.0-M1'
    testCompile group: 'junit', name: 'junit', version: '4.11'
}

sourceSets {
    main {
        java {
            srcDir './'
        }
    }
}

mainClassName = 'Main'

// Build a fat jar that bundles all compile dependencies, so spark-submit
// only needs this one artifact: http://stackoverflow.com/a/14441628/3673043
jar {
    doFirst {
        from {
            configurations.compile.collect {
                it.isDirectory() ? it : zipTree(it)
            }
        }
    }
    exclude 'META-INF/*.RSA', 'META-INF/*.SF', 'META-INF/*.DSA'
}
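
The job below expects a wordcount keyspace containing an input table with a text column named line. Here is a minimal sketch of creating and seeding that schema from the driver, using the connector's CassandraConnector; the id primary-key column and the sample row are assumptions for illustration, not part of the original gist:

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import org.apache.spark.SparkConf;

public class Schema {
    public static void main (String... args) {
        SparkConf conf = new SparkConf(true)
                .set("spark.cassandra.connection.host", "107.108.214.154");
        CassandraConnector connector = CassandraConnector.apply(conf);
        try (Session session = connector.openSession()) {
            session.execute("CREATE KEYSPACE IF NOT EXISTS wordcount WITH replication = "
                    + "{'class': 'SimpleStrategy', 'replication_factor': 1}");
            // "line" is the column Main.java reads; "id" is a hypothetical key column.
            session.execute("CREATE TABLE IF NOT EXISTS wordcount.input (id uuid PRIMARY KEY, line text)");
            session.execute("INSERT INTO wordcount.input (id, line) VALUES (uuid(), 'to be or not to be')");
        }
    }
}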
Main.java

import java.util.Arrays;
import java.util.Map;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.SparkContextJavaFunctions;
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;

public class Main {

    private static final String HOST = "spark://sparkmaster:7077";
    // private static final String HOST = "local[4]";

    private static final String APP_NAME = "Cassandra Spark WordCount";
    private static final String CASSANDRA_KEYSPACE = "wordcount";
    private static final String CASSANDRA_COLUMN_FAMILY = "input";

    public static void main (String... args) {
        SparkConf conf = new SparkConf(true);
        SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
        SparkContextJavaFunctions context = javaFunctions(sc);

        // Read the whole wordcount.input table as an RDD of Cassandra rows.
        CassandraJavaRDD<CassandraRow> rdd = context.cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);

        // Split each row's "line" column into words.
        JavaRDD<String> words = rdd.flatMap(row -> Arrays.asList(row.getString("line").split(" ")));

        // Equivalent classic map/reduce formulation:
        // JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
        // Map<String, Integer> counts = pairs.reduceByKey((c1, c2) -> c1 + c2).collectAsMap();

        // countByValue runs the reduction and collects the result to the driver.
        Map<String, Long> counts = words.countByValue();
        System.out.println(counts);
    }
}
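
For reference, the map/reduce formulation that is commented out in main, spelled out with the two imports it needs; it yields the same counts as countByValue, only with Integer values and an explicit shuffle:

import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// ...given the same words RDD built in main above:
JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
Map<String, Integer> counts = pairs.reduceByKey((c1, c2) -> c1 + c2).collectAsMap();
System.out.println(counts);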
Makefile

all:
	make build
	make run

run:
	spark-submit \
	--class Main \
	--conf "spark.driver.memory=512m" \
	--conf "spark.executor.memory=2409m" \
	--conf "spark.network.timeout=600s" \
	--conf "spark.cassandra.connection.host=107.108.214.154" \
	--conf "spark.cassandra.input.split.size_in_mb=64" \
	./build/libs/CassandraSparkMapReduce-1.0-SNAPSHOT.jar
# Note: the split size option is measured in MB (64 MB per Spark partition);
# the original value 67108864 was 64 MB expressed in bytes.

build: clean
	gradle jar

clean:
	rm -rf build

.PHONY: all build run clean
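
The run target allows a generous network timeout for what is a full-table scan. Since the job only reads the line column, one cheaper option (an untested sketch, not part of the original gist) is to push the projection down to Cassandra with the connector's select before splitting words:

// In Main.main, restrict the scan to the one column the job uses; the
// connector then issues a column-restricted CQL query instead of SELECT *.
CassandraJavaRDD<CassandraRow> rdd = context
        .cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY)
        .select("line");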
settings.gradle

rootProject.name = 'CassandraSparkMapReduce'