Loading Data from Cassandra into Hadoop

import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.cassandra._

object CassandraLoader extends Serializable {

  /** Represents the some_keyspace.some_table table. */
  case class MyCassandraTable(user_id: String, `type`: Int, key: String, value: String)
  def main(args: Array[String]) { // scalastyle:off method.length

    /**************************************************************************************
     * INITIATE SPARK SESSION
     * This session will be used to ingest data from a Cassandra cluster.
     *************************************************************************************/

    val spark = SparkSession
      .builder()
      .config("hive.merge.orcfile.stripe.level", "false")
      .appName("Cassandra Data Loader")
      .enableHiveSupport()
      .getOrCreate()

    // Implicits allow us to use .as[CaseClass]
    import spark.implicits._
    /**************************************************************************************
     * SET UP CASSANDRA CONNECTION
     * These settings determine which environment, keyspace, and table to download.
     *************************************************************************************/

    val user = Map("spark.cassandra.auth.username" -> "some_username")
    val pwd = Map("spark.cassandra.auth.password" -> "some_password")

    // Entry point into your Cassandra cluster from Spark
    val hosts = "cassandra001.yourcompany.com,cassandra002.yourcompany.com,cassandra003.yourcompany.com"
    val port = "9042"

    // Set the Cassandra connection configuration on the Spark session
    spark.setCassandraConf(
      CassandraConnectorConf.KeepAliveMillisParam.option(1000) ++
      CassandraConnectorConf.ConnectionHostParam.option(hosts) ++
      CassandraConnectorConf.ReadTimeoutParam.option(240000) ++
      CassandraConnectorConf.ConnectionPortParam.option(port) ++
      user ++ pwd)

    // Specify which keyspace.table to consume from Cassandra
    val table = Map("keyspace" -> "some_keyspace", "table" -> "some_table_in_that_keyspace")
    /**************************************************************************************
     * CONSUME DATA FROM CASSANDRA
     * Use the connector, via the format() method, to pull the data and write it.
     *************************************************************************************/

    val data = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(table)
      .load()
      .as[MyCassandraTable]

    // Write to HDFS as Snappy-compressed ORC
    data
      .write
      .option("orc.compress", "snappy")
      .mode(SaveMode.Overwrite)
      .orc("/some/location/in/hdfs/")
  }
}
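
Once the job above has run, the ORC output can be read straight back from HDFS to confirm the load worked. The snippet below is a minimal sketch of such a check; the CassandraLoadCheck object name is purely illustrative, it assumes the same placeholder output path used above, and it only touches the ORC files, so the Cassandra connector is not needed on the classpath for this step.

import org.apache.spark.sql.SparkSession

object CassandraLoadCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Cassandra Load Check").getOrCreate()

    // Read the Snappy-compressed ORC written by CassandraLoader
    // (same placeholder path as in the loader above)
    val loaded = spark.read.orc("/some/location/in/hdfs/")

    // Basic sanity checks: schema should match MyCassandraTable, and rows should be present
    loaded.printSchema()
    println(s"Rows loaded: ${loaded.count()}")

    spark.stop()
  }
}

Because the loader's session enables Hive support, the data could also be written with .write.saveAsTable(...) to land directly in a Hive-managed table; writing raw ORC to a path, as the gist does, keeps the output usable by an external Hive table instead.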