Skip to content

Instantly share code, notes, and snippets.

@shiv4nsh
Last active November 7, 2016 06:38
Show Gist options
  • Save shiv4nsh/0c3f62e3afd95634a6061b405c774582 to your computer and use it in GitHub Desktop.
Save shiv4nsh/0c3f62e3afd95634a6061b405c774582 to your computer and use it in GitHub Desktop.
// Spark configuration for the "Application1" job.
// MasterIP and CassandraIP are not defined in this snippet; they must be in scope already.
// NOTE(review): `sc` and `sqlContext`, used further down, are not created here —
// presumably they are built from this conf elsewhere; confirm.
val conf = new SparkConf()
  .setAppName("Application1")
  .setMaster(MasterIP)
  .setAll(Seq(
    // Spark SQL settings
    "spark.sql.shuffle.partitions" -> "8",
    // Cassandra connector: contact host
    "spark.cassandra.connection.host" -> CassandraIP,
    "spark.sql.crossJoin.enabled" -> "true",
    // Serializer buffer ceiling and executor sizing
    "spark.kryoserializer.buffer.max" -> "640m",
    "spark.executor.memory" -> "5g",
    "spark.executor.cores" -> "2",
    // Cassandra connector: write-batch sizing
    "spark.cassandra.output.batch.size.rows" -> "1",
    "spark.cassandra.output.batch.size.bytes" -> "2048"
  ))
import sqlContext.implicits._
import org.apache.spark.sql.functions._

// Load three Cassandra tables from `keyspaceName` as DataFrames and print a sample of each.
// Row types A, B and C, plus keyspaceName / table1 / table2 / table3, are defined outside
// this snippet. df1 and df2 are cached; df3 is not.
val df1 = sc.cassandraTable[A](keyspaceName, table1).toDF()
df1.cache()
df1.show()

// Fixed: the table name was spelled `tabl2`, which does not follow the table1/table3 naming
// used by the sibling loads.
val df2 = sc.cassandraTable[B](keyspaceName, table2).toDF()
df2.cache()
df2.show()

val df3 = sc.cassandraTable[C](keyspaceName, table3).toDF()
df3.show()
// Equi-join df1 with df2 on their shared topic_id column, then keep doc_id, topic_id and terms.
val joinedTable = df1.join(df2, Seq("topic_id"))
val selectedJoint = joinedTable.select(col("doc_id"), col("topic_id"), col("terms"))

// Attach the df3 row whose id equals the joined row's doc_id, then narrow to the four
// columns needed downstream.
val joinedTable2 = selectedJoint.join(df3, selectedJoint("doc_id") === df3("id"))
val dfToBeProcessed = joinedTable2.select(col("id"), col("topic_id"), col("text"), col("terms"))

// Re-project the four columns and add `value`, produced by the SQL function `getvalues`.
// `getvalues` is not defined in this snippet — presumably a UDF registered elsewhere; confirm.
val selectedDF = dfToBeProcessed.select(
  Seq("id", "topic_id", "text", "terms", "getvalues(topic_id, text, terms) as value").map(expr): _*
)
selectedDF.show()
// Collapse selectedDF to one row per id: collect the distinct `value` entries for that id,
// flatten the term rows they contain into a single BackTag, and store the BackTag as the
// string produced by Store.convertToString.
// BackTag, IdentifiedTerms and Store are defined outside this snippet.
val df4 = selectedDF
  .select("id", "value")
  .groupBy("id")
  .agg(collect_set("value"))
  .map { row =>
    val id = row.getLong(0)
    // Column 1 holds the collect_set result. Each element is matched as a one-field Row
    // wrapping a sequence of term rows; any other shape raises a MatchError, as before.
    val terms = row.getAs[Seq[Row]](1).flatMap {
      case Row(b: Seq[Row]) =>
        b.map(a => IdentifiedTerms(a.getString(0), a.getString(1), a.getLong(2), a.getInt(3)))
    }.toList
    (id, Store.convertToString(BackTag(terms)))
  }
  .toDF("id", "value")
// Fixed: this line called `backtagged.show`, but no `backtagged` value exists in the script;
// the DataFrame built just above is df4.
df4.show()
// Join the per-id aggregated values back onto df3 by id. Any "value" column on the df3 side
// is dropped first, so the only "value" column in the result is df4's.
val resultDF = {
  val rightSide = df3.drop("value")
  df4.join(rightSide, Seq("id"))
}
resultDF.show()
resultDF.printSchema()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment