Spark SQL using SQLContext with a dynamically defined schema
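Four files: build.gradle, Main.java, a Makefile, and settings.gradle. The Java job reads the Cassandra table wordcount.input (assumed to have an int id column and a text line column, going by the accessors in the code), builds the DataFrame schema at runtime with StructType/StructField instead of inferring it from a JavaBean, and runs a SQL query against it.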
// build.gradle

group 'in.suyash.tests'
version '1.0-SNAPSHOT'

apply plugin: 'java'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.4.0'
    compile group: 'org.apache.spark', name: 'spark-sql_2.10', version: '1.4.0'
    compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.10', version: '1.4.0-M1'
    compile group: 'com.datastax.spark', name: 'spark-cassandra-connector-java_2.10', version: '1.4.0-M1'
}

sourceSets {
    main {
        java {
            srcDir './'
        }
    }
}

// build a fat jar, so spark-submit gets all dependencies on the classpath
// http://stackoverflow.com/a/14441628/3673043
jar {
    doFirst {
        from {
            configurations.compile.collect {
                it.isDirectory() ? it : zipTree(it)
            }
        }
    }

    // drop signature files from dependency jars, which would otherwise
    // fail verification in the merged jar
    exclude 'META-INF/*.RSA', 'META-INF/*.SF', 'META-INF/*.DSA'

    // https://docs.gradle.org/2.4/dsl/org.gradle.api.tasks.bundling.Zip.html#org.gradle.api.tasks.bundling.Zip:zip64
    zip64 = true
}
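With this fat-jar configuration, gradle jar produces a self-contained ./build/libs/CassandraSparkSQL2-1.0-SNAPSHOT.jar (the name comes from settings.gradle at the bottom), which is the artifact the Makefile hands to spark-submit.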
// Main.java

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import com.datastax.spark.connector.japi.CassandraRow;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

public class Main {

    private static final String HOST = "spark://sparkmaster:7077";
    // private static final String HOST = "local[4]";

    private static final String APP_NAME = "Cassandra Spark SQL";

    private static final String CASSANDRA_KEYSPACE = "wordcount";
    private static final String CASSANDRA_COLUMN_FAMILY = "input";

    public static void main(String... args) {
        SparkConf conf = new SparkConf(true);
        SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
        SQLContext sqlContext = new SQLContext(sc);

        // read the Cassandra table as an RDD of CassandraRow
        JavaRDD<CassandraRow> cassandraRdd = javaFunctions(sc).cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);

        // map each CassandraRow to a Spark SQL Row of (id, line)
        JavaRDD<Row> rdd = cassandraRdd.map(row -> RowFactory.create(row.getInt(0), row.getString(1)));

        // define the schema dynamically: one StructField per column
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("line", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);

        // pair the Row RDD with the schema and register it for SQL queries
        DataFrame dataFrame = sqlContext.createDataFrame(rdd, schema);
        dataFrame.registerTempTable("lines");

        DataFrame resultDataFrame = sqlContext.sql("select line from lines where id = 1");
        System.out.println(Arrays.asList(resultDataFrame.collect()));
    }
}
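For contrast with the dynamically defined schema above: when the columns are known at compile time, Spark 1.x can also infer the schema by reflection from a JavaBean via createDataFrame(rdd, beanClass). A minimal sketch of that alternative follows; the Line bean and AlternativeBean class are illustrative, not part of the original gist.

// AlternativeBean.java (hypothetical): schema inferred by reflection
// from a JavaBean instead of a hand-built StructType

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import com.datastax.spark.connector.japi.CassandraRow;

public class AlternativeBean {

    // the bean carries both the field names and the types,
    // so no StructType needs to be assembled by hand
    public static class Line implements Serializable {
        private int id;
        private String line;

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }

        public String getLine() { return line; }
        public void setLine(String line) { this.line = line; }
    }

    public static DataFrame toDataFrame(SQLContext sqlContext, JavaRDD<CassandraRow> cassandraRdd) {
        JavaRDD<Line> beanRdd = cassandraRdd.map(row -> {
            Line l = new Line();
            l.setId(row.getInt(0));
            l.setLine(row.getString(1));
            return l;
        });
        // Spark infers the schema from the bean's getters
        return sqlContext.createDataFrame(beanRdd, Line.class);
    }
}

The trade-off: the bean approach is less boilerplate per query but fixes the schema at compile time, while the StructType approach used in Main.java can build the schema from data that is only known at runtime.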
# Makefile

all:
	make build
	make run

# note: spark.cassandra.input.split.size_in_mb is interpreted in megabytes;
# 67108864 is 64 * 1024 * 1024, so this looks like a byte count and the
# intended value was probably 64
run:
	spark-submit \
		--class Main \
		--conf "spark.driver.memory=512m" \
		--conf "spark.executor.memory=2409m" \
		--conf "spark.network.timeout=600s" \
		--conf "spark.cassandra.connection.host=107.108.214.154" \
		--conf "spark.cassandra.input.split.size_in_mb=67108864" \
		./build/libs/CassandraSparkSQL2-1.0-SNAPSHOT.jar

build: clean
	gradle jar

clean:
	rm -rf build

.PHONY: all build run clean
// settings.gradle

rootProject.name = 'CassandraSparkSQL2'