@chris-zen
Created April 16, 2015 16:44
Apache Spark prototypes
package org.upf.bg.condel.calc
import org.apache.spark.{ SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject
import org.bson.BasicBSONObject
import org.json4s._
import org.json4s.native.JsonMethods._
import java.io.FileReader
/**
 * Apache Spark job to calculate Condel scores for all the SNVs
 * in the MongoDB database. It requires the input collection to
 * be sharded across the execution nodes.
 *
 * More info about Condel at http://bg.upf.edu/fannsdb/
 */
object CondelCalc {

  // Predictors used to calculate Condel
  // val predictors = List("SIFT", "PPH2", "MA", "FATHMM")
  val predictors = List("MA", "FATHMM")

  // Required by the json4s extract
  implicit val formats = DefaultFormats

  // Predictor metrics required to calculate Condel
  case class PredictorMetrics(
    rmin: Double, rmax: Double,
    ncd: Array[Double], pcd: Array[Double],
    cutoff: Double, index: Int
  )

  def calcWeight(score: Double, pm: PredictorMetrics) = {
    val normalizedScore = (score - pm.rmin) / (pm.rmax - pm.rmin)
    if (normalizedScore < pm.cutoff)
      pm.ncd(pm.index)
    else
      pm.pcd(pm.index)
  }
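
  // Example (hypothetical values): with rmin = 0.0, rmax = 1.0, cutoff = 0.5,
  // ncd = Array(0.2), pcd = Array(0.9) and index = 0, a score of 0.8 normalizes
  // to 0.8, falls above the cutoff and therefore gets the positive weight 0.9.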

  def main(args: Array[String]) {

    // Check input arguments
    if (args.length < 1) {
      System.err.println("Usage: CondelCalc <metrics>")
      System.exit(1)
    }

    // Initialize the Spark context
    val sparkConf = new SparkConf()
      .setMaster(sys.env.getOrElse("MASTER", "local"))
      .setAppName("CondelCalc")
      .setSparkHome(sys.env("SPARK_HOME"))
      .setJars(SparkContext.jarOfObject(this).toSeq)
      .set("spark.executor.memory", "256m")

    val spark = new SparkContext(sparkConf)

    // Create the input MongoDB RDD
    val mongoConf = new Configuration()
    mongoConf.set("mongo.input.uri", "mongodb://node1.cluster.local:27017/fanns05.test")
    mongoConf.set("mongo.output.uri", "mongodb://node1.cluster.local:27017/fanns05.test")

    val input = spark.newAPIHadoopRDD(mongoConf,
      classOf[com.mongodb.hadoop.MongoInputFormat],
      classOf[Object], classOf[BSONObject])
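
    // Each record is a (document _id, BSONObject) pair emitted by MongoInputFormat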

    // Load and broadcast the predictor metrics
    val jsonMetrics = parse(new FileReader(args(0))) \ "metrics"

    val metrics = predictors.map { pred =>
      val pm = jsonMetrics \ pred
      val rmin = (pm \ "rmin").extract[Double]
      val rmax = (pm \ "rmax").extract[Double]
      val ncd = (pm \ "neg_cumdist").extract[Array[Double]]
      val pcd = (pm \ "pos_cumdist").extract[Array[Double]]
      val cutoff = (pm \ "best_perf_cutoff" \ "MCC").extract[Double]
      val index = (pm \ "best_perf_index" \ "MCC").extract[Int]
      pred -> PredictorMetrics(rmin, rmax, ncd, pcd, cutoff, index)
    }.toMap
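
    // Assumed layout of the metrics file given in args(0), inferred from the
    // fields extracted above (illustrative, not taken from the original gist):
    //
    //   { "metrics": { "MA": { "rmin": ..., "rmax": ...,
    //                          "neg_cumdist": [...], "pos_cumdist": [...],
    //                          "best_perf_cutoff": { "MCC": ... },
    //                          "best_perf_index":  { "MCC": ... } },
    //                  "FATHMM": { ... } } }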

    val bcMetrics = spark.broadcast(metrics)

    // Calculate Condel scores and update the database
    val output = input.flatMap { case (id, d) =>

      // Take the available scores from the input document
      val s = d.get("s").asInstanceOf[BSONObject]
      val scores: Map[String, Double] = predictors
        .filter(pred => s.containsKey(pred))
        .map(pred => pred -> s.get(pred).asInstanceOf[Double])
        .toMap

      // For each available predictor, compute the normalized score and its weight,
      // then sum the weighted scores and the weights
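      // Condel is the weighted average of the normalized scores:
      //   condel = sum_i(w_i * s_i) / sum_i(w_i)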
      val (sumOfWeightedScores, sumOfWeights) = scores.map { case (pred, value) =>
        // SIFT and FATHMM have inverted values
        val score = pred match {
          case "SIFT" => 1.0 - value
          case "FATHMM" => -value
          case _ => value
        }
        // retrieve the predictor metrics
        val pm = bcMetrics.value(pred)
        // calculate the weight
        val normalizedScore = (score - pm.rmin) / (pm.rmax - pm.rmin)
        val weight = if (score < pm.cutoff) pm.ncd(pm.index) else pm.pcd(pm.index)
        (normalizedScore, weight)
      }
      .foldLeft((0.0, 0.0)) { case ((s1, w1), (s2, w2)) =>
        // accumulate the sum of weighted scores and the sum of weights
        (s1 + w2 * s2, w1 + w2)
      }

      // If no scores are available for the SNV, sumOfWeights will be 0.0
      if (sumOfWeights != 0.0) {
        val condel = sumOfWeightedScores / sumOfWeights
        /*val o = new BasicBSONObject(
          ("g" -> d.get("g")), ("p" -> d.get("p")),
          ("s" -> scores + ("CONDEL" -> condel)))*/
        s.put("CONDEL", condel)
        d.put("s", s)
        Some((id, d))
      }
      else
        None
    }
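
    // Note: MongoOutputFormat ignores the output path ("file:///bogus" is just a
    // placeholder) and writes to the collection configured in mongo.output.uri.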
    output.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any],
      classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], mongoConf)

    // Stop the Spark context
    spark.stop()
  }
}
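
// A minimal, self-contained sketch (not part of the original job) that reproduces
// the Condel weighted-average computation from the flatMap above for a single SNV.
// The object name, the metric values and the scores below are illustration-only
// assumptions, not values from the original gist or database.
object CondelCalcExample {

  def main(args: Array[String]): Unit = {
    import CondelCalc.PredictorMetrics

    // Hypothetical metrics: rmin, rmax, negative/positive cumulative distributions,
    // best-performance cutoff and index
    val metrics = Map(
      "MA"     -> PredictorMetrics(-2.0, 6.0, Array(0.3), Array(0.8), 1.9, 0),
      "FATHMM" -> PredictorMetrics(-10.0, 10.0, Array(0.4), Array(0.7), 0.0, 0)
    )

    // Hypothetical raw scores for one SNV (FATHMM values are inverted, as in the job)
    val scores = Map("MA" -> 3.5, "FATHMM" -> -1.5)

    val (sumOfWeightedScores, sumOfWeights) = scores.map { case (pred, value) =>
      val score = if (pred == "FATHMM") -value else value
      val pm = metrics(pred)
      val normalizedScore = (score - pm.rmin) / (pm.rmax - pm.rmin)
      val weight = if (score < pm.cutoff) pm.ncd(pm.index) else pm.pcd(pm.index)
      (normalizedScore, weight)
    }.foldLeft((0.0, 0.0)) { case ((s, w), (ns, nw)) => (s + nw * ns, w + nw) }

    println(s"CONDEL = ${sumOfWeightedScores / sumOfWeights}")
  }
}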