Skip to content

Instantly share code, notes, and snippets.

@jdeloach
Created February 2, 2016 22:45
Show Gist options
  • Save jdeloach/a9315159c1907655d89c to your computer and use it in GitHub Desktop.
Save jdeloach/a9315159c1907655d89c to your computer and use it in GitHub Desktop.
package experiments.old
import org.apache.spark.mllib.classification.NaiveBayesModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vectors
import scala.collection.mutable.ArrayBuffer
/**
 * Common interface of the NaiveBayes ensembles defined in this file.
 */
trait ClassifierEnsemble {

  /** Label predicted by the ensemble for a single feature vector. */
  def predict(features: Vector): Double

  /** Per-label scores produced by the ensemble for a single feature vector. */
  def predictProbabilities(features: Vector): Vector

  /**
   * In the implementations in this file: the mean of the individual members' auPRC values
   * on the given labelled data.
   *
   * NOTE(review): the parameter is called `training`, yet the implementations evaluate on
   * whatever data they receive (their own parameter is named `test`).
   */
  def averageAuPRC(training: RDD[LabeledPoint]): Double
}
/**
 * Ensemble of Spark MLlib NaiveBayes models for a two-label problem.
 *
 * Offers hard majority voting (`predict`), summed per-label probabilities
 * (`predictProbabilities`) and several auPRC evaluations built on top of them.
 *
 * @param models ensemble members; `addVectors` asserts that their probability vectors
 *               have exactly two entries
 */
class NaiveBayesBinaryVoting(models: List[NaiveBayesModel]) extends ClassifierEnsemble with Serializable {

  /**
   * Majority vote over the members' predicted labels.
   *
   * Ties (possible with an even number of members) are resolved explicitly in favour of the
   * smallest label, rather than depending on the iteration order of the `groupBy` result map.
   *
   * @throws UnsupportedOperationException if the ensemble has no members
   */
  def predict(features: Vector) : Double = {
    val voteCounts = models.groupBy(_.predict(features)).map { case (label, voters) => (label, voters.length) }
    voteCounts.maxBy { case (label, count) => (count, -label) }._1
  }

  /**
   * Element-wise sum of the members' `predictProbabilities` vectors.
   *
   * The result is a sum, not an average, so it is not itself a probability distribution;
   * only the relative size of its entries is meaningful.
   */
  def predictProbabilities(features: Vector) : Vector = {
    models.map { x => x.predictProbabilities(features) }.reduce((v1,v2) => addVectors(v1,v2))
  }

  /**
   * Takes the auPRC of the ensemble classifier where each subclassifier sums its certainty towards each label.
   * This is different from voting as these votes are weighted with regards to certainty.
   *
   * A point is predicted 0.0 when the summed score at index 0 is strictly larger than at
   * index 1, and 1.0 otherwise (ties go to 1.0). NOTE(review): this assumes index 0 of the
   * members' probability vectors corresponds to label 0.0 — confirm against the models' labels.
   */
  def expertAuPRC(test: RDD[LabeledPoint]) : Double = {
    val predictionAndLabel = test.map { point =>
      val pred = predictProbabilities(point.features)
      if (pred(0) > pred(1)) (0.0, point.label) else (1.0, point.label)
    }
    new BinaryClassificationMetrics(predictionAndLabel).areaUnderPR()
  }

  /**
   * Computes auPRC for each classifier, and then averages the auPRCs.
   * Returns NaN when the ensemble has no members (0 / 0).
   */
  def averageAuPRC(test: RDD[LabeledPoint]) : Double = {
    val auPRCs = models.map { model =>
      // Evaluate this member alone on the test instances.
      val predictionAndLabel = test.map { point => (model.predict(point.features), point.label) }
      new BinaryClassificationMetrics(predictionAndLabel).areaUnderPR()
    }
    auPRCs.sum / auPRCs.length
  }

  /**
   * Takes the auPRC of the ensemble classifier created with voting of subclassifiers.
   */
  def averageVotedAuPRC(test: RDD[LabeledPoint]) : Double = {
    val predictionAndLabel = test.map { point => (predict(point.features), point.label) }
    new BinaryClassificationMetrics(predictionAndLabel).areaUnderPR()
  }

  /** Element-wise sum of two binary (length-2) score vectors, returned as a dense vector. */
  protected def addVectors(v1: Vector, v2: Vector) : Vector = {
    val a1 = v1.toArray
    val a2 = v2.toArray
    assert(a1.length == 2 && a2.length == 2)
    Vectors.dense(a1.zip(a2).map { case (x, y) => x + y })
  }
}
/**
 * Ensemble in which every NaiveBayes member was trained on its own subset of features.
 *
 * Each member is paired with the indices of the features it uses. Before a member sees a
 * feature vector, the vector is projected onto those indices; the projection keeps the
 * original vector size and zeroes every other feature.
 *
 * The parent is constructed with `null` models. Every parent method that reads the parent's
 * model list is overridden here. The remaining parent methods (`expertAuPRC`,
 * `averageVotedAuPRC`) only call `predict` / `predictProbabilities`, which dispatch to the
 * overrides below.
 *
 * @param models pairs of (trained model, indices of the features that model uses)
 */
