package com.marin.dt

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node}
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.evaluation.RegressionMetrics
/*
  Collection of routines to prune a decision tree and enable the following features:
  1. Traverse a decision tree
  2. Obtain node stats (i.e. parent node, left & right children, etc.) for every node in the tree
  3. Obtain the traversal path from the root node to any given node
*/
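// Usage sketch (illustrative, not part of the gist's original API surface): assuming a trained
// regression model `model: DecisionTreeModel` and a feature-name map `featureNames: Map[Int, String]`,
// the routines below compose like this:
//
//   val nodes: List[Node]           = ClusterTree.traverse(model.topNode)        // all nodes, sorted by id
//   val stats: List[MyNodeStats]    = ClusterTree.nodeStats(nodes)               // parent & child ids per node
//   val paths: Map[Int, List[Node]] = ClusterTree.pathFromRoot(stats)            // root-to-node path per node id
//   val asStr: List[String]         = ClusterTree.nodePaths(model, featureNames) // same paths, rendered as strings
//
// `model` and `featureNames` are placeholders; see DecisionTreeClustering below for how they are built.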
// trait ClusterTree { def cluster(input:Dataset):Dataset }
object ClusterTree {

  def regressionStats(model: DecisionTreeModel, metrics: RegressionMetrics): String = {
    "Decision Tree Depth: %d RMSE: %.3f Explained Variance %.3f R^2 %.3f\n%s\n\n".format(
      model.depth,
      metrics.rootMeanSquaredError,
      metrics.explainedVariance,
      metrics.r2,
      model.toDebugString)
  }
  // Count how many times each named feature is used as a split in the model's debug string,
  // sorted by count in descending order.
  def featureCount(model: DecisionTreeModel, featureNames: Map[Int, String]): List[(String, Int)] = {
    val lines = model.toDebugString.split("\n")
    val f = "feature"
    lines.foldLeft(Map[String, Int]()) { (map, line) =>
      if (line.contains(f)) {
        val startIdx = line.indexOf(f) + f.size
        val endIdx = line.indexOf(" ", startIdx + 1)
        val fid = line.slice(startIdx, endIdx).trim.toInt
        val name = featureNames(fid)
        val count = map.getOrElse(name, 0) + 1
        map ++ Map(name -> count)
      } else map
    }
    .toList
    .sortBy{ x => -x._2 }
  }
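  // Example of the shape featureCount returns (names & counts here are made up):
  //   List(("f_publisherGroupName", 12), ("f_quality_score", 7), ("f_keyword", 3))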
  // for every node, build the path from the root down to that node, rendered as a string of split features
  def nodePaths(model: DecisionTreeModel, featureNames: Map[Int, String]): List[String] = {
    /*
      WANT (nodeid, path to me from root)
      (1,f_publisherGroupName:4150)
      (2,f_publisherGroupName:4150 -> f_publisherGroupName:4150)
      (3,f_publisherGroupName:4150 -> f_publisherGroupName:4150)
      (4,f_publisherGroupName:4150 -> f_publisherGroupName:4150 -> f_publisherGroupName:4150)
    */
    val allstats: List[MyNodeStats] = nodeStats(traverse(model.topNode))
    val allpathsFromRoot: Map[Int, List[Node]] = pathFromRoot(allstats)
    val fmap = traverse(model.topNode).flatMap{ node =>
      if (node.split.isDefined) {
        val featureNum = node.split.get.feature
        val featureName = featureNames(featureNum)
        val featureStat = FeatureStats(featureNum, featureName, 1)
        val mapping = (featureNum -> featureStat)
        Some(mapping)
      } else None
    }.toMap
    allstats.map{ stat =>
      val id = stat.me.id
      // allpathsFromRoot(id) is already the full root-to-node path; prepending the ancestors
      // again via pathFromRoot would duplicate the prefix, so use it directly
      val path = allpathsFromRoot(id)
      val pStr = path2String(path, fmap)
      (id, pStr).toString
    }
  }
  def nodeStats(model: DecisionTreeModel): List[MyNodeStats] = {
    nodeStats(traverse(model.topNode))
  }

  /**
    A recursive routine: given the root, traverse the decision tree & build a list of nodes sorted by id
  */
  def traverse(x: Node): List[Node] = {
    val leftOpt = x.leftNode
    val rightOpt = x.rightNode
    val leftNodes = if (leftOpt.isDefined) traverse(leftOpt.get) else List[Node]()
    val rightNodes = if (rightOpt.isDefined) traverse(rightOpt.get) else List[Node]()
    (List(x) ++ leftNodes ++ rightNodes).sortBy{ x => x.id }
  }

  /**
    List[Node] => List[MyNodeStats]
  */
  def nodeStats(nodes: List[Node]): List[MyNodeStats] = {
    // find parents of all nodes
    val prev: List[MyNodeStats] = nodes.map{
      // MyNodeStats(me:Node, node:Int, left:Int, right:Int, isLeaf:Boolean, parent:Int = -1)
      n: Node => MyNodeStats(n, n.id,
        if (n.leftNode.isDefined) n.leftNode.get.id else -1,
        if (n.rightNode.isDefined) n.rightNode.get.id else -1,
        n.isLeaf)
    }
    prev.map{
      ns: MyNodeStats =>
        val me = ns.me
        val l = prev.filter{ lns: MyNodeStats => lns.left == me.id }
        val r = prev.filter{ rns: MyNodeStats => rns.right == me.id }
        MyNodeStats(me, ns.node, ns.left, ns.right, ns.isLeaf,
          { if (l.size > 0) l.head.node else if (r.size > 0) r.head.node else -1 }
        )
    }
  }
  def nodeMap(nodes: List[MyNodeStats]): Map[Int, MyNodeStats] = {
    nodes.map{ me => (me.node, me) }.toMap
  }

  /**
    Given a single node, find the path from the root node to it
  */
  def pathFromRoot(nodestats: MyNodeStats, nodemap: Map[Int, MyNodeStats], path: List[Node] = List.empty): List[Node] = {
    val myparent: Int = nodestats.parent // find your parent
    if (myparent == -1) path // must be the root node, hence terminate recursion
    else pathFromRoot(nodemap(myparent), nodemap, { nodemap(myparent).me :: path }) // recurse
  }

  /**
    Given a bunch of nodes, find their respective paths from the root node
  */
  def pathFromRoot(nodes: List[MyNodeStats]): Map[Int, List[Node]] = {
    val nodemap = nodeMap(nodes)
    nodes.map{ nodestats =>
      (nodestats.node, pathFromRoot(nodestats, nodemap, List(nodestats.me)))
    }.toMap
  }
  // obtain features along the path
  def path2String(path: List[Node], featureMap: Map[Int, FeatureStats]): String = {
    val dummy = FeatureStats(idx = -1, name = "NOT FOUND", size = 1)
    path.flatMap{ node =>
      val splitOpt = node.split
      if (splitOpt.isDefined) {
        val feature = featureMap.getOrElse(splitOpt.get.feature, dummy)
        Some(feature.name + ":" + feature.size)
      } else None
    }.mkString(" -> ")
  }
  // rebuild an internal (non-leaf) node with the given left & right subtrees
  def mkNode(node: Node, left: Node, right: Node): Node = {
    new Node(node.id, node.predict, node.impurity, false, node.split, Some(left), Some(right), node.stats)
  }

  // turn a node into a leaf by dropping its split, children & gain stats
  def mkNodeLeaf(node: Node): Node = {
    new Node(node.id, node.predict, node.impurity, true, None, None, None, None)
  }
  // rebuild a DecisionTreeModel from a node map, starting at the root (node id 1)
  def mkTree(nodemap: Map[Int, MyNodeStats]): DecisionTreeModel = {
    new DecisionTreeModel(addNode(nodemap(1).me, nodemap), Algo.Regression)
  }

  // recursively reattach children from the node map
  def addNode(n: Node, nodemap: Map[Int, MyNodeStats]): Node = {
    if (n.isLeaf) n
    else {
      val leftID: Int = nodemap(n.id).left
      val rightID: Int = nodemap(n.id).right
      val left: Node = nodemap(leftID).me
      val right: Node = nodemap(rightID).me
      mkNode(n, addNode(left, nodemap), addNode(right, nodemap))
    }
  }
  /* Given a list of all nodes in a tree and the set of leaves to be removed,
     build a new Decision Tree Model that doesn't contain those leaves
  */
  def prune(leaves: List[Int], nodestats: List[MyNodeStats]): DecisionTreeModel = {
    val nodemap = nodeMap(nodestats)
    val acc = Map(nodemap.toArray: _*)
    mkTree(leaves.foldLeft(acc) {
      (acc, id) =>
        val mystats = nodemap(id)
        // get my parent
        val myparentid: Int = mystats.parent
        val myparent: Node = nodemap(myparentid).me
        // get my grandparent
        val gpid = nodemap(myparentid).parent
        // make my parent a leaf
        val myNewParent = mkNodeLeaf(myparent)
        val myNewParentStats = MyNodeStats(myNewParent, myNewParent.id, -1, -1, true, gpid)
        // attach new parent leaf, remove me & my sibling
        acc
          .+(myparentid -> myNewParentStats)
          .-(nodemap(myparentid).left)
          .-(nodemap(myparentid).right)
    })
  }
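  // Pruning sketch (illustrative; the leaf id 6 is hypothetical, read real ids off the nodeStats
  // output first). Assuming `model` is a trained DecisionTreeModel:
  //
  //   val stats   = ClusterTree.nodeStats(model)        // id, parent & children of every node
  //   val leafIds = stats.filter(_.isLeaf).map(_.node)  // candidate leaves to remove
  //   val pruned  = ClusterTree.prune(List(6), stats)   // removing a leaf collapses its parent into a leaf,
  //                                                     // so the sibling leaf is dropped as well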
}

/**
  A MyNodeStats object tells you your leaf/non-leaf status, your parent & your left & right children
  For split, infogain, entropy etc. refer to the Node object
*/
case class MyNodeStats(me: Node, node: Int, left: Int, right: Int, isLeaf: Boolean, parent: Int = -1)

/**
  categorical feature number,
  feature name,
  cardinality of the feature (i.e. number of categories in the feature)
*/
case class FeatureStats(idx: Int, name: String, size: Int)
/*
  DIR: ~/workspace/hcom
  TO BUILD: sbt assembly
  TO RUN:
  spark-submit --driver-memory 128g --executor-memory 128g --class com.marin.dt.DecisionTreeClustering --master local[*] target/scala-2.10/hcom.jar
  "
  --src test/input.csv
  --cols campaign,keyword,quality_score,ad_group,theme,pages,seconds,bounces,number_of_news,abv
  --categoricalCols campaign,keyword,quality_score,ad_group,theme
  --labelCol cvr
  --filterCol cvr
  --splitters ~:,-, ~
  --ignore ~[,],+~
  "

  i.e. as a single line:
  spark-submit --driver-memory 128g --executor-memory 128g --class com.marin.dt.DecisionTreeClustering --master local[*] target/scala-2.10/hcom.jar "--src test/input.csv --cols campaign,keyword,quality_score,ad_group,theme,pages,seconds,bounces,number_of_news,abv --categoricalCols campaign,keyword,quality_score,ad_group,theme --labelCol cvr --filterCol cvr --splitters ~:,-, ~ --ignore ~[,],+~"
*/
package com.marin.dt

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.SparkConf
import org.apache.spark.rdd._
import com.marin.util.{Args, Stemmer}
import org.apache.spark.mllib.linalg.{Vector, DenseVector, SparseVector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node}
import org.apache.spark.mllib.evaluation.RegressionMetrics
object DecisionTreeClustering extends App {

  val conf = new SparkConf()
    .setMaster("local[28]")
    .setAppName("HCOM Job")
    .set("spark.driver.maxResultSize", "100g")
    .set("spark.local.dir", "/media/kraman/disk2/tmp")
    .set("spark.akka.threads", "256")
    .set("spark.storage.memoryFraction", "0.8")
  val sc = new SparkContext(conf)

  val myargs = Args(args.mkString)

  // cmd-line args
  val srcfile = myargs("src")
  val allFeatureCols = myargs("cols").split(",")
  val categoricalCols = myargs("categoricalCols").split(",")
  val labelCol = myargs("labelCol") // target label for regression
  // construct a training set using only the rows for which the filter column is nonzero
  val filterCol = myargs("filterCol") // this column must have non-zero entries
  // how to parse columns
  val splitters = Seq(":", "-", " ")
  val ignore = Seq("[", "]", "+")

  def time = System.currentTimeMillis
  // split file into header & lines
  val (header: Map[String, Int], lines: RDD[String]) = headerAndLines(srcfile)
  // filter lines based on some filter column
  val filteredData: RDD[String] = filter(lines, header(filterCol))
  println("Rows:" + filteredData.count)
  // convert to one hot strings
  val (onehotData: RDD[String], colMap: Map[Int, Map[String, Int]]) = convert2OneHot(filteredData, header, allFeatureCols, categoricalCols, labelCol, splitters, ignore)
  // get feature names for ids used in labeled point
  val featureNames: Map[Int, String] = featureID2featureName(colMap, header, allFeatureCols, categoricalCols)
  sc.makeRDD(featureNames.toList.sortBy{ x => x._1 }, 1).saveAsTextFile("features" + time)
  val n = featureNames.size
  // get dataset as labeled point
  val data: RDD[LabeledPoint] = read(onehotData).filter { x: LabeledPoint => x.features.size == n }
  println("datasize" + data.count)
  println("featurevector size" + featureNames.size)
  // build decision tree models & save their stats
  buildModels(data).foreach { model =>
    val predictionsAndObservations = data.map { lp => (model.predict(lp.features), lp.label) }
    val metrics = new RegressionMetrics(predictionsAndObservations)
    sc.makeRDD(List(ClusterTree.regressionStats(model, metrics)), 1).saveAsTextFile("tree" + time)
    sc.makeRDD(List(ClusterTree.nodeStats(model).mkString("\n")), 1).saveAsTextFile("nodestats" + time)
    sc.makeRDD(List(ClusterTree.nodePaths(model, featureNames).mkString("\n")), 1).saveAsTextFile("nodepaths" + time)
    sc.makeRDD(List(ClusterTree.featureCount(model, featureNames).mkString("\n")), 1).saveAsTextFile("featurecount" + time)
  }
  // ALL DONE!!!
  // split srcfile into a header map & the rest
  // the header map is just the column header string, broken up by comma, presented as a map so given a column name we get its column index
  // the rest is all the lines minus the header
  def headerAndLines(srcfile: String): (Map[String, Int], RDD[String]) = {
    val rdd = sc.textFile(srcfile, 128).zipWithIndex
    val h: String = rdd.filter{ x => x._2 == 0 }.map{ x => x._1 }.take(1).head
    val rest: RDD[String] = rdd.filter{ x => x._2 != 0 }.map{ x => x._1 }
    val header = h.split(",").zipWithIndex.toMap
    (header, rest)
  }
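  // Example (hypothetical file): if the first line of srcfile is "campaign,keyword,cvr",
  // headerAndLines returns (Map("campaign" -> 0, "keyword" -> 1, "cvr" -> 2), <RDD of the remaining lines>)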
  // given the lines & a filterCol, return only those lines whose filter column parses to a value greater than zero
  def filter(lines: RDD[String], filterCol: Int): RDD[String] = {
    lines.filter { x: String =>
      try {
        val entry = x.split(",")(filterCol)
        entry.trim.toDouble > 0.0
      } catch {
        case e: Exception => // probably a header or some data we can't parse, so drop it
          false
      }
    }
  }
  // given a feature id in the tree, what feature name does it map to ?
  def featureID2featureName(colMap: Map[Int, Map[String, Int]], header: Map[String, Int], columns: Seq[String], categoricalCols: Seq[String]): Map[Int, String] = {
    val cols: Seq[Int] = columns.map{ c => header(c) }
    val revmap: Map[Int, String] = header.map{ x => (x._2, x._1) }
    val catcols: Seq[Int] = categoricalCols.map{ c => header(c) }
    val list: List[String] = cols.map { col =>
      if (catcols.contains(col)) {
        val m = colMap(col)
        println(col + ":" + m.size)
        m.toList.sortBy{ x => x._2 }.map{ x => x._1 } // want the feature names in same order as their sorted index
      } else {
        println(col + ":1")
        List(revmap(col))
      }
    }.reduceLeft{ _ ++ _ }
    list.zipWithIndex.map{ x => (x._2, x._1) }.toMap
  }
  // convert data to labeled points for training
  def read(lines: RDD[String]): RDD[LabeledPoint] = {
    /* format
       account~campaign~keyword~clicks~impressions~cost~conversions~total_conv_value~quality_score~cvr
       25,7,23,20~93,65,85,25,82~12051,1212,680,5323,3169~6.0~101.0~14.71~1.0~937.42~7.0~0.166666666667
    */
    lines.map{ line =>
      val parts = line.split("~")
      val (arr, last) = (parts.init, parts.last) // the last column MUST BE the label we try to forecast
      val featureData: Array[Double] = arr.map{ col =>
        if (col.contains(",")) {
          // "size,idx1,idx2,..." encodes a one-hot block: the first entry is the block length, the rest are hot indices
          val sv = col.split(",").map{ s => s.toInt }
          val size = sv.head
          val indices = sv.tail
          new SparseVector(size, indices, Array.fill[Double](indices.size)(1.0)).toArray
        } else {
          Array(col.toDouble)
        }
      }.reduceLeft{ _ ++ _ }
      val features = new DenseVector(featureData).toSparse
      val label = last.toDouble
      LabeledPoint(label, features)
    }
  }
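  // Worked example (made-up values): the line "3,0,2~5.0~0.25" parses to
  //   featureData = Array(1.0, 0.0, 1.0, 5.0)  (a 3-wide one-hot block with indices 0 and 2 hot, then 5.0 kept as-is)
  //   label       = 0.25
  // i.e. LabeledPoint(0.25, sparse form of [1.0, 0.0, 1.0, 5.0])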
  def buildModels(dataset: RDD[LabeledPoint]) = {
    // try trees of varying depth
    val depths = (15 to 10 by -1).toList
    depths.par.map{ depth =>
      DecisionTree.trainRegressor(dataset, Map[Int, Int](), impurity = "variance", maxDepth = depth, maxBins = 32)
    }.seq
  }
  def convert2OneHot(src: RDD[String],
                     header: Map[String, Int],
                     columns: Seq[String],
                     categoricalCols: Seq[String],
                     labelCol: String,
                     splitters: Seq[String],
                     ignore: Seq[String]): (RDD[String], Map[Int, Map[String, Int]]) = {

    // wordcount on a column
    def parseColumn(iter: RDD[String], col: Int, splitters: Seq[String], ignore: Seq[String]): List[(String, Int)] = {
      val columnEntries: Seq[String] = iter.flatMap{ line =>
        val arr = line.split(",")
        if (arr.size > col && arr(col).trim.size > 0) {
          Some(arr(col).trim)
        } else None
      }.distinct
       .collect
      //println(col + "size" + columnEntries.size)
      val res = columnEntries.foldLeft(Map[String, Int]()) { (entries, entry) =>
        getKeys(entry, splitters, ignore)
          .foldLeft(entries){ (acc, key) =>
            val sum = acc.getOrElse(key, 0) + 1
            acc ++ Map(key -> sum)
          }
      }
      //println(col + "finalsize" + res.size)
      res.toList.sortBy{ kv => -kv._2 } // sort by count in descending order
    }
    def getKeys(e: String, splitters: Seq[String], ignore: Seq[String]): Seq[String] = {
      val a = ignore
        .foldLeft(e.toLowerCase){ (acc, b) =>
          acc.replace(b, "")
        }
      splitters
        .foldLeft(Seq(a)){ (acc, b) =>
          acc.map{ x => x.split(b) }.reduceLeft{ _ ++ _ }
        }
    }
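    // Example (hypothetical entry): with splitters Seq(":", "-", " ") and ignore Seq("[", "]", "+"),
    // getKeys("[Brand]:New-York hotels", splitters, ignore) lowercases the entry, strips the ignore
    // characters, and splits on each splitter in turn, yielding Seq("brand", "new", "york", "hotels")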
    // name to number
    val cols: Seq[Int] = columns.map{ c => header(c) }
    val catcols: Seq[Int] = categoricalCols.map{ c => header(c) }
    val asis = cols.diff(catcols)
    val labelcol: Int = header(labelCol)
    println("Cols" + cols + "Catcols" + catcols + "ASIS" + asis + "Label" + labelcol)

    // get a unique set of words for each categorical column
    val colMap: Map[Int, Map[String, Int]] = catcols.map{ col =>
      val wordCount: List[(String, Int)] = parseColumn(src, col, splitters, ignore)
      //println(col + "," + wordCount.size)
      // optionally save wordcount here to get a glimpse of which entries matter
      // now sort the words & index them
      val sortedIndexedWords = wordCount.map{ x => x._1 }.sorted.zipWithIndex.toMap
      //sortedIndexedWords.foreach(println)
      (col, sortedIndexedWords)
    }.toMap

    // read thru the data once & replace each row with one-hot vectors from the desired columns, then concat
    val res = src.map { line =>
      val arr = line.split(",")
      // first get those columns that need to be converted to one-hot vectors
      val vectors: Seq[String] = catcols.map { col =>
        val allkeysMap = colMap(col)
        //allkeysMap.foreach(println)
        // make a vector of zeros that replaces this column entry
        val n = allkeysMap.keys.size
        // now parse this column & find all keys
        // then modify the vector of zeros for those keys
        val indices: Seq[Int] = if (arr.size > col && arr(col).trim.size > 0) {
          val a = arr(col).trim
          val b = getKeys(a, splitters, ignore)
          //println(a + "->" + b + "splitter" + splitters + "ignore" + ignore)
          b.map{ k => allkeysMap(k) } // get indices of each key
        } else Seq[Int]()
        // return the "size,idx1,idx2,..." encoding for this column entry
        { n :: indices.toList }.mkString(",")
      }
      // now get those columns which need to be kept as-is
      val rest: Seq[Double] = asis.map{ col =>
        if (arr.size > col && arr(col).trim.size > 0)
          try { arr(col).toDouble } catch { case e: Exception => 0.0 }
        else 0.0
      }
      // now get the label
      val label: Double = if (arr.size > labelcol && arr(labelcol).trim.size > 0) {
        try { arr(labelcol).toDouble } catch { case e: Exception => 0.0 }
      } else 0.0
      val myline = { vectors ++ rest ++ Seq(label) }.mkString("~")
      myline
    }
    (res, colMap)
  }
}