@brianmhess
Last active August 29, 2015
Setup

First we need to create the Cassandra table and populate it with some simple data.

In CQLSH:

CREATE KEYSPACE sparktest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
CREATE TABLE sparktest.test(pkey INT, ccol INT, x INT, y INT, PRIMARY KEY ((pkey), ccol));
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (1,1,1,1);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (2,2,2,2);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (3,3,3,3);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (4,4,4,4);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (1,10,10,10);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (2,20,20,20);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (3,30,30,30);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (4,40,40,40);
CREATE TABLE sparktest.test2(pkey INT, ctval INT, PRIMARY KEY ((pkey)));

To see our data, in CQLSH run:

SELECT * FROM sparktest.test;

The result should be:

cqlsh:sparktest> SELECT * FROM sparktest.test;

 pkey | ccol | x  | y
------+------+----+----
    1 |    1 |  1 |  1
    1 |   10 | 10 | 10
    2 |    2 |  2 |  2
    2 |   20 | 20 | 20
    4 |    4 |  4 |  4
    4 |   40 | 40 | 40
    3 |    3 |  3 |  3
    3 |   30 | 30 | 30

(8 rows)

Simple test

We will connect to the cluster, read the data from sparktest.test, count the number of records per pkey, and insert the counts into sparktest.test2.

Start the spark-shell with the spark.cassandra.connection.host set to one of the Cassandra host IP addresses (using 1.2.3.4 for illustrative purposes):

./bin/spark-shell --conf spark.cassandra.connection.host=1.2.3.4

If you are using DataStax Enterprise, you can simply start spark-shell with:

dse spark

Then, in the spark-shell:

import com.datastax.spark.connector._
case class Test(pkey: Int, ccol: Int, x: Int, y: Int)
case class TestCt(pkey: Int, ctval: Int)
sc.cassandraTable[Test]("sparktest", "test").
  map(x => (x.pkey,1)).
  reduceByKey((x,y) => x+y).
  map(x => TestCt(x._1,x._2)).
  saveToCassandra("sparktest", "test2")
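The map/reduceByKey pipeline above is the classic per-key counting pattern: emit (pkey, 1) for each row, then sum the 1s for each key. As a minimal standalone sketch (using plain Scala collections instead of an RDD, so no cluster is needed; the sample rows are hypothetical and mirror the data inserted earlier), the same logic looks like:

```scala
// Standalone sketch of the map -> reduceByKey -> map pipeline,
// using plain Scala collections rather than a Spark RDD.
case class Test(pkey: Int, ccol: Int, x: Int, y: Int)
case class TestCt(pkey: Int, ctval: Int)

val rows = Seq(
  Test(1, 1, 1, 1), Test(1, 10, 10, 10),
  Test(2, 2, 2, 2), Test(2, 20, 20, 20)
)

val counts = rows
  .map(r => (r.pkey, 1))                  // emit (pkey, 1) for each row
  .groupBy(_._1)                          // group by pkey (reduceByKey analogue)
  .map { case (k, vs) => TestCt(k, vs.map(_._2).sum) }  // sum the 1s per key
  .toSeq
```

Here `counts` holds one TestCt per distinct pkey, just as saveToCassandra writes one row per pkey into sparktest.test2.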

Now let's verify that the data is in sparktest.test2. In CQLSH:

SELECT * FROM sparktest.test2;

The output you should see is:

cqlsh:sparktest> SELECT * FROM sparktest.test2;

 pkey | ctval
------+-------
    1 |     2
    2 |     2
    4 |     2
    3 |     2

(4 rows)