@szilard (forked from jkbradley/benchm-ml-spark)
Last active September 9, 2015
Running the benchm-ml random forest benchmark on Spark, using soft predictions to get better AUC
Here are two code snippets:
(1) Compute one-hot encoded data for Spark, using the data generated by https://github.com/szilard/benchm-ml/blob/master/0-init/2-gendata.txt
(2) Run MLlib, computing soft predictions by hand.
I ran these with Spark 1.4, and they should work for 1.5 as well.
Note: There's no real need to switch to DataFrames yet for benchmarking. Both the RDD and DataFrame APIs use the same underlying implementation. (I hope to improve on that in Spark 1.6 if there is time.)
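For reference, the DataFrame-based training call would look roughly like this. This is a sketch, not something benchmarked here; it assumes the featurized finalTrainDF from snippet (1) below, with its "label" and "features" columns.
import org.apache.spark.ml.classification.RandomForestClassifier
// Sketch: spark.ml (DataFrame API) equivalent of the RDD-based training in snippet (2).
// Same underlying implementation in Spark 1.4/1.5, so timings should be comparable.
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(500)
  .setFeatureSubsetStrategy("sqrt")
  .setImpurity("gini")
  .setMaxDepth(20)
  .setMaxBins(50)
val rfModel = rf.fit(finalTrainDF)  // finalTrainDF as produced in snippet (1)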
Ran on an EC2 cluster with 4 workers (9.6 GB memory each), using 8 partitions for the training RDD.
For the 1M dataset, training the forest took 2080.814977193 seconds and achieved a test-set AUC of 0.7129779357732448.
(1) Code for one-hot encoding
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.linalg.Vector
// Paths
val origDataDir = "/mnt/mllib/regression/flightTimes/prepped"
val origTrainPath = origDataDir + "/train-10m.csv"
val origTestPath = origDataDir + "/test.csv"
val newDataDir = "/mnt/mllib/regression/flightTimes/spark"
val newTrainPath = newDataDir + "/spark-train-10m.FIXED.parquet"
val newTestPath = newDataDir + "/spark-test.FIXED.parquet"
// Read CSV as Spark DataFrames
val trainDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(origTrainPath)
val testDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(origTestPath)
// Combine train, test temporarily
val fullDF = trainDF.withColumn("isTrain", lit(true)).unionAll(testDF.withColumn("isTrain", lit(false)))
display(fullDF)  // display() is a Databricks notebook helper; use fullDF.show() outside Databricks
// Feature types
val vars_categ = Array("Month","DayofMonth","DayOfWeek","UniqueCarrier", "Origin", "Dest")
val vars_num = Array("DepTime","Distance")
val vars_num_double = vars_num.map(_ + "_double")
val var_y = "dep_delayed_15min"
// Cast column types as needed
val fullDF2 = fullDF.withColumn("DepTime_double", col("DepTime").cast(DoubleType)).withColumn("Distance_double", col("Distance").cast(DoubleType))
display(fullDF2)
// Assemble the featurization Pipeline.
// StringIndexer is needed before OneHotEncoder, since OneHotEncoder does not yet support String input (but it will).
val stringIndexers = vars_categ.map(colName => new StringIndexer().setInputCol(colName).setOutputCol(colName + "_indexed"))
val oneHotEncoders = vars_categ.map(colName => new OneHotEncoder().setInputCol(colName + "_indexed").setOutputCol(colName + "_ohe").setDropLast(false))
val catAssembler = new VectorAssembler().setInputCols(vars_categ.map(_ + "_ohe")).setOutputCol("catFeatures")
val featureAssembler = new VectorAssembler().setInputCols(vars_num_double :+ "catFeatures").setOutputCol("features")
val labelIndexer = new StringIndexer().setInputCol(var_y).setOutputCol("label")
val pipeline = new Pipeline().setStages(stringIndexers ++ oneHotEncoders ++ Array(catAssembler, featureAssembler, labelIndexer))
// Compute features.
val pipelineModel = pipeline.fit(fullDF2)
val transformedDF = pipelineModel.transform(fullDF2)
display(transformedDF)
// Split back into train, test
val finalTrainDF = transformedDF.where(col("isTrain"))
val finalTestDF = transformedDF.where(!col("isTrain"))
// Save Spark DataFrames as Parquet
finalTrainDF.write.mode("overwrite").parquet(newTrainPath)
finalTestDF.write.mode("overwrite").parquet(newTestPath)
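As an optional sanity check (a sketch, not part of the original benchmark), the Parquet files can be read back to confirm the row count and the width of the one-hot encoded feature vector:
// Optional sanity check: read the featurized data back and inspect it.
val checkDF = sqlContext.read.parquet(newTrainPath)
println(checkDF.count())  // number of training rows
println(checkDF.select("features").first().getAs[Vector](0).size)  // feature vector width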
(2) Code for training and computing AUC/accuracy
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.rdd.RDD
// Paths
val dataDir = "/mnt/mllib/regression/flightTimes/spark"
val trainDataPath = dataDir + "/spark-train-0.1m.FIXED.parquet"
val testDataPath = dataDir + "/spark-test.FIXED.parquet"
// Load DataFrame, and convert to RDD of LabeledPoints
def toLP(df: DataFrame): RDD[LabeledPoint] = {
  df.select("label", "features").map { case Row(label: Double, features: Vector) =>
    LabeledPoint(label, features)
  }.repartition(8)
}
val train = toLP(sqlContext.read.parquet(trainDataPath)).cache()
val test = toLP(sqlContext.read.parquet(testDataPath)).cache()
(train.count(), test.count())  // materialize the cached RDDs
// Train model
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()  // empty: all features treated as continuous (one-hot encoded upstream)
val numTrees = 500
val featureSubsetStrategy = "sqrt"  // consider sqrt(numFeatures) features per split
val impurity = "gini"
val maxDepth = 20
val maxBins = 50
val now = System.nanoTime
val model = RandomForest.trainClassifier(train, numClasses, categoricalFeaturesInfo,
  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
val elapsed = (System.nanoTime - now) / 1e9  // training time in seconds
elapsed
// Compute soft predictions. For spark.mllib trees, this works for binary classification.
// Spark 1.5 will include it for multiclass under the spark.ml API.
import org.apache.spark.mllib.tree.configuration.FeatureType.Continuous
import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node}
def softPredict(node: Node, features: Vector): Double = {
  if (node.isLeaf) {
    // Leaf stores the probability of its predicted class; convert to P(class 1).
    if (node.predict.predict == 1.0) node.predict.prob else 1.0 - node.predict.prob
  } else {
    if (node.split.get.featureType == Continuous) {
      // Continuous split: go left if the feature value is at or below the threshold.
      if (features(node.split.get.feature) <= node.split.get.threshold) {
        softPredict(node.leftNode.get, features)
      } else {
        softPredict(node.rightNode.get, features)
      }
    } else {
      // Categorical split: go left if the feature value is in the split's category set.
      if (node.split.get.categories.contains(features(node.split.get.feature))) {
        softPredict(node.leftNode.get, features)
      } else {
        softPredict(node.rightNode.get, features)
      }
    }
  }
}
def softPredict(dt: DecisionTreeModel, features: Vector): Double = {
  softPredict(dt.topNode, features)
}
// Compute AUC
val scoreAndLabels = test.map { point =>
  // Hard-vote alternative: fraction of trees predicting class 1
  // val score = model.trees.map(_.predict(point.features)).filter(_ > 0).size.toDouble / model.numTrees
  val score = model.trees.map(tree => softPredict(tree, point.features)).sum / model.numTrees
  (score, point.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
metrics.areaUnderROC()
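The heading above also mentions accuracy; here is a minimal sketch for it, assuming a 0.5 threshold on the averaged soft score and reusing scoreAndLabels:
// Accuracy at a 0.5 decision threshold on the soft score (sketch).
val accuracy = scoreAndLabels.map { case (score, label) =>
  if ((score >= 0.5) == (label == 1.0)) 1.0 else 0.0
}.mean()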