class NBClassifierUniqueFeaturesEnsemble(models: List[(NaiveBayesModel,Array[Int])]) extends NaiveBayesBinaryVoting(null) with Serializable {

  /**
   * Members with their feature indices de-duplicated and sorted ascending, computed once.
   * `Vectors.sparse` expects strictly increasing indices, and the caller's indices may be
   * ordered by feature score rather than by index.
   */
  private val membersWithSortedFeatures: List[(NaiveBayesModel, Array[Int])] =
    models.map { case (model, mFeats) => (model, mFeats.distinct.sorted) }

  /**
   * Majority vote over the members' predicted labels, each member seeing only its own features.
   * Ties are resolved explicitly in favour of the smallest label.
   *
   * @throws UnsupportedOperationException if the ensemble has no members
   */
  override def predict(features: Vector) : Double = {
    val voteCounts = membersWithSortedFeatures
      .groupBy { case (model, mFeats) => model.predict(modelSpecificFeatureVector(mFeats, features)) }
      .map { case (label, voters) => (label, voters.length) }
    voteCounts.maxBy { case (label, count) => (count, -label) }._1
  }

  /** Element-wise sum of the members' probability vectors, each computed on its own feature subset. */
  override def predictProbabilities(features: Vector) : Vector = {
    membersWithSortedFeatures
      .map { case (model, mFeats) => model.predictProbabilities(modelSpecificFeatureVector(mFeats, features)) }
      .reduce((v1, v2) => addVectors(v1, v2))
  }

  /**
   * Computes auPRC for each classifier (on its own feature subset), and then averages the auPRCs.
   * Returns NaN when the ensemble has no members (0 / 0).
   */
  override def averageAuPRC(test: RDD[LabeledPoint]) : Double = {
    val auPRCs = membersWithSortedFeatures.map { case (model, mFeats) =>
      // Evaluate this member alone on the test instances.
      val predictionAndLabel = test.map { point =>
        (model.predict(modelSpecificFeatureVector(mFeats, point.features)), point.label)
      }
      new BinaryClassificationMetrics(predictionAndLabel).areaUnderPR()
    }
    auPRCs.sum / auPRCs.length
  }

  /**
   * Returns the subset of features necessary for the provided model feature specification.
   * `modelFeatures` must be strictly increasing (guaranteed for `membersWithSortedFeatures`).
   */
  private def modelSpecificFeatureVector(modelFeatures: Array[Int], baseFeatureVector: Vector) : Vector =
    Vectors.sparse(baseFeatureVector.size, modelFeatures, modelFeatures.map(i => baseFeatureVector(i)))
}
package experiments.old
import java.io.File
import java.io.PrintWriter
import scala.annotation.elidable
import scala.annotation.elidable.ASSERTION
import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.feature.InfoThCriterionFactory
import org.apache.spark.mllib.feature.InfoThSelector
import org.apache.spark.mllib.feature.InfoThSelector2
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import java.io.FileOutputStream
/**
 * Experiment driver for an imbalanced binary LIBSVM dataset. It compares a single NaiveBayes
 * classifier against several NaiveBayes ensembles trained on class-balanced subsets. Features
 * are chosen by information-theoretic selection (InfoThSelector / InfoThSelector2, "mim").
 *
 * Results are printed to stdout and appended to [[f]].
 */
object InfoGain {
  /** File that [[diagnostics]] appends to. */
  val f = new File("ml_diagnostics.txt")

  /**
   * Creates the SparkContext, runs all experiments and always stops the context afterwards.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Info Gain")
      //.setMaster("spark://129.130.10.134:7077")
      //.setMaster("local[11]")
      .set("spark.executor.memory", "60g")
      .set("spark.driver.maxResultSize", "35g")
    val sc = new SparkContext(conf)
    try {
      runExperiments(sc)
    } finally {
      // Release executors and driver resources even when an experiment throws.
      sc.stop()
    }
  }

  /**
   * Loads the data and builds balanced training subsets. It then selects features in several
   * ways and evaluates one single-classifier baseline plus four ensembles on the held-out split.
   */
  private def runExperiments(sc: SparkContext): Unit = {
    val criterion = new InfoThCriterionFactory("mim")
    val nPartitions = 100
    val baseData = MLUtils.loadLibSVMFile(sc, "rq2_binaryClass.libsvm", 471, nPartitions).cache() ///Users/jdeloach/Code/Weka/weka-3-7-12/ //binaryClass
    val nToSelect = 100
    println("*** FS criterion: " + criterion.getCriterion.toString)
    println("*** Number of features to select: " + nToSelect)
    println("*** Number of partitions: " + nPartitions)

    val splits = baseData.randomSplit(Array(0.67, 0.33))
    val train = splits(0)
    val test = splits(1)
    val sets = generateSets(sc, train)

    // One ranked list of (featureIdx, score) pairs per balanced subset.
    val listOfTopFeatures = sets.par.map { data =>
      InfoThSelector2.train(criterion,
        data,        // RDD[LabeledPoint]
        nToSelect,   // number of features to select
        nPartitions) // number of partitions
    }.toList

    // legacy code: the bare feature indices chosen for each subset
    val justFeatures = listOfTopFeatures.map(list => list.map(_._1).toArray)
    // per classifier top 100, by |score|
    val perClassifierTop100Features = listOfTopFeatures.map(list => list.sortBy(x => Math.abs(x._2)).reverse.take(100).map(_._1).toArray)
    // overall top 100 features by |score| summed over all subsets, as (idx, total) pairs
    val rankedCommonFeats = listOfTopFeatures.flatten
      .groupBy { x => x._1 }
      .map { case (idx, list) => (idx, list.map(x => Math.abs(x._2)).sum) }
      .toList.sortBy(_._2).reverse.take(100)
    val top100CommonFeats = rankedCommonFeats.map(_._1)
    val random100Feats = Random.shuffle(justFeatures.flatten.distinct.toList).take(100)
    // Select the baseline's features on the training split only. Selecting on the full
    // `baseData` would leak the held-out test examples into feature selection and make the
    // "Single Classifier" result optimistic relative to the ensembles.
    val base100Feats = InfoThSelector.train(criterion, train, nToSelect, nPartitions).selectedFeatures.toList

    diagnostics("listOfTopFeatures: " + rankedCommonFeats)
    diagnostics("base100Feats: " + base100Feats)
    diagnostics("Size of sets (should be size of neg*2): " + sets.map { x => x.count() }.toList)
    diagnostics("Subset Features: V0" + test.take(1).map { x => x.features.toJson }.toList +
      ", V1:" + subsetOfFeatures(test, top100CommonFeats).take(1).map { x => x.features.toJson }.toList +
      ", Selected Features:" + top100CommonFeats)

    val r1 = testNNaiveBayes(subsetOfFeatures(sets, top100CommonFeats), subsetOfFeatures(test, top100CommonFeats), "Ensemble 100 Top")
    val r2 = testNaiveBayes(subsetOfFeatures(train, base100Feats), subsetOfFeatures(test, base100Feats), "Single Classifier")
    val r3 = testNNaiveBayes(sets, test, "Ensemble 471 Feats")
    val r4 = testNNaiveBayes(subsetOfFeatures(sets, random100Feats), subsetOfFeatures(test, random100Feats), "Ensemble Random 100 Feats")
    val r5 = testFeatureSpecificNNaiveBayes(subsetOfFeaturesPerClassifier(sets, perClassifierTop100Features), test, perClassifierTop100Features, "Per Classifier Best 100 Features")

    val summary = r1 + "\n" + r3 + "\n" + r4 + "\n" + r2 + "\n" + r5
    diagnostics(summary)
    println(summary)
  }

  /**
   * Splits an imbalanced binary dataset into class-balanced training sets. Label 1 is assumed
   * to be the minority class.
   *
   * Every set holds all label-1 examples plus a disjoint chunk of exactly as many label-0
   * examples. Each set is therefore 1:1 balanced and twice the size of the label-1 class.
   * Label-0 examples left over after the last full chunk are dropped rather than forming an
   * extra, unbalanced set. If there are fewer label-0 than label-1 examples (but at least one),
   * a single set holding all of them is returned.
   *
   * Both classes are collected to the driver before being re-parallelized.
   *
   * @throws IllegalArgumentException if `baseData` contains no label-1 examples
   */
  def generateSets(sc: SparkContext, baseData: RDD[LabeledPoint]): List[RDD[LabeledPoint]] = {
    val minority = baseData.filter { x => x.label == 1 /*&& x.features.numNonzeros > 0*/ }.collect()
    val majority = baseData.filter { x => x.label == 0 }.collect().toList
    require(minority.nonEmpty, "generateSets needs at least one example with label 1")
    val numClassifiers = math.max(majority.length / minority.length, 1)
    majority.grouped(minority.length).take(numClassifiers).toList
      .map { chunk => sc.parallelize(chunk ++ minority).cache() }
  }

