Created
March 20, 2016 18:46
-
-
Save suyash/14250572aa6d414a211d to your computer and use it in GitHub Desktop.
Spark SQL using SQLContext with a dynamically defined schema
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Gradle build for the Cassandra + Spark SQL example.
// Compiles the Java sources located in the project root and packages them,
// together with every compile-time dependency, into a single jar.
apply plugin: 'java'

group = 'in.suyash.tests'
version = '1.0-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    // Spark core and Spark SQL (Scala 2.10 builds).
    compile(group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.4.0')
    compile(group: 'org.apache.spark', name: 'spark-sql_2.10', version: '1.4.0')

    // DataStax Cassandra connector and its Java API.
    compile(group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.10', version: '1.4.0-M1')
    compile(group: 'com.datastax.spark', name: 'spark-cassandra-connector-java_2.10', version: '1.4.0-M1')
}

// The Java sources sit next to this build file rather than under src/main/java.
sourceSets.main.java.srcDir './'

// Fat-jar recipe, see http://stackoverflow.com/a/14441628/3673043
jar {
    // Registered in doFirst so the dependency set is only collected when the task runs.
    doFirst {
        from {
            configurations.compile.collect { dep ->
                // Directories are copied as-is; archives are unpacked into the jar.
                dep.isDirectory() ? dep : zipTree(dep)
            }
        }
    }

    // Leave out the signature files that come along with the unpacked dependencies.
    exclude 'META-INF/*.RSA', 'META-INF/*.SF', 'META-INF/*.DSA'

    // https://docs.gradle.org/2.4/dsl/org.gradle.api.tasks.bundling.Zip.html#org.gradle.api.tasks.bundling.Zip:zip64
    zip64 = true
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import org.apache.spark.SparkConf; | |
import org.apache.spark.SparkContext; | |
import org.apache.spark.api.java.JavaRDD; | |
import org.apache.spark.sql.DataFrame; | |
import org.apache.spark.sql.Row; | |
import org.apache.spark.sql.RowFactory; | |
import org.apache.spark.sql.SQLContext; | |
import org.apache.spark.sql.types.DataTypes; | |
import org.apache.spark.sql.types.StructField; | |
import org.apache.spark.sql.types.StructType; | |
import com.datastax.spark.connector.japi.CassandraRow; | |
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; | |
public class Main { | |
private static final String HOST = "spark://sparkmaster:7077"; | |
// private static final String HOST = "local[4]"; | |
private static final String APP_NAME = "Cassandra Spark SQL"; | |
private static final String CASSANDRA_KEYSPACE = "wordcount"; | |
private static final String CASSANDRA_COLUMN_FAMILY = "input"; | |
public static void main (String... args) { | |
SparkConf conf = new SparkConf(true); | |
SparkContext sc = new SparkContext(HOST, APP_NAME, conf); | |
SQLContext sqlContext = new SQLContext(sc); | |
JavaRDD<CassandraRow> cassandraRdd = javaFunctions(sc).cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY); | |
JavaRDD<Row> rdd = cassandraRdd.map(row -> RowFactory.create(row.getInt(0), row.getString(1))); | |
List<StructField> fields = new ArrayList<>(); | |
fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true)); | |
fields.add(DataTypes.createStructField("line", DataTypes.StringType, true)); | |
StructType schema = DataTypes.createStructType(fields); | |
DataFrame dataFrame = sqlContext.createDataFrame(rdd, schema); | |
dataFrame.registerTempTable("lines"); | |
DataFrame resultDataFrame = sqlContext.sql("select line from lines where id = 1"); | |
System.out.println(Arrays.asList(resultDataFrame.collect())); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Builds the fat jar with Gradle and submits it to Spark.
# `make` (or `make all`) does a clean build and then runs the job.

# Cassandra contact point; override with `make run CASSANDRA_HOST=<address>`.
CASSANDRA_HOST ?= 107.108.214.154

# Use $(MAKE) for the recursive calls so flags and the make binary are inherited.
all:
	$(MAKE) build
	$(MAKE) run

# spark.cassandra.input.split.size_in_mb is expressed in megabytes, so 64 MB is
# written as 64 (the previous value 67108864 was 64 MB expressed in bytes).
run:
	spark-submit \
		--class Main \
		--conf "spark.driver.memory=512m" \
		--conf "spark.executor.memory=2409m" \
		--conf "spark.network.timeout=600s" \
		--conf "spark.cassandra.connection.host=$(CASSANDRA_HOST)" \
		--conf "spark.cassandra.input.split.size_in_mb=64" \
		./build/libs/CassandraSparkSQL2-1.0-SNAPSHOT.jar

build: clean
	gradle jar

clean:
	rm -rf build

# None of these targets produce a file of the same name; `build` in particular
# is also the name of Gradle's output directory, so it must be declared phony.
.PHONY: all run build clean
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Root project name; it appears in the jar path the Makefile submits
// (./build/libs/CassandraSparkSQL2-1.0-SNAPSHOT.jar), so keep the two in sync.
rootProject.name = 'CassandraSparkSQL2' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment