By default, Spark runs on its own node, so you will need to start DSE with dse cassandra -k
to create a Spark analytics node.
First, run the https://github.com/PatrickCallaghan/order-management-demo project to populate the Cassandra cluster (follow the instructions in its README). This project is based on the first bootcamp project and is a fairly general order management system (e.g. users, accounts, products, orders). Populating the cluster can take a while, so you may only want to run it for a minute or two to generate some data.
NOTE: You will need to add the following table to the order_management keyspace:
CREATE TABLE useraccounts (
user_id text,
balance double,
city text,
first text,
last text,
state text,
PRIMARY KEY (user_id)
);
In a new window, start the Shark shell with dse shark
You can then run any of the Hive scripts in the src/main/resources/hive
directory through Shark.
Example:
INSERT INTO TABLE products_by_vendor_hive
SELECT recommendation, vendor, COUNT(distinct order_id) AS MyCount, product_id, product_name
FROM product_orders_by_vendor
GROUP BY recommendation, vendor, product_id, product_name SORT BY vendor, MyCount DESC;
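To make the grouping in this query concrete, here is a minimal sketch of the same aggregation on plain Scala collections. It is not part of the project: the ProductOrder case class, the String column types, and the sample rows are assumptions made for illustration. Note also that Hive's SORT BY only orders rows within each reducer, whereas this sketch sorts the whole result.

```scala
// Hypothetical row type: only the columns named in the query, all assumed to be text.
case class ProductOrder(recommendation: String, vendor: String,
                        product_id: String, product_name: String, order_id: String)

// Count distinct orders per (recommendation, vendor, product_id, product_name),
// then sort by vendor and by descending count.
def distinctOrderCounts(rows: Seq[ProductOrder]): Seq[(String, String, Int, String, String)] =
  rows
    .groupBy(r => (r.recommendation, r.vendor, r.product_id, r.product_name))
    .map { case ((rec, vendor, pid, pname), group) =>
      (rec, vendor, group.map(_.order_id).distinct.size, pid, pname)
    }
    .toSeq
    .sortBy { case (_, vendor, count, _, _) => (vendor, -count) }

// Made-up sample data; the second row repeats order o1 and is counted once.
val sampleOrders = Seq(
  ProductOrder("yes", "acme", "p1", "Widget", "o1"),
  ProductOrder("yes", "acme", "p1", "Widget", "o1"),
  ProductOrder("yes", "acme", "p1", "Widget", "o2"),
  ProductOrder("no", "acme", "p2", "Gadget", "o3"),
  ProductOrder("yes", "bolt", "p3", "Gizmo", "o4"))

val orderCounts = distinctOrderCounts(sampleOrders)
orderCounts.foreach(println)
```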
Close the Shark terminal and start a Spark shell with dse spark
Now you can run the following commands to join the users
and accounts
tables and keep the ten accounts with the highest balance. The result is saved to the useraccounts table:
case class Account(user_id: String, account_id: String, balance: Double, last_updated: java.util.Date)
case class User(user_id: String, city_name: String, country_code: String, dob: java.util.Date, email: String, first_name: String, gender: String, last_name: String,
middle_name: String, phone_number: String, state_name: String, street_address: String,
zip_code: String)
case class UserAccount(user_id: String, balance: Double, first: String, last: String, city: String, state: String)
val accounts = sc.cassandraTable[Account]("order_management", "accounts").cache
val users = sc.cassandraTable[User]("order_management", "users").cache
val accountsByUserId = accounts.keyBy(_.user_id)
val usersByUserId = users.keyBy(_.user_id)
//Join the tables by the user_id
val joinedUsers = accountsByUserId.join(usersByUserId).cache
//Create an RDD of the new object type, which maps to our new table
val userAccountObjects = joinedUsers.map { case (key, (account, user)) => UserAccount(account.user_id, account.balance, user.first_name, user.last_name, user.city_name, user.state_name) }.cache
//Get the ten accounts with the highest balance without collecting the whole RDD on the driver
val top10 = userAccountObjects.top(10)(Ordering.by[UserAccount, Double](_.balance))
val newRdd = sc.parallelize(top10)
//save to Cassandra
newRdd.saveToCassandra("order_management", "useraccounts")
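To see what the join and top-ten steps compute without a running cluster, the same logic can be sketched on plain Scala collections and pasted into any Scala REPL. This is only an illustration, not part of the project: the sample rows are made up, the case classes are trimmed to the fields that are used, and the names AccountRow, UserRow, UserAccountRow and topUserAccounts are hypothetical. It also assumes one user per user_id.

```scala
// Trimmed-down stand-ins for the Account, User and UserAccount case classes (hypothetical names).
case class AccountRow(user_id: String, balance: Double)
case class UserRow(user_id: String, first_name: String, last_name: String,
                   city_name: String, state_name: String)
case class UserAccountRow(user_id: String, balance: Double, first: String,
                          last: String, city: String, state: String)

// Join accounts to users on user_id and keep the n highest balances.
def topUserAccounts(accounts: Seq[AccountRow], users: Seq[UserRow], n: Int): Seq[UserAccountRow] = {
  val usersById = users.map(u => u.user_id -> u).toMap
  val joined = for {
    a <- accounts
    u <- usersById.get(a.user_id).toSeq // inner join: accounts without a matching user are dropped
  } yield UserAccountRow(a.user_id, a.balance, u.first_name, u.last_name, u.city_name, u.state_name)
  joined.sortBy(-_.balance).take(n)
}

// Made-up sample data; account u9 has no matching user.
val sampleAccounts = Seq(
  AccountRow("u1", 50.0), AccountRow("u2", 300.0),
  AccountRow("u3", 120.0), AccountRow("u9", 999.0))
val sampleUsers = Seq(
  UserRow("u1", "Ann", "Lee", "Austin", "TX"),
  UserRow("u2", "Bob", "Ray", "Boston", "MA"),
  UserRow("u3", "Cy", "Fox", "Denver", "CO"))

val top2 = topUserAccounts(sampleAccounts, sampleUsers, 2)
top2.foreach(println)
```

As with the RDD join, the join here is an inner join, so an account whose user_id has no row in users does not appear in the result, however large its balance.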