
@hakanilter
Last active March 16, 2018 04:41
Apache Spark - Apache Cassandra Integration

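Create a new instance (the steps below use yum and chkconfig, so a RHEL-family image such as Amazon Linux is assumed) and create the following repository file: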

sudo vim /etc/yum.repos.d/cassandra.repo

Add the Apache Cassandra 3.11.x repository:

[cassandra]
name=Apache Cassandra
baseurl=https://www.apache.org/dist/cassandra/redhat/311x/
gpgcheck=1
repo_gpgcheck=1
gpgkey=https://www.apache.org/dist/cassandra/KEYS

Install Java 8 (required by Cassandra 3.11) and Cassandra, then start the service and enable it at boot:

sudo yum install java-1.8.0
sudo yum remove java-1.7.0-openjdk
sudo yum install cassandra
sudo service cassandra start
sudo chkconfig cassandra on

Run cqlsh and create a keyspace, a table and a sample row:

CREATE KEYSPACE demo WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
USE demo;
CREATE TABLE emp (empID int, deptID int, first_name varchar, last_name varchar, PRIMARY KEY (empID, deptID));
INSERT INTO emp (empID, deptID, first_name, last_name) VALUES (104, 15, 'jane', 'smith');
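Cassandra treats unquoted CQL identifiers as case-insensitive and stores them in lower case, while double-quoted identifiers keep their exact spelling. The snippet below is a simplified sketch of that rule applied to the columns declared above; CqlIdentifiers is a hypothetical helper, not part of Cassandra or the connector, and it ignores CQL's other identifier restrictions.

```scala
import java.util.Locale

// Hypothetical helper, NOT a Cassandra or connector API: a simplified model of
// how CQL resolves column identifiers.
object CqlIdentifiers {
  // Double-quoted identifiers keep their case ("" inside them is an escaped quote);
  // unquoted identifiers are case-insensitive and stored in lower case.
  def normalize(identifier: String): String =
    if (identifier.length >= 2 && identifier.startsWith("\"") && identifier.endsWith("\""))
      identifier.substring(1, identifier.length - 1).replace("\"\"", "\"")
    else
      identifier.toLowerCase(Locale.ROOT)
}

// Column names exactly as written in the CREATE TABLE statement above.
val declared = Seq("empID", "deptID", "first_name", "last_name")
val stored = declared.map(CqlIdentifiers.normalize)

println(stored.mkString(", "))                 // empid, deptid, first_name, last_name
println(CqlIdentifiers.normalize("\"empID\"")) // empID (quoted, so the case is kept)
```

Running DESCRIBE TABLE emp in cqlsh should list the same lower-case names, which are the names Spark sees later.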

Download and start Apache Zeppelin 0.7.3

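wget http://www.mirrorservice.org/sites/ftp.apache.org/zeppelin/zeppelin-0.7.3/zeppelin-0.7.3-bin-all.tgz
tar xzvf zeppelin-0.7.3-bin-all.tgz
cd zeppelin-0.7.3-bin-all
bin/zeppelin-daemon.sh start

Zeppelin listens on port 8080 by default: open http://<instance-address>:8080 and create a new note.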

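Add the connector as a dependency of Zeppelin's spark interpreter (Interpreter page, edit the spark interpreter, enter the artifact under Dependencies). The _2.11 suffix is the Scala version and must match the Spark build Zeppelin uses: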

com.datastax.spark:spark-cassandra-connector_2.11:2.0.7
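If you run the same code outside Zeppelin, for example as a standalone Spark application, the coordinate above can be declared in sbt instead. The build.sbt fragment below is a minimal sketch; the Scala and Spark versions are assumptions (the _2.11 artifact needs Scala 2.11, and connector 2.0.x belongs to the Spark 2.0/2.1 era), so check the connector's compatibility table and align them with your cluster.

```scala
// build.sbt (sketch): versions below are assumptions, adjust to your cluster
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Supplied by spark-submit at runtime, so it is not bundled
  "org.apache.spark"   %% "spark-sql"                 % "2.1.2" % "provided",
  // Same artifact as the Zeppelin dependency above
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.7"
)
```

%% appends the Scala binary version, so with Scala 2.11 this resolves exactly spark-cassandra-connector_2.11:2.0.7.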

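Load the table as a DataFrame (the connector connects to localhost by default, which works here because Cassandra runs on the same instance; otherwise set spark.cassandra.connection.host in the spark interpreter properties)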

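// Read demo.emp through the connector's Spark SQL data source
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "emp", "keyspace" -> "demo"))
  .load()
df.printSchema()
// Register a temporary view so %sql paragraphs can query it as "emp"
df.createOrReplaceTempView("emp")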

Query the view from a %sql paragraph

%sql
SELECT * FROM emp

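Write to Cassandra (the case class fields are lower case because Cassandra stores the unquoted identifiers empID and deptID as empid and deptid)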

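// Field names match the column names Cassandra actually stores
case class Employee(empid: Integer, deptid: Integer, first_name: String, last_name: String)

// Needed for .toDF (Zeppelin usually imports it automatically; re-importing is harmless)
import spark.implicits._

val valueDF = spark.sparkContext.parallelize(Seq(
  Employee(1, 1, "Dave", "Mustaine"),
  Employee(2, 1, "David", "Ellefson"),
  Employee(3, 1, "Marty", "Friedman"),
  Employee(4, 1, "Nick", "Menza")
)).toDF()

// Save through the same data source used for reading
valueDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "emp", "keyspace" -> "demo"))
  .save()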
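Cassandra writes are upserts: a row is identified by its full primary key, here (empid, deptid), and writing a key that already exists replaces the stored values instead of adding a second row, so re-running the append above does not create duplicates. The sketch below models that behaviour in memory; ToyEmpTable is a hypothetical class invented for illustration, not part of Spark, the connector or Cassandra. It also simplifies one detail: real Cassandra merges per column, so a write that omits a column keeps its old value, whereas the toy replaces whole rows.

```scala
// Same shape as the Employee case class used for the Spark write.
case class Employee(empid: Integer, deptid: Integer, first_name: String, last_name: String)

// Toy model (illustration only): rows live in a map keyed by the full primary
// key (empid, deptid), so writing an existing key replaces that row.
final class ToyEmpTable {
  private var rows = Map.empty[(Int, Int), Employee]

  def write(batch: Seq[Employee]): Unit =
    batch.foreach { e => rows = rows.updated((e.empid.intValue, e.deptid.intValue), e) }

  def count: Int = rows.size

  def lookup(empid: Int, deptid: Int): Option[Employee] = rows.get((empid, deptid))
}

val table = new ToyEmpTable
table.write(Seq(
  Employee(1, 1, "Dave", "Mustaine"),
  Employee(2, 1, "David", "Ellefson")))
// A second write that reuses key (2, 1) with a different first name
table.write(Seq(Employee(2, 1, "Dave", "Ellefson")))

println(table.count)                          // 2, not 3
println(table.lookup(2, 1).map(_.first_name)) // Some(Dave)
```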

Terminate your instance, drink a coffee :)
