Created
February 2, 2016 22:45
-
-
Save jdeloach/a9315159c1907655d89c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package experiments.old | |
import org.apache.spark.mllib.classification.NaiveBayesModel | |
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics | |
import org.apache.spark.mllib.linalg.Vector | |
import org.apache.spark.mllib.regression.LabeledPoint | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.mllib.linalg.Vectors | |
import scala.collection.mutable.ArrayBuffer | |
/**
 * Contract shared by the classifier ensembles in this file.
 */
trait ClassifierEnsemble {

  /** Returns the label the ensemble assigns to `features`. */
  def predict(features: Vector): Double

  /** Returns the ensemble's combined per-label scores for `features`. */
  def predictProbabilities(features: Vector): Vector

  /**
   * Returns an auPRC value averaged over the ensemble's members.
   *
   * NOTE(review): the parameter is called `training` here, while the implementations in this
   * file name it `test` and evaluate the members on it -- confirm which data set callers pass.
   */
  def averageAuPRC(training: RDD[LabeledPoint]): Double
}
/**
 * Ensemble of Naive Bayes models for a two-label problem. Members are combined either by
 * majority vote (`predict`) or by summing their class-probability vectors
 * (`predictProbabilities`).
 *
 * @param models member classifiers; every member is consulted for each prediction
 */
class NaiveBayesBinaryVoting(models: List[NaiveBayesModel]) extends ClassifierEnsemble with Serializable {

  /**
   * Majority vote: returns the label predicted by the largest number of members.
   *
   * When several labels receive the same number of votes, the winner is whichever of them the
   * grouped map yields first, so tie-breaking is not specified by this class.
   */
  def predict(features: Vector): Double = {
    val votesByLabel = models.groupBy(_.predict(features))
    val (winner, _) = votesByLabel.maxBy { case (_, voters) => voters.length }
    winner
  }

  /**
   * Element-wise sum of every member's class-probability vector. The result is not
   * re-normalised, so its entries add up to the number of members rather than to one.
   */
  def predictProbabilities(features: Vector): Vector =
    models
      .map(_.predictProbabilities(features))
      .reduce((v1, v2) => addVectors(v1, v2))

  /**
   * Takes the auPRC of the ensemble classifier where each subclassifier sums its certainty
   * towards each label. This differs from plain voting because each member's contribution is
   * weighted by how certain it is.
   *
   * Label 1.0 is emitted unless the summed score for label 0 is strictly greater.
   * NOTE(review): the metric is computed from these hard 0.0/1.0 labels, not from the summed
   * scores themselves -- confirm that this is the intended input for the PR curve.
   */
  def expertAuPRC(test: RDD[LabeledPoint]): Double = {
    val predictionAndLabel = test.map { point =>
      val summed = predictProbabilities(point.features)
      val label = if (summed(0) > summed(1)) 0.0 else 1.0
      (label, point.label)
    }
    areaUnderPR(predictionAndLabel)
  }

  /**
   * Computes auPRC for each classifier, and then averages the auPRCs.
   */
  def averageAuPRC(test: RDD[LabeledPoint]): Double = {
    val auPRCs = models.map { model =>
      // Score the test instances with this single member only.
      areaUnderPR(test.map(point => (model.predict(point.features), point.label)))
    }
    auPRCs.sum / auPRCs.length
  }

  /**
   * Takes the auPRC of the ensemble classifier created with voting of subclassifiers.
   */
  def averageVotedAuPRC(test: RDD[LabeledPoint]): Double =
    areaUnderPR(test.map(point => (predict(point.features), point.label)))

  /**
   * Adds two vectors element by element.
   *
   * Both inputs are asserted to have exactly two entries (one per label).
   */
  protected def addVectors(v1: Vector, v2: Vector): Vector = {
    val a1 = v1.toArray
    val a2 = v2.toArray
    assert(a1.length == 2 && a2.length == 2)
    Vectors.dense(a1.zip(a2).map { case (x, y) => x + y })
  }

  /** Area under the precision-recall curve for the given (prediction, label) pairs. */
  private def areaUnderPR(predictionAndLabel: RDD[(Double, Double)]): Double =
    new BinaryClassificationMetrics(predictionAndLabel).areaUnderPR()
}
/**
 * Voting ensemble in which every member model only sees its own subset of the input features.
 *
 * @param models pairs of (member model, indices of the features that member is shown)
 */
class NBClassifierUniqueFeaturesEnsemble(models: List[(NaiveBayesModel, Array[Int])])
  // Hand the member models to the base class instead of `null`, so that any inherited method
  // which is not overridden here cannot fail with a NullPointerException.
  extends NaiveBayesBinaryVoting(models.map(_._1)) with Serializable {

  /**
   * Members paired with their feature indices, de-duplicated and sorted ascending once up
   * front: `Vectors.sparse` documents that its index array must be strictly increasing.
   */
  private val projectedModels: List[(NaiveBayesModel, Array[Int])] =
    models.map { case (model, mFeats) => (model, mFeats.distinct.sorted) }

  /**
   * Majority vote over the members, each predicting on its own projection of `features`.
   * Ties go to whichever tied label the grouped map yields first.
   */
  override def predict(features: Vector): Double = {
    val votesByLabel = projectedModels.groupBy { case (model, mFeats) =>
      model.predict(modelSpecificFeatureVector(mFeats, features))
    }
    votesByLabel.maxBy { case (_, voters) => voters.length }._1
  }

  /**
   * Element-wise sum of the members' class-probability vectors, each computed on that
   * member's own projection of `features`.
   */
  override def predictProbabilities(features: Vector): Vector =
    projectedModels
      .map { case (model, mFeats) => model.predictProbabilities(modelSpecificFeatureVector(mFeats, features)) }
      .reduce((v1, v2) => addVectors(v1, v2))

  /**
   * Computes auPRC for each classifier, and then averages the auPRCs.
   */
  override def averageAuPRC(test: RDD[LabeledPoint]): Double = {
    val auPRCs = projectedModels.map { case (model, mFeats) =>
      // Score the test instances with this single member, on its own feature subset.
      val predictionAndLabel = test.map { point =>
        val prediction = model.predict(modelSpecificFeatureVector(mFeats, point.features))
        (prediction, point.label)
      }
      new BinaryClassificationMetrics(predictionAndLabel).areaUnderPR()
    }
    auPRCs.sum / auPRCs.length
  }

  /**
   * Returns the subset of features necessary for the provided model feature specification.
   *
   * The result keeps the size of `baseFeatureVector` and stores an entry (possibly an explicit
   * zero) for every index in `modelFeatures`. Every call site in this class passes indices
   * taken from `projectedModels`, i.e. already sorted and distinct.
   */
  private def modelSpecificFeatureVector(modelFeatures: Array[Int], baseFeatureVector: Vector): Vector =
    Vectors.sparse(baseFeatureVector.size, modelFeatures, modelFeatures.map(idx => baseFeatureVector(idx)))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package experiments.old | |
import java.io.File | |
import java.io.PrintWriter | |
import scala.annotation.elidable | |
import scala.annotation.elidable.ASSERTION | |
import scala.util.Random | |
import org.apache.spark.SparkConf | |
import org.apache.spark.SparkContext | |
import org.apache.spark.mllib.classification.NaiveBayes | |
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics | |
import org.apache.spark.mllib.feature.InfoThCriterionFactory | |
import org.apache.spark.mllib.feature.InfoThSelector | |
import org.apache.spark.mllib.feature.InfoThSelector2 | |
import org.apache.spark.mllib.linalg.Vectors | |
import org.apache.spark.mllib.regression.LabeledPoint | |
import org.apache.spark.mllib.util.MLUtils | |
import org.apache.spark.rdd.RDD | |
import java.io.FileOutputStream | |
/**
 * Experiment driver: selects features with an information-theoretic criterion and compares a
 * single Naive Bayes classifier against several Naive Bayes ensembles trained on balanced
 * subsets of the data. One summary line per experiment is printed and also appended to the
 * diagnostics file.
 */
object InfoGain {
  /** File that `diagnostics` appends to. */
  val f = new File("ml_diagnostics.txt")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Info Gain")
      //.setMaster("spark://129.130.10.134:7077")
      //.setMaster("local[11]")
      .set("spark.executor.memory", "60g")
      .set("spark.driver.maxResultSize", "35g")
    val sc = new SparkContext(conf)
    val criterion = new InfoThCriterionFactory("mim")
    val nPartitions = 100
    val baseData = MLUtils.loadLibSVMFile(sc, "rq2_binaryClass.libsvm", 471, nPartitions).cache ///Users/jdeloach/Code/Weka/weka-3-7-12/ //binaryClass
    val nToSelect = 100

    println("*** FS criterion: " + criterion.getCriterion.toString)
    println("*** Number of features to select: " + nToSelect)
    println("*** Number of partitions: " + nPartitions)

    // splits(0) is used for training, splits(1) for evaluation.
    val splits = baseData.randomSplit(Array(0.67, 0.33))
    val sets = generateSets(sc, splits(0))

    // Run feature selection on every balanced set; the sets are processed in parallel.
    val listOfTopFeatures = sets.par.map { data =>
      InfoThSelector2.train(criterion,
        data,        // RDD[LabeledPoint]
        nToSelect,   // number of features to select
        nPartitions) // number of partitions
      // List[(Int,Double)]
    }.toList

    // legacy code
    val justFeatures = listOfTopFeatures.map(list => list.map(_._1).toArray)
    // per classifier top 100
    val perClassifierTop100Features =
      listOfTopFeatures.map(list => list.sortBy(x => Math.abs(x._2)).reverse.take(100).map(_._1).toArray)
    // overall top 100 features, ranked by the sum of absolute scores across all sets
    // (computed once and reused for both the diagnostics line and the experiments)
    val rankedCommonFeats = listOfTopFeatures.flatten
      .groupBy { case (idx, _) => idx }
      .map { case (idx, scored) => (idx, scored.map(x => Math.abs(x._2)).sum) }
      .toList
      .sortBy(_._2)
      .reverse
      .take(100)
    val top100CommonFeats = rankedCommonFeats.map(_._1)
    val random100Feats = Random.shuffle(justFeatures.flatten.distinct.toList).take(100)
    val base100Feats = InfoThSelector.train(criterion, baseData, nToSelect, nPartitions).selectedFeatures.toList

    diagnostics("listOfTopFeatures: " + rankedCommonFeats)
    //diagnostics("top100CommonFeats: " + top100CommonFeats)
    diagnostics("base100Feats: " + base100Feats)
    diagnostics("Size of sets (should be size of neg*2): " + sets.map { x => x.count() }.toList)
    diagnostics("Subset Features: V0" + splits(1).take(1).map { x => x.features.toJson }.toList + ", V1:" + subsetOfFeatures(splits(1), top100CommonFeats).take(1).map { x => x.features.toJson }.toList + ", Selected Features:" + top100CommonFeats)

    val r1 = testNNaiveBayes(subsetOfFeatures(sets, top100CommonFeats), subsetOfFeatures(splits(1), top100CommonFeats), "Ensemble 100 Top")
    val r2 = testNaiveBayes(subsetOfFeatures(splits(0), base100Feats), subsetOfFeatures(splits(1), base100Feats), "Single Classifier")
    val r3 = testNNaiveBayes(sets, splits(1), "Ensemble 471 Feats")
    val r4 = testNNaiveBayes(subsetOfFeatures(sets, random100Feats), subsetOfFeatures(splits(1), random100Feats), "Ensemble Random 100 Feats")
    val r5 = testFeatureSpecificNNaiveBayes(subsetOfFeaturesPerClassifier(sets, perClassifierTop100Features), splits(1), perClassifierTop100Features, "Per Classifier Best 100 Features")

    val report = r1 + "\n" + r3 + "\n" + r4 + "\n" + r2 + "\n" + r5
    diagnostics(report)
    println(report)
  }

  /**
   * Generates N different datasets with a 1:1 balance, assuming N times as many 0-class as
   * 1-class examples.
   *
   * Both classes are collected to the driver. The label-0 examples are cut into consecutive
   * chunks as large as the label-1 set, and every chunk is combined with the complete label-1
   * set. If the label-0 count is not an exact multiple of the label-1 count, the final chunk is
   * smaller and that last data set is not balanced.
   *
   * @throws IllegalArgumentException if `baseData` contains no label-1 example
   */
  def generateSets(sc: SparkContext, baseData: RDD[LabeledPoint]): List[RDD[LabeledPoint]] = {
    val minority = baseData.filter { x => x.label == 1 /*&& x.features.numNonzeros > 0*/ }.collect()
    val majority = baseData.filter { x => x.label == 0 }.collect().toList
    // Chunking by a size of zero cannot work; fail with a clear message instead.
    require(minority.nonEmpty, "generateSets needs at least one example with label 1")
    majority.grouped(minority.length).toList.map { chunk => sc.parallelize(chunk ++ minority).cache() }
  }

  /**
   * Takes in a list of datasets, and a list of what features each of those datasets should be
   * subsetted down to. Returns each dataset, with the correct per-dataset features.
   */
  def subsetOfFeaturesPerClassifier(baseData: List[RDD[LabeledPoint]], featureIdxsPerClassifier: List[Array[Int]]): List[RDD[LabeledPoint]] = {
    assert(baseData.length == featureIdxsPerClassifier.length) // must have the same amount of datasets in each
    // zip pairs the i-th data set with the i-th index array without indexing into the List.
    baseData.zip(featureIdxsPerClassifier).map { case (rdd, idxs) => subsetOfFeatures(rdd, idxs.toList) }
  }

  /**
   * Returns datasets that only use the provided subset of features for the different datasets
   * provided.
   */
  def subsetOfFeatures(baseData: List[RDD[LabeledPoint]], featureIdxs: List[Int]): List[RDD[LabeledPoint]] =
    baseData.map { rdd => subsetOfFeatures(rdd, featureIdxs) }

  /**
   * Reduces the feature space from baseData to just those provided in featureIdxs.
   *
   * Each resulting vector keeps the original size and stores an entry (possibly an explicit
   * zero) for every requested index.
   */
  def subsetOfFeatures(baseData: RDD[LabeledPoint], featureIdxs: List[Int]): RDD[LabeledPoint] = {
    // Vectors.sparse documents that its index array must be strictly increasing, but callers
    // pass indices ordered by score or shuffled. Normalise once, outside the per-record closure.
    val indices = featureIdxs.distinct.sorted.toArray
    baseData.map { point =>
      new LabeledPoint(point.label, Vectors.sparse(point.features.size, indices, indices.map(i => point.features(i))))
    }
  }

  /**
   * Trains a SINGLE Naive Bayes model on `train` and returns a summary line with the auROC and
   * auPRC it achieves on `test`.
   */
  def testNaiveBayes(train: RDD[LabeledPoint], test: RDD[LabeledPoint], testName: String): String = {
    val model = NaiveBayes.train(train, lambda = 1.0)
    val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
    //val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
    val metrics = new BinaryClassificationMetrics(predictionAndLabel)
    "For test: " + testName + ", we got auROC: " + metrics.areaUnderROC() + " and auPRC: " + metrics.areaUnderPR()
  }

  /**
   * Uses N classifiers for a largely unbalanced dataset. N = #larger set/#smaller set.
   * Trains one Naive Bayes model per training set, combines them by voting and returns a
   * summary line with the ensemble's auROC, auPRC and expertAuPRC on `test`.
   */
  def testNNaiveBayes(train: List[RDD[LabeledPoint]], test: RDD[LabeledPoint], testName: String): String = {
    val classifiers = train.map { x => NaiveBayes.train(x) }
    ensembleReport(new NaiveBayesBinaryVoting(classifiers), test, testName)
  }

  /**
   * Same as `testNNaiveBayes`, except that the i-th model is paired with the i-th entry of
   * `classifierFeatures`, so each member is evaluated on its own feature subset.
   */
  def testFeatureSpecificNNaiveBayes(train: List[RDD[LabeledPoint]], test: RDD[LabeledPoint], classifierFeatures: List[Array[Int]], testName: String): String = {
    val classifiers = train.map { x => NaiveBayes.train(x) }
    ensembleReport(new NBClassifierUniqueFeaturesEnsemble(classifiers.zip(classifierFeatures)), test, testName)
  }

  /** Appends `m` plus a newline to the diagnostics file, closing the file even on failure. */
  def diagnostics(m: String): Unit = {
    val pw = new PrintWriter(new FileOutputStream(f, true))
    try {
      pw.append(m + "\n")
    } finally {
      pw.close()
    }
  }

  /** Evaluates `ensemble` on `test` and formats the summary line shared by the ensemble tests. */
  private def ensembleReport(ensemble: NaiveBayesBinaryVoting, test: RDD[LabeledPoint], testName: String): String = {
    val predictionAndLabel = test.map(p => (ensemble.predict(p.features), p.label))
    val metrics = new BinaryClassificationMetrics(predictionAndLabel)
    "For test: " + testName + ", we got auROC: " + metrics.areaUnderROC() + " and auPRC: " + metrics.areaUnderPR() + ", expertAuPRC: " +
      ensemble.expertAuPRC(test) //+ ", averageVotedAuPRC: " + ensemble.averageVotedAuPRC(test) + ", averageAuPRC: " + ensemble.averageAuPRC(test)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment