Skip to content

Instantly share code, notes, and snippets.

@burythehammer
Created March 29, 2017 15:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save burythehammer/cf42c1f1df8a2390e9f5bdeac4bb5bb8 to your computer and use it in GitHub Desktop.
Save burythehammer/cf42c1f1df8a2390e9f5bdeac4bb5bb8 to your computer and use it in GitHub Desktop.
Refactoring Spark/Cassandra code to be testable
/**
 * Entry point: reads transactions from Cassandra, computes per-(customer, card)
 * balances and writes them back, stamping every written row with the same timestamp.
 *
 * The SparkContext is stopped in a `finally` block so that it is released even when
 * reading, aggregating or saving throws.
 *
 * @param args command-line arguments (not used here)
 */
def main(args: Array[String]): Unit = {
  val sc = getSparkContext
  try {
    // One timestamp is used both as the read cut-off and as the written updated_at value.
    /* 2*/ val now = new Date()
    /* 3-5*/ val rdd: RDD[CassandraRow] = getCassandraTable(sc, now)
    /* 6-9*/ val balances = calculateBalances(rdd)
    /* 10-11*/ saveBalancesToCassandra(balances, now)
  } finally {
    sc.stop()
  }
}
/**
 * Builds the lazily-evaluated scan of `cc.cc_transactions`.
 *
 * Only the columns customerid, amount, card, status and id are selected. The scan is
 * restricted server-side to rows whose `id` is earlier than `minTimeuuid(now)`.
 *
 * @param sc  the SparkContext used to create the Cassandra RDD
 * @param now cut-off instant bound to the `minTimeuuid(?)` placeholder
 * @return an RDD of the matching transaction rows
 */
def getCassandraTable(sc: SparkContext, now: Date): RDD[CassandraRow] = {
  /* 3*/ val transactions = sc.cassandraTable("cc", "cc_transactions")
  /* 4*/ val projected = transactions.select("customerid", "amount", "card", "status", "id")
  /* 5*/ val beforeNow = projected.where("id < minTimeuuid(?)", now)
  beforeNow
}
/**
 * Sums transaction amounts per (customerid, card) pair, counting only rows whose
 * "status" column is one of `includedStatuses`.
 *
 * The body only applies RDD transformations. It does no Cassandra reads or writes of
 * its own, which keeps it independent of the I/O steps around it.
 *
 * @param rdd              transaction rows; each row's "status", "customerid", "card"
 *                         and "amount" columns are read
 * @param includedStatuses statuses whose rows contribute to a balance; defaults to
 *                         COMPLETED and REPAID (the previously hard-coded set)
 * @return one ((customerid, card), summed amount) pair per key with at least one included row
 */
def calculateBalances(
    rdd: RDD[CassandraRow],
    includedStatuses: Set[String] = Set("COMPLETED", "REPAID")): RDD[((String, String), Int)] = {
  // Explicit lambda instead of `includedStatuses contains _.getString(...)`, whose
  // placeholder expansion is easy to misread.
  /* 6*/ rdd.filter(row => includedStatuses.contains(row.getString("status")))
  /* 7*/ .keyBy(row => (row.getString("customerid"), row.getString("card")))
  /* 8*/ .mapValues(_.getInt("amount"))
  // NOTE(review): the sum is an Int and will silently overflow for very large totals;
  // widening would change the public return type, so it is left as-is.
  /* 9*/ .reduceByKey(_ + _)
}
/**
 * Writes computed balances to `cc.cc_balance`.
 *
 * Each ((customerid, card), balance) pair becomes one row with columns
 * customerid, card, balance and updated_at. Every row gets the same `now` as updated_at.
 *
 * @param rdd balances keyed by (customerid, card)
 * @param now value written to the updated_at column of every row
 */
def saveBalancesToCassandra(rdd: RDD[((String, String), Int)], now: Date): Unit = {
  /* 10*/ val rows = rdd.map { entry =>
    val ((customerid, card), balance) = entry
    (customerid, card, balance, now)
  }
  val columns = SomeColumns("customerid", "card", "balance", "updated_at")
  /* 11*/ rows.saveToCassandra("cc", "cc_balance", columns)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment