Skip to content

Instantly share code, notes, and snippets.

@eavidan
Last active March 5, 2018 10:16
Show Gist options
  • Save eavidan/4f5fd5818da07e846e2cfc30dac37b28 to your computer and use it in GitHub Desktop.
Save eavidan/4f5fd5818da07e846e2cfc30dac37b28 to your computer and use it in GitHub Desktop.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.feature._
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
/**
 * Trains a Random Forest classifier (spark-ml) that predicts the binary `response` column of
 * `data.csv`.
 *
 * Steps: drop mostly-empty columns, impute missing values, add TF-IDF features for the free-text
 * column `feature_14`, one-hot encode string columns, tune the forest with 3-fold
 * cross-validation, and print the error on a held-out test set.
 */
object BigPanda {

  /** Target column; it is never imputed and never used as a feature. */
  private val LabelCol = "response"

  /** Free-text column that is featurized with TF-IDF. */
  private val TextCol = "feature_14"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("BigPanda").getOrCreate()
    try {
      train(spark)
    } finally {
      // Release the local Spark context even if training fails.
      spark.stop()
    }
  }

  /** Runs the load / feature-engineering / training / evaluation flow on `spark`. */
  private def train(spark: SparkSession): Unit = {
    val raw = spark.read.option("header", "true").option("inferSchema", "true").csv("data.csv")
    // Unlabelled rows cannot be used for training or evaluation. Dropping them also leaves the
    // label column without nulls, so it always survives the missing-value filter below.
    val data = raw.na.drop(Seq(LabelCol))

    // Keep only columns with less than 70% missing values.
    val validColumns: Array[String] = getColumnNamesAccordingToMissingValues(data, 0.7)
    val relevantCols: DataFrame = data.select(validColumns.head, validColumns.tail: _*)

    // Mean-impute the feature columns (never the label). The Imputer transformer (Spark 2.2+) was
    // not available, hence the hand-rolled helper. Missing strings are replaced by "" so that
    // Tokenizer and StringIndexer never receive nulls.
    val featureCols: Array[String] = validColumns.filter(_ != LabelCol)
    val clean: DataFrame = replaceMissingValuesWithAverage(relevantCols, featureCols).na.fill("")
    clean.show(10)

    // TF-IDF vector for the free-text column.
    val textFeaturesCol = s"${TextCol}_features"
    val rescaledData: DataFrame = addTfIdfFeature(clean, TextCol, textFeaturesCol)

    // One-hot encode the string columns that survived the missing-value filter (never the label).
    // NOTE(review): TextCol is itself a string column, so it is one-hot encoded in addition to
    // its TF-IDF vector — confirm this is intended.
    val catCols: Array[String] =
      clean.dtypes.collect { case (name, "StringType") if name != LabelCol => name }
    val encoded: DataFrame = addOnHotEncouding(rescaledData, catCols)
    val catFeatures: Array[String] = catCols.map(c => s"${c}_hot")
    val numFeatures: Array[String] = featureCols.filterNot(c => catCols.contains(c))
    val features: Array[String] = catFeatures ++ numFeatures :+ textFeaturesCol

    // Assemble a single feature vector for the random forest.
    val assembler = new VectorAssembler()
      .setInputCols(features)
      .setOutputCol("features")
    val ready = assembler.transform(encoded)
    ready.show(10)

    // NOTE(review): the IDF model and the categorical StringIndexers were fitted on all rows
    // before this split, so test rows influence those encodings.
    // Randomly split the data into a training set (95%) and a test set (5%).
    val Array(training, test) = ready.randomSplit(Array(0.95, 0.05), seed = 12345)

    val labelIndexer = new StringIndexer()
      .setInputCol(LabelCol)
      .setOutputCol("label")
      .fit(ready)
    val rf = new RandomForestClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
    // Map indexed predictions back to the original response values.
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)
    val pipeline = new Pipeline()
      .setStages(Array(labelIndexer, rf, labelConverter))

    // Grid-search tree depth and forest size with 3-fold cross-validation, scored by a
    // BinaryClassificationEvaluator with its default settings.
    val paramGrid = new ParamGridBuilder()
      .addGrid(rf.maxDepth, Array(4, 6, 8))
      .addGrid(rf.numTrees, Array(10, 30, 50))
      .build()
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(new BinaryClassificationEvaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(3)
    val cvModel = cv.fit(training)

    // Show some example predictions on the test set.
    val predictions = cvModel.transform(test)
    predictions.select("predictedLabel", "label", "features").show(10)

    // Evaluate the best model found by cross-validation on the held-out test set.
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println("Test Error = " + (1.0 - accuracy))
  }

  /**
   * Appends TF-IDF features for a text column.
   *
   * The column is tokenized by [[Tokenizer]] into `<inputCol>_words`, hashed by [[HashingTF]] into
   * `<inputCol>_rawFeatures`, and reweighted by an [[IDF]] model fitted on `clean` itself.
   *
   * @param clean       input data; `inputCol` must be a string column (nulls are not handled here)
   * @param inputCol    text column to featurize
   * @param outputCol   name of the resulting TF-IDF vector column
   * @param numFeatures number of hash buckets used by HashingTF (defaults to 20)
   * @return `clean` with the words, raw-features and `outputCol` columns appended
   */
  def addTfIdfFeature(
      clean: DataFrame,
      inputCol: String,
      outputCol: String,
      numFeatures: Int = 20): DataFrame = {
    val wordsCol = s"${inputCol}_words"
    val rawFeaturesCol = s"${inputCol}_rawFeatures"
    val tokenizer = new Tokenizer().setInputCol(inputCol).setOutputCol(wordsCol)
    val hashingTF = new HashingTF()
      .setInputCol(wordsCol)
      .setOutputCol(rawFeaturesCol)
      .setNumFeatures(numFeatures)
    val featurizedData = hashingTF.transform(tokenizer.transform(clean))
    // Previously the output name was hard-coded to `<inputCol>_features`, ignoring `outputCol`.
    val idfModel = new IDF().setInputCol(rawFeaturesCol).setOutputCol(outputCol).fit(featurizedData)
    idfModel.transform(featurizedData)
  }

  /**
   * Names of the columns whose fraction of null values is strictly below `threshold`.
   *
   * All fractions are computed in one aggregation: the average of `isNull` cast to 0.0/1.0 is the
   * missing fraction of that column. A column whose fraction is null (e.g. `data` has no rows) is
   * not kept.
   *
   * @param data      input data
   * @param threshold exclusive upper bound on the missing fraction, e.g. 0.7 keeps columns with
   *                  less than 70% nulls
   * @return the kept column names, in their original order
   */
  def getColumnNamesAccordingToMissingValues(data: DataFrame, threshold: Double): Array[String] = {
    val columns = data.columns
    val missingFractions: Array[Column] = columns.map(c => avg(col(c).isNull.cast("double")))
    val fractions: Row = data.select(missingFractions: _*).first()
    // Read results by position so the outcome does not depend on alias / field-name lookups.
    columns.zipWithIndex.collect {
      case (c, i) if !fractions.isNullAt(i) && fractions.getDouble(i) < threshold => c
    }
  }

  /**
   * Replaces nulls in `columns` with each column's mean.
   *
   * Means are taken over all rows with Spark's `mean`, which skips nulls; rows are NOT required to
   * be complete in every column (the previous version dropped such rows first, biasing the means).
   * Columns whose mean is null are left untouched.
   * NOTE(review): `na.fill` casts the mean to each column's type — integer columns presumably
   * receive a truncated mean; confirm this is acceptable.
   *
   * @param df      input data
   * @param columns columns to impute
   * @return `df` with the nulls filled; `df` unchanged when `columns` is empty
   */
  def replaceMissingValuesWithAverage(df: DataFrame, columns: Array[String]): DataFrame = {
    if (columns.isEmpty) {
      df
    } else {
      val avgExpr: Array[Column] = columns.map(c => mean(c))
      val colAvgs: Row = df.agg(avgExpr.head, avgExpr.tail: _*).first()
      val fillValues: Map[String, Any] = columns.zipWithIndex.collect {
        case (c, i) if !colAvgs.isNullAt(i) => c -> colAvgs.get(i)
      }.toMap
      df.na.fill(fillValues)
    }
  }

  /**
   * Indexes each column in `catCols` with a [[StringIndexer]] (output `<c>_ix`) and one-hot
   * encodes that index with a [[OneHotEncoder]] (output `<c>_hot`).
   *
   * The method keeps its original (misspelled) name so existing callers still compile.
   *
   * @param df      input data; nulls in `catCols` are not handled here
   * @param catCols categorical (string) columns to encode
   * @return `df` with the `_ix` and `_hot` columns appended
   */
  def addOnHotEncouding(df: DataFrame, catCols: Array[String]): DataFrame = {
    val indexers = catCols.map(c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_ix"))
    val onHots = catCols.map { c =>
      new OneHotEncoder().setInputCol(s"${c}_ix").setOutputCol(s"${c}_hot")
    }
    // All indexers run before all encoders; each encoder only reads its own indexer's output.
    val indexerPipeline = new Pipeline().setStages(indexers ++ onHots)
    indexerPipeline.fit(df).transform(df)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment