First, we need to create the Cassandra keyspace and tables, then populate the source table with sample data.
In CQLSH:
CREATE KEYSPACE sparktest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
CREATE TABLE sparktest.test(pkey INT, ccol INT, x INT, y INT, PRIMARY KEY ((pkey), ccol));
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (1,1,1,1);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (2,2,2,2);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (3,3,3,3);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (4,4,4,4);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (1,10,10,10);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (2,20,20,20);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (3,30,30,30);
INSERT INTO sparktest.test(pkey,ccol,x,y) VALUES (4,40,40,40);
CREATE TABLE sparktest.test2(pkey INT, ctval INT, PRIMARY KEY ((pkey)));
To see our data, in CQLSH run:
SELECT * FROM sparktest.test;
The result should be:
cqlsh:sparktest> SELECT * FROM sparktest.test;

 pkey | ccol | x  | y
------+------+----+----
    1 |    1 |  1 |  1
    1 |   10 | 10 | 10
    2 |    2 |  2 |  2
    2 |   20 | 20 | 20
    4 |    4 |  4 |  4
    4 |   40 | 40 | 40
    3 |    3 |  3 |  3
    3 |   30 | 30 | 30

(8 rows)
Next, we will connect to the cluster from Spark, read the data from sparktest.test, count the number of rows per pkey, and write the counts to sparktest.test2.
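Before involving Spark, the expected contents of sparktest.test2 can be worked out on plain Scala collections. The sketch below is only an illustration, not part of the Spark job: it hard-codes the eight rows inserted above, uses the standard-library `groupMapReduce` (Scala 2.13 or later) instead of Spark, and `countPerKey` is a hypothetical helper name.

```scala
// Plain-Scala illustration (no Spark or Cassandra involved)
case class Test(pkey: Int, ccol: Int, x: Int, y: Int)
case class TestCt(pkey: Int, ctval: Int)

// The eight rows inserted into sparktest.test above
val rows: Seq[Test] = Seq(
  Test(1, 1, 1, 1), Test(2, 2, 2, 2), Test(3, 3, 3, 3), Test(4, 4, 4, 4),
  Test(1, 10, 10, 10), Test(2, 20, 20, 20), Test(3, 30, 30, 30), Test(4, 40, 40, 40)
)

// Hypothetical helper: count rows per pkey and shape the result like sparktest.test2
def countPerKey(rows: Seq[Test]): Seq[TestCt] = {
  // key by pkey, map every row to 1, and sum the 1s for each key
  val counts: Map[Int, Int] = rows.groupMapReduce(_.pkey)(_ => 1)(_ + _)
  counts.toSeq.sortBy(_._1).map { case (pkey, n) => TestCt(pkey, n) }
}

countPerKey(rows).foreach(println)
```

Every pkey appears in exactly two rows of the sample data, so each count is 2; that is what the Spark job should write to sparktest.test2.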
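With open-source Spark, the Spark Cassandra Connector must be on the spark-shell classpath (for example, added with the --packages or --jars option). Start spark-shell with spark.cassandra.connection.host set to the IP address of one of the Cassandra nodes (1.2.3.4 is used here for illustration):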
./bin/spark-shell --conf spark.cassandra.connection.host=1.2.3.4
If you are using DataStax Enterprise, you can simply start spark-shell with:
dse spark
Then, in the spark-shell:
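import com.datastax.spark.connector._

// Case classes whose field names match the column names of each table
case class Test(pkey: Int, ccol: Int, x: Int, y: Int)
case class TestCt(pkey: Int, ctval: Int)

sc.cassandraTable[Test]("sparktest", "test").        // read sparktest.test as Test objects
  map(row => (row.pkey, 1)).                         // emit (pkey, 1) for every row
  reduceByKey((a, b) => a + b).                      // sum the 1s for each pkey
  map { case (pkey, count) => TestCt(pkey, count) }. // one TestCt per pkey
  saveToCassandra("sparktest", "test2")              // write the counts to sparktest.test2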
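reduceByKey does more than group and count: Spark's documentation notes that it also merges values locally on each mapper before sending results to a reducer, similar to a combiner in MapReduce. The plain-Scala sketch below (no Spark involved) simulates those two phases for this job. The split of the sample rows into two partitions is a hypothetical assumption, since the real partitioning depends on how the connector maps Cassandra token ranges to Spark partitions, and `reduceByKeyLocal` is an illustrative helper, not a Spark API.

```scala
// Plain-Scala simulation of reduceByKey's two phases (no Spark involved)
case class Test(pkey: Int, ccol: Int, x: Int, y: Int)

// Hypothetical partitioning: the first four inserted rows in one partition,
// the last four in another, so every pkey appears in both partitions
val partitions: Seq[Seq[Test]] = Seq(
  Seq(Test(1, 1, 1, 1), Test(2, 2, 2, 2), Test(3, 3, 3, 3), Test(4, 4, 4, 4)),
  Seq(Test(1, 10, 10, 10), Test(2, 20, 20, 20), Test(3, 30, 30, 30), Test(4, 40, 40, 40))
)

// Illustrative helper: combine the values for each key with f
def reduceByKeyLocal(pairs: Seq[(Int, Int)], f: (Int, Int) => Int): Map[Int, Int] =
  pairs.foldLeft(Map.empty[Int, Int]) { case (acc, (k, v)) =>
    acc.updated(k, acc.get(k).map(f(_, v)).getOrElse(v))
  }

val add: (Int, Int) => Int = (a, b) => a + b

// Phase 1 (map side): map each row to (pkey, 1), then combine within each partition
val partials: Seq[Map[Int, Int]] =
  partitions.map(rows => reduceByKeyLocal(rows.map(r => (r.pkey, 1)), add))

// Phase 2 (reduce side, after the shuffle): merge the partial counts per key
val counts: Map[Int, Int] = reduceByKeyLocal(partials.flatMap(_.toSeq), add)

println(partials)
println(counts.toSeq.sortBy(_._1))
```

Each partition yields a partial count of 1 per pkey, and the merge step adds them up to 2, matching the rows the job writes to sparktest.test2.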
Now check that the data was written to sparktest.test2. In CQLSH:
SELECT * FROM sparktest.test2;
The output you should see is:
cqlsh:sparktest> SELECT * FROM sparktest.test2;

 pkey | ctval
------+-------
    1 |     2
    2 |     2
    4 |     2
    3 |     2

(4 rows)