@PatrickCallaghan
Last active April 4, 2017 21:22
Gets the Top10 account balances from the joined tables 'Users' and 'Accounts'

By default this runs on its own node, so you will need to start DSE with dse cassandra -k to create a Spark analytics node.

First, run the https://github.com/PatrickCallaghan/order-management-demo project to populate the Cassandra cluster (follow the instructions in its README). The project is based on the first bootcamp project and is a fairly general order management system (e.g. users, accounts, products, orders). Populating the cluster can take a while, so you may want to run it for only a minute or two to generate some data.

NOTE: you will need to add the following table to your schema:

CREATE TABLE useraccounts (
  user_id text,
  balance double,
  city text,
  first text,
  last text,
  state text,
  PRIMARY KEY (user_id)
);

In a new window, start the Shark shell with dse shark

Then you can run any of the Hive scripts in the src/main/resources/hive directory through Shark.

Example:

INSERT INTO TABLE products_by_vendor_hive
SELECT recommendation, vendor, COUNT(distinct order_id) AS MyCount, product_id, product_name 
FROM product_orders_by_vendor 
GROUP BY recommendation, vendor, product_id, product_name SORT BY vendor, MyCount DESC;

Close the Shark terminal and start a Spark shell with dse spark

Now you can run the following commands to join the users and accounts tables and keep only the ten accounts with the highest balances. The result is saved to the useraccounts table created above:

case class Account(user_id: String, account_id: String, balance: Double, last_updated: java.util.Date)

case class User(user_id: String, city_name: String, country_code: String, dob: java.util.Date,
  email: String, first_name: String, gender: String, last_name: String, middle_name: String,
  phone_number: String, state_name: String, street_address: String, zip_code: String)

case class UserAccount(user_id: String, balance: Double, first: String, last: String, city: String, state: String)

val accounts = sc.cassandraTable[Account]("order_management", "accounts").cache
  
val users = sc.cassandraTable[User]("order_management", "users").cache

val accountsByUserId = accounts.keyBy(_.user_id)
val usersByUserId = users.keyBy(_.user_id)

//Join the tables by the user_id
val joinedUsers = accountsByUserId.join(usersByUserId).cache

//Create an RDD of the new object type, which maps to our new table
val userAccountObjects = joinedUsers.map { case (_, (account, user)) =>
  UserAccount(account.user_id, account.balance, user.first_name, user.last_name,
    user.city_name, user.state_name)
}.cache


//Get the ten accounts with the highest balances.
//Note that collect pulls every joined row to the driver before sorting.
val top10 = userAccountObjects.collect.toList.sortBy(_.balance).reverse.take(10)

val newRdd = sc.parallelize(top10)

//save to Cassandra
newRdd.saveToCassandra("order_management", "useraccounts")
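
The join and top-ten steps above can be hard to picture without a running cluster. As a rough sketch only, the same logic can be modeled with plain Scala collections. The object TopBalancesSketch, its helper names, the trimmed-down case classes, and the sample rows are all hypothetical and are not part of the demo project.

```scala
// Plain Scala collections standing in for the RDDs above.
// All names and sample rows here are made up for illustration.
object TopBalancesSketch {
  case class Account(user_id: String, account_id: String, balance: Double)
  case class User(user_id: String, first_name: String, last_name: String)
  case class UserAccount(user_id: String, balance: Double, first: String, last: String)

  // Inner join on user_id: one output row per matching (account, user) pair.
  // Accounts without a matching user are dropped, as with an RDD join.
  def joinByUserId(accounts: Seq[Account], users: Seq[User]): Seq[UserAccount] = {
    val usersById = users.groupBy(_.user_id)
    for {
      account <- accounts
      user <- usersById.getOrElse(account.user_id, Seq.empty)
    } yield UserAccount(account.user_id, account.balance, user.first_name, user.last_name)
  }

  // Highest balances first, keeping at most n rows.
  def topByBalance(rows: Seq[UserAccount], n: Int): Seq[UserAccount] =
    rows.sortBy(-_.balance).take(n)

  def main(args: Array[String]): Unit = {
    val accounts = Seq(
      Account("u1", "a1", 250.0),
      Account("u1", "a2", 900.0),
      Account("u2", "a3", 40.0),
      Account("u9", "a4", 5000.0) // no matching user, so the join drops it
    )
    val users = Seq(User("u1", "Ann", "Lee"), User("u2", "Bob", "Ray"))

    val joined = joinByUserId(accounts, users)
    topByBalance(joined, 2).foreach(println)
  }
}
```

In this sketch a user with several accounts appears once per account, so the top rows are ranked per account balance rather than per user total, which matches what the join above produces.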