@PatrickCallaghan
Last active August 29, 2015 14:07
Creating a custom index table from an existing table with Apache Spark

By default the Spark analytics workload runs on its own node, so you will need to start DSE with dse cassandra -k to create a Spark analytics node.

First, run the https://github.com/PatrickCallaghan/datastax-userinteractions-demo project to populate the Cassandra cluster (follow the instructions in its README). It loads the Cassandra database with hundreds of thousands of user interactions; the idea is that users interact with multiple apps, and we model those interactions by user in Cassandra.

We have an existing table that holds all the data for user interactions with certain applications on a given date.

The first requirement was: show me the user interactions with a certain app.

Now we have a new requirement: show me all the users that performed a certain action within an app on a particular day.

NOTE: you will need to add the following table to your schema:

-- Partition by (app, action, date) so that all users for a given app, action and day
-- sit in one partition; user_id as the clustering column keeps each user unique.
CREATE TABLE users_by_interaction_day (
  app text,
  action text,
  date text,
  user_id text,
  PRIMARY KEY ((app, action, date), user_id)
);
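
With this table in place, the new requirement becomes a single-partition lookup. As a quick sketch of the query pattern (the app, action and date values below are placeholder examples):

-- Unique users for one app, action and day; filter values are placeholders
SELECT user_id
FROM users_by_interaction_day
WHERE app = 'app1' AND action = 'login' AND date = '29-08-2015';

Since user_id is the clustering column, each user appears at most once in the result.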

We have created our new table, so all that is left is to read the existing table and map the data to the new one. The following can be run from the Spark shell (started with dse spark on the analytics node).

//Import the Spark Cassandra connector implicits (cassandraTable, saveToCassandra)
import com.datastax.spark.connector._

//Case class mapping the existing table
case class UserInteraction (user_id: String, app: String, time: java.util.Date, action: String)

//Case class mapping the new table
case class UserInteractionByAction (app: String, action: String, date: String, user_id: String)

//We only want the date part of the time so that we can filter by day
val formatter = new java.text.SimpleDateFormat("dd-MM-yyyy")

//Read the existing table into an RDD and cache it, since we may reuse it
val interactionsRdd = sc.cassandraTable[UserInteraction]("datastax_user_interactions_demo", "user_interactions").cache

//Map each row of the existing table to a row of the new table
val newObjects = interactionsRdd.map(f => UserInteractionByAction(f.app, f.action, formatter.format(f.time), f.user_id))

//Save the mapped rows to the new table
newObjects.saveToCassandra("datastax_user_interactions_demo", "users_by_interaction_day")
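
As a quick sanity check in the same shell session, the row count of the new table should equal the number of distinct (app, action, day, user) combinations in the source, because saving duplicate primary keys to Cassandra simply upserts the same row:

//Count the rows that ended up in the new table
val indexedCount = sc.cassandraTable[UserInteractionByAction]("datastax_user_interactions_demo", "users_by_interaction_day").count

//Count the distinct (app, action, day, user) combinations in the source data
val expectedCount = interactionsRdd.map(f => (f.app, f.action, formatter.format(f.time), f.user_id)).distinct.count

println(s"indexed: $indexedCount, expected: $expectedCount")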

Hope this helps.

@manishobhatia

Thanks for the tutorial, I found this very useful. Is there a way to keep the new table updated periodically by scheduling a Spark process to run?

@PatrickCallaghan
Author

Sure - you could have a look at http://spark.apache.org/docs/latest/job-scheduling.html, or you could always just run a cron job to submit the job.
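
For the cron route, the shell snippet above can be packaged as a standalone application and submitted on a schedule with dse spark-submit. A minimal sketch, assuming the same keyspace and tables as the gist; the object and application names are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

//Hypothetical standalone job wrapping the mapping logic from the gist
object UserInteractionIndexJob {

  case class UserInteraction(user_id: String, app: String, time: java.util.Date, action: String)
  case class UserInteractionByAction(app: String, action: String, date: String, user_id: String)

  def main(args: Array[String]): Unit = {
    //dse spark-submit supplies the Cassandra connection settings on an analytics node
    val sc = new SparkContext(new SparkConf().setAppName("UserInteractionIndexJob"))
    val formatter = new java.text.SimpleDateFormat("dd-MM-yyyy")

    //Re-run the same read-map-save pipeline; Cassandra upserts make the job idempotent
    sc.cassandraTable[UserInteraction]("datastax_user_interactions_demo", "user_interactions")
      .map(f => UserInteractionByAction(f.app, f.action, formatter.format(f.time), f.user_id))
      .saveToCassandra("datastax_user_interactions_demo", "users_by_interaction_day")

    sc.stop()
  }
}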
