Last active
November 7, 2016 06:38
-
-
Save shiv4nsh/0c3f62e3afd95634a6061b405c774582 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Spark job fragment: reads three Cassandra tables, joins them, expands terms
// per document via a registered UDF (`getvalues`), aggregates the results per
// document id, and joins the serialized aggregate back onto the source docs.
// NOTE(review): `conf` is built here but the SparkContext/sqlContext creation
// is not visible in this fragment — presumably it happens elsewhere.
val conf = new SparkConf()
  .setAppName("Application1")
  .setMaster(MasterIP)
  .set("spark.sql.shuffle.partitions", "8")
  .set("spark.cassandra.connection.host", CassandraIP)
  .set("spark.sql.crossJoin.enabled", "true")
  .set("spark.kryoserializer.buffer.max", "640m")
  .set("spark.executor.memory", "5g")
  .set("spark.executor.cores", "2")
  .set("spark.cassandra.output.batch.size.rows", "1")
  .set("spark.cassandra.output.batch.size.bytes", "2048")

import sqlContext.implicits._

// Load the three source tables. df1/df2 are cached because each is reused
// (shown here, then joined below).
val df1 = sc.cassandraTable[A](keyspaceName, table1).toDF
df1.cache()
df1.show()

// NOTE(review): `tabl2` looks like a typo for `table2` — confirm the val name
// against the enclosing scope before renaming.
val df2 = sc.cassandraTable[B](keyspaceName, tabl2).toDF
df2.cache()
df2.show()

import org.apache.spark.sql.functions._

val df3 = sc.cassandraTable[C](keyspaceName, table3).toDF()
df3.show()

// Join A and B on topic_id, keep only the columns needed downstream,
// then attach C by matching document id (doc_id == C.id).
val joinedTable = df1.join(df2, "topic_id")
val selectedJoint = joinedTable.select("doc_id", "topic_id", "terms")
val joinedTable2 = selectedJoint.join(df3, selectedJoint("doc_id") === df3("id"))
val dfToBeProcessed = joinedTable2.select("id", "topic_id", "text", "terms")

// `getvalues` is a UDF registered elsewhere; it derives a nested value from
// (topic_id, text, terms) for each row.
val selectedDF = dfToBeProcessed.selectExpr(
  "id", "topic_id", "text", "terms",
  "getvalues(topic_id, text, terms) as value")
selectedDF.show

// Aggregate all `value`s per document id, flatten the nested rows into
// IdentifiedTerms, wrap them in a BackTag, and serialize to a string.
val df4 = selectedDF.select("id", "value").groupBy("id").agg(collect_set("value")).map { row =>
  val id = row.getLong(0)
  val value = BackTag(row.getAs[Seq[Row]](1).map {
    case Row(b: Seq[Row]) => b.map(a => IdentifiedTerms(a.getString(0), a.getString(1), a.getLong(2), a.getInt(3)))
  }.flatMap(identity).toList)
  val valueASString = Store.convertToString(value)
  (id, valueASString)
}.toDF("id", "value")
// BUG FIX: the original called `backtagged.show`, but no `backtagged` val is
// ever defined — the aggregated DataFrame above is bound to `df4`.
df4.show

// Replace C's original `value` column with the freshly computed one by
// dropping it before the join on id.
val resultDF = df4.join(df3.drop("value"), "id")
resultDF.show()
resultDF.printSchema()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment