// Gist by @krishnanraman, created June 9, 2016
package com.marin.dt
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.{DecisionTreeModel,Node}
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.evaluation.RegressionMetrics
/*
Collection of routines to prune a decision tree & enable the following features:
1. Traverse a decision tree
2. Obtain node stats (i.e. parent node, left & right children, etc.) for every node in a tree
3. Obtain the traversal path from the root node to any given node
*/
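/*
A minimal usage sketch (assuming a trained `model:DecisionTreeModel` built elsewhere;
the leaf ids below are hypothetical):

val nodes:List[Node] = ClusterTree.traverse(model.topNode) // every node, sorted by id
val stats:List[MyNodeStats] = ClusterTree.nodeStats(nodes) // parent & children per node
val paths:Map[Int, List[Node]] = ClusterTree.pathFromRoot(stats) // root-to-node path per node
val pruned:DecisionTreeModel = ClusterTree.prune(List(6,7), stats) // drop two sibling leaves
*/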
// trait ClusterTree { def cluster(input:Dataset):Dataset }
object ClusterTree {
def regressionStats(model:DecisionTreeModel, metrics:RegressionMetrics):String = {
"Decision Tree Depth: %d RMSE: %.3f Explained Variance %.3f R^2 %.3f\n%s\n\n".format(
model.depth,
metrics.rootMeanSquaredError,
metrics.explainedVariance,
metrics.r2,
model.toDebugString)
}
// count how often each feature appears as a split, by scanning the model's debug string
def featureCount(model:DecisionTreeModel, featureNames:Map[Int, String]):List[(String,Int)] = {
val lines = model.toDebugString.split("\n")
val f = "feature"
lines.foldLeft(Map[String, Int]()) { (map, line) =>
if (line.contains(f)) {
val startIdx = line.indexOf(f) + f.size
val endIdx = line.indexOf(" ", startIdx + 1)
val fid = line.slice(startIdx, endIdx).trim.toInt
val name = featureNames(fid)
val count = map.getOrElse(name, 0) + 1
map + (name -> count)
} else map
}
.toList
.sortBy{ x => - x._2 }
}
// for every node, get the path of feature splits from the root down to that node
def nodePaths(model:DecisionTreeModel, featureNames:Map[Int, String]):List[String] = {
/*
WANT (nodeid, path to me from root)
(1,f_publisherGroupName:4150)
(2,f_publisherGroupName:4150 -> f_publisherGroupName:4150)
(3,f_publisherGroupName:4150 -> f_publisherGroupName:4150)
(4,f_publisherGroupName:4150 -> f_publisherGroupName:4150 -> f_publisherGroupName:4150)
*/
val allstats:List[MyNodeStats] = nodeStats(traverse(model.topNode))
val allpathsFromRoot:Map[Int, List[Node]] = pathFromRoot(allstats)
val fmap = traverse(model.topNode).flatMap{ node =>
if (node.split.isDefined) {
val nodeid = node.id
val featureNum = node.split.get.feature
val featureName = featureNames(featureNum)
val featureStat = FeatureStats(featureNum, featureName, 1)
val mapping = (featureNum -> featureStat)
Some(mapping)
} else None
}.toMap
allstats.map{ stat =>
val id = stat.me.id
val path = allpathsFromRoot(id) // the full root-to-me path, precomputed above; re-running pathFromRoot on it would prepend the ancestors twice
val pStr = path2String(path, fmap)
(id, pStr).toString
}
}
def nodeStats(model:DecisionTreeModel):List[MyNodeStats] = {
nodeStats(traverse(model.topNode))
}
/**
A recursive routine to traverse the decision tree from the given root, building a list of all nodes sorted by node id
*/
def traverse(x:Node):List[Node] = {
val leftOpt = x.leftNode
val rightOpt = x.rightNode
val leftNodes = if (leftOpt.isDefined) traverse(leftOpt.get) else List[Node]()
val rightNodes = if (rightOpt.isDefined) traverse(rightOpt.get) else List[Node]()
(List(x) ++ leftNodes ++ rightNodes).sortBy{ x=> x.id }
}
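// Example (hypothetical 3-node tree, with MLlib node ids: root 1, children 2 & 3):
// traverse(root) returns List(node1, node2, node3), i.e. all nodes sorted by id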
/**
List[Node] => List[MyNodeStats]
*/
def nodeStats(nodes:List[Node]):List[MyNodeStats] = {
// first pass: record each node's id, children ids & leaf flag (parent defaults to -1, resolved in the second pass below)
val prev:List[MyNodeStats] = nodes.map{
n:Node => MyNodeStats( n, n.id,
if (n.leftNode.isDefined) n.leftNode.get.id else -1,
if (n.rightNode.isDefined) n.rightNode.get.id else -1,
n.isLeaf)
}
prev.map{
ns:MyNodeStats =>
val me = ns.me
// my parent is whichever node has me as its left (or right) child
val l = prev.filter{ lns:MyNodeStats => lns.left == me.id }
val r = prev.filter{ rns:MyNodeStats => rns.right == me.id }
MyNodeStats(me, ns.node, ns.left, ns.right, ns.isLeaf,
{if(l.size > 0) l.head.node else if (r.size > 0) r.head.node else -1}
)
}
}
def nodeMap(nodes:List[MyNodeStats]):Map[Int, MyNodeStats] = {
nodes.map{ me=> (me.node, me) }.toMap
}
/**
Given a single node, find the path from the root node to it
*/
def pathFromRoot(nodestats:MyNodeStats, nodemap:Map[Int, MyNodeStats], path:List[Node] = List.empty):List[Node] = {
val myparent:Int = nodestats.parent // find your parent
if ( myparent == -1) path // must be rootnode, hence terminate recursion
else pathFromRoot( nodemap(myparent), nodemap, {nodemap(myparent).me::path} ) // recurse
}
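// Example (hypothetical ids, root 1 -> node 2 -> node 5):
// pathFromRoot(stats5, nodemap, List(node5)) prepends each ancestor in turn,
// terminating at the root & yielding List(node1, node2, node5)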
/**
Given a bunch of nodes, find their respective paths from the root node
*/
def pathFromRoot(nodes:List[MyNodeStats]):Map[Int, List[Node]] = {
val nodemap = nodeMap(nodes)
nodes.map{ nodestats =>
(nodestats.node, pathFromRoot(nodestats, nodemap, List(nodestats.me)))
}.toMap
}
// obtain features along the path
def path2String(path: List[Node], featureMap:Map[Int, FeatureStats]):String = {
val dummy = FeatureStats(idx = -1, name="NOT FOUND", size = 1)
path.flatMap{ node =>
val splitOpt = node.split
if (splitOpt.isDefined) {
val feature = featureMap.getOrElse(splitOpt.get.feature, dummy)
Some(feature.name + ":" + feature.size)
} else None
}.mkString(" -> ")
}
// rebuild a non-leaf node with the given left & right children
def mkNode(node:Node, left:Node, right:Node):Node = {
new Node(node.id, node.predict, node.impurity, false, node.split, Some(left), Some(right), node.stats)
}
// turn a node into a leaf: keep its id, prediction & impurity, drop split, children & stats
def mkNodeLeaf(node:Node):Node = {
new Node(node.id, node.predict, node.impurity, true, None, None, None, None)
}
// rebuild a regression model from the node map, starting at the root (id 1)
def mkTree(nodemap:Map[Int, MyNodeStats]):DecisionTreeModel = {
new DecisionTreeModel(addNode(nodemap(1).me, nodemap), Algo.Regression)
}
def addNode(n:Node, nodemap:Map[Int, MyNodeStats]):Node = {
if (n.isLeaf) n
else {
val leftID:Int = nodemap(n.id).left
val rightID:Int = nodemap(n.id).right
val left:Node = nodemap(leftID).me
val right:Node = nodemap(rightID).me
mkNode(n,addNode(left, nodemap), addNode(right, nodemap))
}
}
/* Given a list of all nodes in a tree and the set of leaves to be removed,
build a new Decision Tree Model that doesn't contain those leaves
*/
def prune(leaves:List[Int], nodestats:List[MyNodeStats]):DecisionTreeModel = {
val nodemap = nodeMap(nodestats)
val acc = Map(nodemap.toArray: _*)
mkTree(leaves.foldLeft(acc) {
(acc, id) =>
val mystats = nodemap(id)
// get my parent
val myparentid:Int = mystats.parent
val myparent:Node = nodemap(myparentid).me
// get my grandparent
val gpid = nodemap(myparentid).parent
// make my parent a leaf
val myNewParent = mkNodeLeaf(myparent)
val myNewParentStats = MyNodeStats(myNewParent, myNewParent.id, -1, -1, true, gpid)
// attach new parent leaf, remove me & my sibling
acc
.+(myparentid -> myNewParentStats)
.-(nodemap(myparentid).left)
.-(nodemap(myparentid).right)
})
}
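/*
Pruning sketch with hypothetical ids: suppose leaves 6 & 7 share parent 3.
prune(List(6,7), stats) turns node 3 into a leaf via mkNodeLeaf, drops 6 & 7
from the node map, and rebuilds the model from the root via mkTree. Note that
pruning leaf 6 already removes its sibling 7, so the fold step for 7 merely
re-leafs node 3 & its removals are no-ops.
*/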
}
/**
A MyNodeStats object tells you your leaf/non-leaf status, your parent & your left & right children
For split, infogain, entropy etc. refer to the Node object
*/
case class MyNodeStats(me:Node, node:Int, left:Int, right:Int, isLeaf:Boolean, parent:Int = -1)
/**
categorical feature number,
feature name,
cardinality of the feature ( ie. number of categories in feature)
*/
case class FeatureStats(idx:Int, name:String, size:Int)
/*
DIR: ~/workspace/hcom
TO BUILD: sbt assembly
TO RUN (all program arguments are passed as one quoted string):
spark-submit --driver-memory 128g --executor-memory 128g --class com.marin.dt.DecisionTreeClustering --master local[*] target/scala-2.10/hcom.jar "--src test/input.csv --cols campaign,keyword,quality_score,ad_group,theme,pages,seconds,bounces,number_of_news,abv --categoricalCols campaign,keyword,quality_score,ad_group,theme --labelCol cvr --filterCol cvr --splitters ~:,-, ~ --ignore ~[,],+~"

Argument breakdown:
--src test/input.csv
--cols campaign,keyword,quality_score,ad_group,theme,pages,seconds,bounces,number_of_news,abv
--categoricalCols campaign,keyword,quality_score,ad_group,theme
--labelCol cvr
--filterCol cvr
--splitters ~:,-, ~
--ignore ~[,],+~
*/
package com.marin.dt
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.SparkConf
import org.apache.spark.rdd._
import com.marin.util.{Args, Stemmer}
import org.apache.spark.mllib.linalg.{Vector, DenseVector, SparseVector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.{DecisionTreeModel,Node}
import org.apache.spark.mllib.evaluation.RegressionMetrics
object DecisionTreeClustering extends App {
val conf = new SparkConf()
.setMaster("local[28]")
.setAppName("HCOM Job")
.set("spark.driver.maxResultSize", "100g")
.set("spark.local.dir", "/media/kraman/disk2/tmp")
.set("spark.akka.threads", "256")
.set("spark.storage.memoryFraction", "0.8")
val sc = new SparkContext(conf)
val myargs = Args(args.mkString)
// cmd-line args
val srcfile = myargs("src")
val allFeatureCols = myargs("cols").split(",")
val categoricalCols = myargs("categoricalCols").split(",")
val labelCol = myargs("labelCol") // target label for regression
// construct a training set using only the rows for which filteredColumn is nonzero
val filterCol = myargs("filterCol") // this column must have non-zero entries
// how to parse columns (note: hardcoded here; the --splitters & --ignore cmd-line args are not read)
val splitters = Seq(":","-"," ")
val ignore = Seq("[","]","+")
def time = System.currentTimeMillis
// split file into header & lines
val (header:Map[String, Int], lines:RDD[String]) = headerAndLines(srcfile)
// filter lines based on some filter column
val filteredData:RDD[String] = filter(lines, header(filterCol))
println("Rows:" + filteredData.count)
// convert to one hot strings
val (onehotData:RDD[String], colMap:Map[Int, Map[String, Int]]) = convert2OneHot(filteredData, header, allFeatureCols, categoricalCols, labelCol, splitters, ignore)
// get feature names for ids used in labeled point
val featureNames:Map[Int, String] = featureID2featureName(colMap, header, allFeatureCols, categoricalCols)
sc.makeRDD(featureNames.toList.sortBy{ x=> x._1 }, 1).saveAsTextFile("features" + time)
val n = featureNames.size
// get dataset as labeled point
val data:RDD[LabeledPoint] = read(onehotData).filter { x:LabeledPoint => x.features.size == n }
println("datasize" + data.count)
println("featurevector size" + featureNames.size)
// build decision tree models & save their stats
buildModels(data).foreach { model =>
val predictionsAndObservations = data.map { lp => (model.predict(lp.features), lp.label) }
val metrics = new RegressionMetrics(predictionsAndObservations)
sc.makeRDD(List(ClusterTree.regressionStats(model, metrics)) ,1).saveAsTextFile("tree" + time)
sc.makeRDD(List(ClusterTree.nodeStats(model).mkString("\n")) ,1).saveAsTextFile("nodestats" + time)
sc.makeRDD(List(ClusterTree.nodePaths(model, featureNames).mkString("\n")) ,1).saveAsTextFile("nodepaths" + time)
sc.makeRDD(List(ClusterTree.featureCount(model, featureNames).mkString("\n")) ,1).saveAsTextFile("featurecount" + time)
}
// ALL DONE!!!
// split srcfile into a header map & the rest
// the header map is just the column header string, broken up by comma, presented as a map so given column index we get column name
// the rest is all the lines minus the header
def headerAndLines(srcfile:String):(Map[String,Int], RDD[String]) = {
val rdd = sc.textFile(srcfile, 128).zipWithIndex
val h:String = rdd.filter{ x=> x._2 == 0 }.map{ x => x._1 }.take(1).head
val rest:RDD[String] = rdd.filter{ x=> x._2 != 0 }.map{ x => x._1 }
val header = h.split(",").zipWithIndex.toMap
(header,rest)
}
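// Example: for a file whose first line is "campaign,keyword,cvr", header comes back
// as Map("campaign" -> 0, "keyword" -> 1, "cvr" -> 2) & rest holds every later line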
// given the lines & a filterCol, we return only those lines for which the filter column holds a positive value
def filter(lines:RDD[String], filterCol:Int):RDD[String] = {
lines.filter { x:String =>
try {
val entry = x.split(",")(filterCol)
entry.trim.toDouble > 0.0
} catch {
case e:Exception => // probably a header or some data we can't parse; drop it
false
}
}
}
// given a feature id in the tree, what feature name does it map to ?
def featureID2featureName(colMap:Map[Int, Map[String, Int]], header:Map[String, Int], columns:Seq[String], categoricalCols:Seq[String]):Map[Int, String] = {
val cols:Seq[Int] = columns.map{ c => header(c) }
val revmap:Map[Int, String] = header.map{ x => (x._2, x._1) }
val catcols:Seq[Int] = categoricalCols.map{ c => header(c) }
val list:List[String] = cols.map { col =>
if (catcols.contains(col)) {
val m = colMap(col)
println(col + ":" + m.size)
m.toList.sortBy{ x => x._2 }.map{ x=> x._1 } // want the feature names in same order as their sorted index
} else {
println(col + ":1")
List(revmap(col))
}
}.reduceLeft{_ ++ _}
list.zipWithIndex.map{ x=> (x._2, x._1) }.toMap
}
// convert data to labeled points for training
def read(lines:RDD[String]):RDD[LabeledPoint] = {
/* format
account~campaign~keyword~clicks~impressions~cost~conversions~total_conv_value~quality_score~cvr
25,7,23,20~93,65,85,25,82~12051,1212,680,5323,3169~6.0~101.0~14.71~1.0~937.42~7.0~0.166666666667
*/
lines.map{ line =>
val parts = line.split("~")
val (arr,last) = (parts.init, parts.last) // the last column MUST BE the label we try to forecast
val featureData:Array[Double] = arr.map{ x=>
if (x.contains(",")) {
val sv = x.split(",").map{ x => x.toInt }
val size = sv.head
val indices = sv.tail.toArray
new SparseVector(size, indices, Array.fill[Double](indices.size)(1.0)).toArray
} else {
Array(x.toDouble)
}
}.reduceLeft{ _ ++ _ }
val features = new DenseVector(featureData).toSparse
val label = last.toDouble
LabeledPoint(label, features)
}
}
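// Worked example of one "~"-separated field: "5,0,3" decodes to a SparseVector of
// size 5 with ones at indices 0 & 3, i.e. Array(1.0, 0.0, 0.0, 1.0, 0.0);
// a plain field like "6.0" stays a single scalar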
def buildModels(dataset:RDD[LabeledPoint]) = {
// try trees of varying depth
val depths = (15 to 10 by -1).toList
depths.par.map{ depth =>
DecisionTree.trainRegressor(dataset, Map[Int, Int](), impurity="variance", maxDepth=depth, maxBins=32)
}.seq
}
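// Note: this trains six regressors (depths 15 down to 10) concurrently via .par;
// SparkContext is thread-safe for job submission, so parallel training threads are fine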
def convert2OneHot(src:RDD[String],
header:Map[String, Int],
columns:Seq[String],
categoricalCols:Seq[String],
labelCol:String,
splitters:Seq[String],
ignore:Seq[String]): (RDD[String],Map[Int, Map[String, Int]]) = {
// wordcount on a column
def parseColumn(iter:RDD[String], col:Int, splitters:Seq[String], ignore:Seq[String]):List[(String, Int)] = {
val columnEntries:Seq[String] = iter.flatMap{ line=>
val arr = line.split(",")
if (arr.size > col && arr(col).trim.size > 0) {
Some(arr(col).trim)
} else None
}.distinct
.collect
//println(col + "size" + columnEntries.size)
val res = columnEntries.foldLeft(Map[String, Int]()) { (entries,entry) =>
getKeys(entry, splitters, ignore)
.foldLeft(entries){ (acc,key) =>
val sum = acc.getOrElse(key, 0) + 1
acc + (key -> sum)
}
}
//println(col + "finalsize" + res.size)
res.toList.sortBy{ kv => - kv._2 } // sort by count in descending order
}
def getKeys(e:String,splitters:Seq[String], ignore:Seq[String]):Seq[String] = {
val a = ignore
.foldLeft(e.toLowerCase){ (acc,b) =>
acc.replace(b , "")
}
splitters
.foldLeft(Seq(a)){ (acc,b) =>
acc.map{ x=> x.split(b) }.reduceLeft{ _ ++ _ }
}
}
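// Example (hypothetical entry): getKeys("NYC-Hotels+Deals", Seq(":","-"," "), Seq("[","]","+"))
// lowercases, strips the ignore characters & splits on the splitters,
// yielding Seq("nyc", "hotelsdeals")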
// name to number
val cols:Seq[Int] = columns.map{ c => header(c) }
val catcols:Seq[Int] = categoricalCols.map{ c => header(c) }
val asis = cols.diff(catcols)
val labelcol:Int = header(labelCol)
println("Cols" + cols + "Catcols" + catcols + "ASIS" + asis + "Label" + labelcol)
// get a unique set of words for each categorical column
val colMap:Map[Int, Map[String, Int]] = catcols.map{ col =>
val wordCount:List[(String, Int)] = parseColumn(src, col, splitters, ignore)
//println(col + "," + wordCount.size )
// optionally save wordcount here to get glimpse of which entries matter
// now sort the words & index them
val sortedIndexedWords = wordCount.map{ x=> x._1 }.sorted.zipWithIndex.toMap
//sortedIndexedWords.foreach(println)
(col, sortedIndexedWords)
}.toMap
// read through the data once & replace each row's categorical columns with onehot vectors, then concat
val res = src.map { line =>
val arr = line.split(",")
// first get those columns that need to be converted to onehot vectors
val vectors:Seq[String] = catcols.map { col =>
val allkeysMap = colMap(col)
//allkeysMap.foreach(println)
// this column entry becomes a onehot vector of length n (one slot per key)
val n = allkeysMap.keys.size
// parse this column & find all its keys; their indices are the hot slots
val indices:Seq[Int] = if (arr.size > col && arr(col).trim.size > 0) {
val a = arr(col).trim
val b = getKeys(a, splitters, ignore)
//println(a + "->" + b + "splitter" + splitters + "ignore" + ignore)
b.map{ k => allkeysMap(k) } // get indices of each key
} else Seq[Int]()
// return the encoded vector for this column entry: onehot size first, then the hot indices (decoded back in read())
{n::indices.toList}.mkString(",")
}
// now get those columns which need to be kept as-is
val rest:Seq[Double] = asis.map{ col =>
if (arr.size > col && arr(col).trim.size > 0)
try { arr(col).toDouble } catch { case e:Exception => 0.0 }
else 0.0
}
// now get label
val label:Double = if (arr.size > labelcol && arr(labelcol).trim.size > 0) {
try { arr(labelcol).toDouble } catch { case e:Exception => 0.0 }
} else 0.0
val myline = {vectors ++ rest ++ Seq(label)}.mkString("~")
myline
}
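// e.g. a hypothetical row with one 5-key categorical column hot at indices 0 & 3,
// one as-is column 6.0 & label 0.2 serializes to "5,0,3~6.0~0.2",
// exactly the format read() expects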
(res, colMap)
}
}