  /**
   * Takes in a list of datasets and, for each, the features it should be subsetted down to.
   * Returns each dataset reduced to its own features.
   *
   * @throws IllegalArgumentException if the two lists have different lengths
   */
  def subsetOfFeaturesPerClassifier(baseData: List[RDD[LabeledPoint]], featureIdxsPerClassifier: List[Array[Int]]): List[RDD[LabeledPoint]] = {
    require(baseData.length == featureIdxsPerClassifier.length,
      "subsetOfFeaturesPerClassifier needs exactly one feature list per dataset")
    baseData.zip(featureIdxsPerClassifier).map { case (rdd, idxs) => subsetOfFeatures(rdd, idxs.toList) }
  }

  /**
   * Returns datasets that only use the provided subset of features for the different datasets provided.
   */
  def subsetOfFeatures(baseData: List[RDD[LabeledPoint]], featureIdxs: List[Int]): List[RDD[LabeledPoint]] = {
    baseData.map { rdd => subsetOfFeatures(rdd, featureIdxs) }
  }

  /**
   * Reduces the feature space from baseData to just those provided in featureIdxs.
   * Vector sizes are preserved; all other features become implicit zeros.
   */
  def subsetOfFeatures(baseData: RDD[LabeledPoint], featureIdxs: List[Int]): RDD[LabeledPoint] = {
    // `Vectors.sparse` expects strictly increasing indices, but callers pass indices ranked by
    // score. Duplicates would otherwise become repeated sparse entries. Computed once here
    // instead of per record.
    val idxs = featureIdxs.distinct.sorted.toArray
    baseData.map { x =>
      new LabeledPoint(x.label, Vectors.sparse(x.features.size, idxs, idxs.map(i => x.features(i))))
    }
  }

  /**
   * Trains a SINGLE Naive Bayes model (lambda = 1.0) on `train`, evaluates its hard predictions
   * on `test`, and returns a one-line auROC/auPRC report.
   */
  def testNaiveBayes(train: RDD[LabeledPoint], test: RDD[LabeledPoint], testName: String): String = {
    val model = NaiveBayes.train(train, lambda = 1.0)
    val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
    //val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
    metricsSummary(testName, new BinaryClassificationMetrics(predictionAndLabel))
  }

  /**
   * Uses N classifiers (one per training set) for a largely unbalanced dataset, where
   * N = #larger set / #smaller set. Returns a one-line report of the voted auROC/auPRC and the
   * ensemble's expertAuPRC.
   */
  def testNNaiveBayes(train: List[RDD[LabeledPoint]], test: RDD[LabeledPoint], testName: String): String = {
    val classifiers = train.map { x => NaiveBayes.train(x) }
    val ensemble = new NaiveBayesBinaryVoting(classifiers)
    val predictionAndLabel = test.map(p => (ensemble.predict(p.features), p.label))
    metricsSummary(testName, new BinaryClassificationMetrics(predictionAndLabel)) +
      ", expertAuPRC: " + ensemble.expertAuPRC(test)
    //+ ", averageVotedAuPRC: " + ensemble.averageVotedAuPRC(test) + ", averageAuPRC: " + ensemble.averageAuPRC(test)
  }

  /**
   * Like [[testNNaiveBayes]], but member i is trained on `train(i)` and sees only
   * `classifierFeatures(i)` of each test vector.
   */
  def testFeatureSpecificNNaiveBayes(train: List[RDD[LabeledPoint]], test: RDD[LabeledPoint], classifierFeatures: List[Array[Int]], testName: String): String = {
    val classifiers = train.map { x => NaiveBayes.train(x) }
    val ensemble = new NBClassifierUniqueFeaturesEnsemble(classifiers.zip(classifierFeatures))
    val predictionAndLabel = test.map(p => (ensemble.predict(p.features), p.label))
    metricsSummary(testName, new BinaryClassificationMetrics(predictionAndLabel)) +
      ", expertAuPRC: " + ensemble.expertAuPRC(test)
    //+ ", averageVotedAuPRC: " + ensemble.averageVotedAuPRC(test) + ", averageAuPRC: " + ensemble.averageAuPRC(test)
  }

  /** Formats the "For test: ..." auROC/auPRC prefix shared by all test reports. */
  private def metricsSummary(testName: String, metrics: BinaryClassificationMetrics): String =
    "For test: " + testName + ", we got auROC: " + metrics.areaUnderROC() + " and auPRC: " + metrics.areaUnderPR()

  /**
   * Appends `m` plus a newline to [[f]], opening the file in append mode.
   * The writer is closed even if writing throws.
   */
  def diagnostics(m: String): Unit = {
    val pw = new PrintWriter(new FileOutputStream(f, true))
    try {
      pw.write(m + "\n")
    } finally {
      pw.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment