Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Planet Cassandra Spark Blog
-- Keyspace for the Spark demo: SimpleStrategy with a replication factor of 1.
CREATE KEYSPACE spark_demo
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};

-- Raw input lines, one row per (filename, line): filename is the partition
-- key and line is the clustering column.
CREATE TABLE spark_demo.raw_files (
  filename text,
  line     int,
  contents text,
  PRIMARY KEY (filename, line)
);

-- Users, keyed by id; no other columns are declared.
CREATE TABLE spark_demo.users (
  id int PRIMARY KEY
);

-- Movies, keyed by id.
-- NOTE(review): the Scala Movie case class in this file carries Title and
-- Genres, while this table declares name and year -- confirm the mapping.
CREATE TABLE spark_demo.movies (
  id   int PRIMARY KEY,
  name text,
  year int
);

-- Ratings, keyed by their own id, referencing a user and a movie.
-- NOTE(review): the Scala Rating case class in this file has no id field --
-- confirm how this primary key is meant to be populated.
CREATE TABLE spark_demo.ratings (
  id       int PRIMARY KEY,
  user_id  int,
  movie_id int,
  rating   float
);
import com.datastax.spark.connector._
import org.apache.spark.SparkContext, org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
// Build the Spark configuration (loading defaults from system properties) and
// tell the Cassandra connector which node to contact.
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
// Start the Spark context with that configuration.
val sc = new SparkContext(conf)
// Expose the spark_demo.raw_files table as an RDD of generic Cassandra rows.
val test_spark_rdd = sc.cassandraTable("spark_demo", "raw_files")
# This script takes the files from the MovieLens project (the ml-1m dataset),
# reads them in line by line and saves them to a table for raw data in Cassandra.
#
# NOTE(review): the files are opened in binary mode, as in the original script.
# Under Python 3 each line would be a bytes object -- confirm the driver binds
# that to the text column before running this on Python 3.
from cassandra import cluster

my_cluster = cluster.Cluster(['localhost'])
session = my_cluster.connect('spark_demo')

# movies.dat: one synchronous insert per line, keyed by (filename, line number).
with open('./ml-1m/movies.dat', 'rb') as raw_movies:
    for idx, line in enumerate(raw_movies):
        session.execute('INSERT INTO raw_files (line,filename,contents) VALUES (%s,%s,%s)', (idx, 'movies.dat', line))

# ratings.dat: inserts are issued asynchronously, with a progress line every
# 10000 rows. The returned futures are kept and drained at each progress step
# so that write errors surface and the script cannot finish while writes are
# still in flight.
pending = []
with open('./ml-1m/ratings.dat', 'rb') as raw_ratings:
    for idx, line in enumerate(raw_ratings):
        if (idx % 10000) == 0:
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print("{}".format(idx))
            for future in pending:
                future.result()  # blocks until done; raises if the write failed
            pending = []
        pending.append(session.execute_async('INSERT INTO raw_files (line,filename,contents) VALUES (%s,%s,%s)', (idx, 'ratings.dat', line)))
# Wait for the final partial batch before moving on.
for future in pending:
    future.result()

# users.dat: one synchronous insert per line.
with open('./ml-1m/users.dat', 'rb') as raw_users:
    for idx, line in enumerate(raw_users):
        session.execute('INSERT INTO raw_files (line,filename,contents) VALUES (%s,%s,%s)', (idx, 'users.dat', line))
// Layout of the raw input files (fields are separated by "::"):
//   users.dat   -> UserID::Gender::Age::Occupation::Zip-code
//   movies.dat  -> MovieID::Title::Genres
//   ratings.dat -> UserID::MovieID::Rating::Timestamp

/** One stored line of an input file: source file name, line number and text. */
case class RawFileData(
  Filename: String,
  Line: Int,
  Contents: String
)

/** A parsed users.dat record. */
case class User(
  Id: Int,
  Gender: String,
  Age: Int,
  Occupation: Int,
  ZipCode: String
)

/** A parsed movies.dat record. */
case class Movie(
  Id: Int,
  Title: String,
  Genres: String
)

/** A parsed ratings.dat record; the file's Timestamp field has no counterpart here. */
case class Rating(
  UserId: Int,
  MovieId: Int,
  Rating: Float
)
// Read every stored raw line, mapping each row onto RawFileData.
val raw_files = sc.cassandraTable[RawFileData]("spark_demo", "raw_files" )

// Each chained line below ends with '.', so the expression continues onto the
// next line; the last step of every chain must therefore build the record.

// users.dat lines -> User(Id, Gender, Age, Occupation, ZipCode), saved to spark_demo.users.
val users = raw_files.filter( raw => raw.Filename == "users.dat" ).
  map(raw => raw.Contents.trim).map( raw => raw.split("::")).
  map(raw=>User(raw(0).toInt, raw(1), raw(2).toInt, raw(3).toInt, raw(4)))
users.saveToCassandra("spark_demo", "users")

// movies.dat lines -> Movie(Id, Title, Genres), saved to spark_demo.movies.
val movies = raw_files.filter( raw => raw.Filename == "movies.dat" ).
  map(raw => raw.Contents.trim).map( raw => raw.split("::")).
  map(raw=>Movie(raw(0).toInt, raw(1), raw(2)))
movies.saveToCassandra("spark_demo", "movies")

// ratings.dat lines -> Rating(UserId, MovieId, Rating), saved to spark_demo.ratings.
// The fourth field (Timestamp) is not part of Rating and is dropped.
val ratings = raw_files.filter( raw => raw.Filename == "ratings.dat" ).
  map(raw => raw.Contents.trim).map( raw => raw.split("::")).
  map(raw=>Rating(raw(0).toInt, raw(1).toInt, raw(2).toFloat))
ratings.saveToCassandra("spark_demo", "ratings")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.