@helena
Last active November 8, 2023 15:53
JSON Integration with Spark SQL and Cassandra
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{Row, SQLContext}

/** Spark SQL: Txt, Parquet, JSON Support with the Spark Cassandra Connector */
object SampleJson extends App {
  import com.datastax.spark.connector._ // adds rdd.saveToCassandra and sc.cassandraTable
  import GitHubEvents._                 // provides MonthlyCommits; not included in this gist

  val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .setMaster("local[*]")
    .setAppName("app2")

  // Recreate the keyspace and table from scratch on every run.
  CassandraConnector(conf).withSessionDo { session =>
    session.execute("DROP KEYSPACE IF EXISTS githubstats")
    session.execute("CREATE KEYSPACE githubstats WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
    session.execute("CREATE TABLE githubstats.monthly_commits (user VARCHAR PRIMARY KEY, commits INT, date INT)")
  }

  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // One JSON document per RDD element.
  val json = sc.parallelize(Seq(
    """{"user":"helena","commits":98, "month":12, "year":2014}""",
    """{"user":"pkolaczk", "commits":42, "month":12, "year":2014}"""))

  // Infer a schema from the JSON, turn each Row into a MonthlyCommits, and write to Cassandra.
  // jsonRDD is deprecated since Spark 1.4 in favour of sqlContext.read.json.
  sqlContext.jsonRDD(json).map(MonthlyCommits(_)).saveToCassandra("githubstats", "monthly_commits")

  // Read the table back as MonthlyCommits objects and print them.
  sc.cassandraTable[MonthlyCommits]("githubstats", "monthly_commits").collect foreach println

  sc.stop()
}
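The gist imports `GitHubEvents._` but does not include that file, so `MonthlyCommits` is undefined as posted. `MonthlyCommits(_)` is Scala placeholder syntax for `row => MonthlyCommits(row)`, which means the code expects a companion `apply` that takes a Spark SQL `Row`. The following is a hypothetical reconstruction, not the author's actual helper. It assumes Spark 1.4 or later (where `Row.getAs[T](fieldName)` exists), that Spark infers the JSON integers as `LongType`, and that the `date INT` column stores `year * 100 + month` (yyyyMM), since the gist does not say how `month` and `year` map to `date`.

```scala
import org.apache.spark.sql.Row

/** Hypothetical reconstruction of the GitHubEvents helper imported by SampleJson. */
object GitHubEvents {

  /** One row of githubstats.monthly_commits; field names match the CQL column names,
    * which lets saveToCassandra and cassandraTable[MonthlyCommits] map them. */
  case class MonthlyCommits(user: String, commits: Int, date: Int)

  object MonthlyCommits {
    /** Builds a MonthlyCommits from a Row produced by JSON schema inference.
      * JSON integers are inferred as LongType, hence getAs[Long] followed by toInt.
      * Assumption: date is encoded as yyyyMM, so year 2014 and month 12 become 201412. */
    def apply(row: Row): MonthlyCommits =
      MonthlyCommits(
        row.getAs[String]("user"),
        row.getAs[Long]("commits").toInt,
        (row.getAs[Long]("year") * 100 + row.getAs[Long]("month")).toInt)
  }
}
```

With this in place, `sqlContext.jsonRDD(json).map(MonthlyCommits(_))` yields an `RDD[MonthlyCommits]` whose case-class fields line up with the table columns.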
@mdaafaq

mdaafaq commented Dec 8, 2017

What is MonthlyCommits(_) here?
I just want to insert JSON data that comes from a Kafka queue.
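For JSON arriving from Kafka, one possible approach is to run the same "parse JSON, then write to Cassandra" step on every micro-batch. The sketch below is not part of the original gist. It assumes Spark 2.2 or later with `spark-streaming-kafka-0-10` and Spark Cassandra Connector 2.x on the classpath, and it writes through the connector's DataFrame source (`org.apache.spark.sql.cassandra`) instead of a `MonthlyCommits` case class. The broker address, the topic `github-commits`, the group id, and the yyyyMM encoding of the `date` column are all hypothetical.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaJsonToCassandra {

  /** Parses JSON strings and shapes them like githubstats.monthly_commits.
    * Assumption: date is stored as yyyyMM (year * 100 + month). */
  def toMonthlyCommits(spark: SparkSession, json: Dataset[String]): DataFrame = {
    import spark.implicits._
    spark.read.json(json).select(
      $"user",
      $"commits".cast("int").as("commits"),
      ($"year" * 100 + $"month").cast("int").as("date"))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafka-json-to-cassandra")
      .setIfMissing("spark.master", "local[*]")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

    // Hypothetical consumer settings; adjust to your cluster.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "githubstats-loader",
      "auto.offset.reset" -> "latest")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("github-commits"), kafkaParams))

    // Keep only the message payload (one JSON document per record).
    stream.map(_.value).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        import spark.implicits._
        toMonthlyCommits(spark, rdd.toDS())
          .write
          .format("org.apache.spark.sql.cassandra")
          .options(Map("keyspace" -> "githubstats", "table" -> "monthly_commits"))
          .mode("append")
          .save()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keeping the JSON-to-columns step in its own function lets it be checked against a local `SparkSession` without a running Kafka broker or Cassandra node.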