Testing Spark and Cassandra

This guide leads the user through a basic test of a local Spark and Cassandra software stack.

Assumptions

This guide assumes you have:

  • added Spark's bin directory to your PATH:
echo 'export PATH="$PATH:/opt/spark/bin"' >> ~/.profile
  • made the Spark and Cassandra directories accessible to the user running the Spark shell:
sudo chown <user>:<user> /opt/spark
sudo chown <user>:<user> /opt/cassandra
  • made sure Cassandra is running:
sudo service cassandra restart
  • made sure the Spark master and a worker are running:
bash /opt/spark/sbin/stop-all.sh
bash /opt/spark/sbin/start-all.sh

spark-shell

Use the spark-shell command, passing the spark-cassandra-connector jars to the driver classpath with the '--driver-class-path' option, shipping the same jars to the executors with the '--jars' option, and choosing the master with the '--master' option.

spark-shell --driver-class-path $(echo /opt/connector/*.jar | sed 's/ /:/g') --jars $(echo /opt/connector/*.jar | sed 's/ /,/g') --master local[2]

You can watch as the shell initializes. After a bit, you will either see the basic Scala prompt, or the output will stop and you can hit enter to get a clean prompt.

scala>

The command creates a default SparkContext instance. The Cassandra connector needs a context configured with the Cassandra connection settings, so you will need to stop the pre-instantiated SparkContext, called 'sc', and create a new one.

Stop the old SparkContext:

scala> sc.stop

As before, this will often overrun the Scala prompt and you might have to hit enter to get a clear prompt.

Use the ':paste' command to enter paste mode:

scala> :paste

Copy and paste this Scala script, but make sure to change the keyspace and table names passed to sc.cassandraTable to reflect those in your Cassandra database.

import org.apache.spark._
import com.datastax.spark.connector._

// Create the configuration object necessary to start a SparkContext
val conf = new SparkConf()

// Point the SparkConf at the local Cassandra database
conf.set("spark.cassandra.connection.host", "127.0.0.1")

// Set the authentication credentials when using password authentication in Cassandra 
conf.set("spark.cassandra.auth.username", "cassandra")
conf.set("spark.cassandra.auth.password", "cassandra")

// Create the new SparkContext
val sc = new SparkContext(conf)

// Access keyspace.table in Cassandra
val table = sc.cassandraTable("keyspace", "table")

// Count the number of rows in the table
table.count

After pasting, make sure you are on a new line and hit ctrl-d to exit paste mode and run the script. The script will run for a bit, creating the new SparkContext, adding the jars, and talking with Cassandra. At the end, you should see the number of rows in your database. If everything worked out, you have just run a successful Spark/Cassandra test. Congratulations!
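
If you want to poke at the data a bit more before moving on, you can keep working with the same table RDD. The following is a minimal sketch, assuming the spark-cassandra-connector 1.0.x API and a text column named 'name' in your table (swap in a real column from your own schema):

// Fetch a few rows and print them; each element is a CassandraRow
table.take(3).foreach(println)

// Push the column selection down to Cassandra and read just one column
// ("name" is a placeholder; use a column that exists in your table)
val names = table.select("name").map(row => row.getString("name"))
names.take(3).foreach(println)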

spark-submit

The spark-submit command allows you to run a jar file instead of pasting a script into Spark's Scala shell. You can create a Scala jar with the command-line tool sbt or with an IDE like IntelliJ.

Here is a version of the script above you can use in your jar. Don't forget to change the keyspace and table names.

import org.apache.spark._
import com.datastax.spark.connector._

object SparkTest extends App {

  // Create the configuration object necessary to start a SparkContext
  val conf = new SparkConf()

  // Point the SparkConf at the local Cassandra database
  conf.set("spark.cassandra.connection.host", "127.0.0.1")

  // Set the authentication credentials when using password authentication in Cassandra
  conf.set("spark.cassandra.auth.username", "cassandra")
  conf.set("spark.cassandra.auth.password", "cassandra")

  // Create the new SparkContext
  val sc = new SparkContext(conf)

  // Access keyspace.table in Cassandra
  val table = sc.cassandraTable("keyspace", "table")

  // Count the number of rows in the table
  val rows = table.count()

  println("")
  println(s"The Cassandra database has $rows rows.")
  println("")
}
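
The connector can also write data back to Cassandra. As a rough sketch, assuming you have created a table keyspace.counts with a text column 'label' and an int column 'total', you could record the row count by adding something like this inside SparkTest after computing rows:

  // Save the row count to an existing Cassandra table
  // ("keyspace", "counts", "label", and "total" are placeholder names; use your own schema)
  val countRecord = sc.parallelize(Seq(("row_count", rows.toInt)))
  countRecord.saveToCassandra("keyspace", "counts", SomeColumns("label", "total"))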

Here is the associated build.sbt.

import AssemblyKeys._

assemblySettings

name := "sparktest"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.2",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.0.0-rc3"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

mergeStrategy in assembly := {
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.rsa$") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.dsa$") => MergeStrategy.discard
  case _ => MergeStrategy.first
}

Here is the associated project/plugins.sbt.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
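
The spark-submit example below expects a jar named 'sparktest.jar'. sbt-assembly's assembly task normally writes the fat jar to target/scala-2.10/ with a name like 'sparktest-assembly-1.0.jar', so you can either copy that file to ~/sparktest.jar or, assuming the 0.11.x jarName key, pin the name in build.sbt:

// Name the assembled jar to match the spark-submit example (jarName is the sbt-assembly 0.11.x key)
jarName in assembly := "sparktest.jar"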

The spark-submit command can take all the same options used above for spark-shell. In addition, you must provide your Scala program's main class or object name with the '--class' option. Here the jar is in the user's home directory and named 'sparktest.jar'.

spark-submit --driver-class-path $(echo /opt/connector/*.jar | sed 's/ /:/g') --jars $(echo /opt/connector/*.jar | sed 's/ /,/g') --master local[2] --class SparkTest ~/sparktest.jar

Just as above, you can watch as it executes the script. If everything goes well, you should see a message at the end that reports the number of rows in your Cassandra database.

Troubleshooting

The spark-submit command rejects jars with invalid signature files. You can check for this before submitting the jar to Spark.

jarsigner -verify sparktest.jar

Here is the error you'll see if there is a problem.

jarsigner: java.lang.SecurityException: Invalid signature file digest for Manifest main attributes

You can remove the signature file(s) and unsign the jar using the zip command-line tool with the '-d' option to delete all *.RSA, *.DSA, and *.SF files from the jar's META-INF directory.

zip -d sparktest.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF

Now jarsigner will tell you the following and spark-submit shouldn't complain about an invalid signature file.

jar is unsigned. (signatures missing or not parsable)

Thanks to Al Toby, Open Source Mechanic at DataStax, for the connector installation script and for the blog post that helped me write this guide.
