Last active
August 29, 2015 13:55
-
-
Save krishnanraman/8700540 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** A data-mining job sourcing from tab-separated General Social Survey (GSS) data.
  *
  * Command line:
  *  - `<file> histo <column> <quantitative|qualitative>` prints a text histogram of one column.
  *  - `<file> job <xColumn> <yColumn> [<replacements> [<filters>]]` min-max scales x and y,
  *    z-normalizes `x` and `y - x*x`, and prints the resulting statistics and correlation.
  *
  * `<replacements>` and `<filters>` use the `key=value&key=value` syntax understood by
  * [[buildMap]]. Column names are matched by substring against the header row (see [[colNum]]).
  */
object GSSJob extends App {
  import scala.util.Try

  require(args.length >= 2, "usage: GSSJob <input-file> <job|histo> <task arguments...>")

  val in = args(0)   // input file
  val task = args(1) // job, histo etc.
  val data = src
  val columnHeader = data.headOption.getOrElse(sys.error(s"Input file '$in' is empty"))

  // The header is split once here instead of on every colNum call.
  private val headerColumns = columnHeader.split("\t").toIndexedSeq
  // A filter criterion of the form "<number>-<number>", e.g. "18-65" or "-5-10".
  private val RangeCriterion = """(-?\d+(?:\.\d+)?)-(-?\d+(?:\.\d+)?)""".r
  private val BucketCount = 10 // number of bins of a quantitative histogram
  private val BarWidth = 60    // length, in '#' characters, of the longest histogram bar

  task match {
    case "job"   => job
    case "histo" => histo
    case other   => sys.error(s"Unknown task '$other' (expected 'job' or 'histo')")
  }

  /** Reads every line of the input file.
    *
    * The lines are materialized before the file is closed, so the returned stream never
    * touches the (already released) file handle.
    */
  def src: Stream[String] = {
    val source = scala.io.Source.fromFile(in)
    try source.getLines.toVector.toStream
    finally source.close()
  }

  /** Index of the first header column whose name contains `colName`.
    *
    * @throws NoSuchElementException if no header column contains `colName`
    */
  def colNum(colName: String): Int = {
    val idx = headerColumns.indexWhere(_.contains(colName))
    if (idx < 0) throw new NoSuchElementException(s"No column name contains '$colName'")
    idx
  }

  /** The cell at `col`, or None when the row is too short or the cell is null or blank.
    *
    * `String.split` drops trailing empty fields, so rows may be shorter than the header.
    */
  private def cell(row: IndexedSeq[String], col: Int): Option[String] =
    row.lift(col).flatMap(Option(_)).filter(_.trim.nonEmpty)

  /** Prints one `label: ####<TAB>count` line per entry; the largest count gets a full-width bar. */
  private def printBars(entries: Seq[(String, Int)]): Unit =
    if (entries.nonEmpty) {
      val labelWidth = math.max(1, entries.map(_._1.length).max)
      val peak = entries.map(_._2).max
      entries.foreach { case (label, count) =>
        val barLength = if (peak > 0) (count * BarWidth.toDouble / peak).toInt else 0
        printf("%" + labelWidth + "s: %s\t%d\n", label, "#" * barLength, count)
      }
    }

  /** Counts `values` in equal-width buckets spanning [min, max].
    *
    * Counting uses the exact bucket edges; edges are rounded to one decimal only for the
    * labels. A value lying on a shared edge is counted in the lower bucket.
    */
  private def quantitativeCounts(values: Seq[Double]): Seq[(String, Int)] = {
    val (low, high) = (values.min, values.max)
    // A constant column has zero width: use a single bucket instead of dividing by zero.
    val bucketCount = if (high > low) BucketCount else 1
    val width = (high - low) / bucketCount
    // The last edge is pinned to the exact maximum so rounding error can never exclude it.
    val edges = (0 until bucketCount).map(i => low + i * width) :+ high
    val buckets = edges.zip(edges.tail)
    val counts = values.foldLeft(Vector.fill(buckets.size)(0)) { (acc, item) =>
      val idx = buckets.indexWhere { case (from, to) => from <= item && item <= to }
      if (idx == -1) { println("Invalid " + item); acc } // only reachable for NaN
      else acc.updated(idx, acc(idx) + 1)
    }
    def shown(v: Double): Double = math.round(v * 10) / 10.0
    buckets.map { case (from, to) => "[" + shown(from) + " - " + shown(to) + "]" }.zip(counts)
  }

  /** Occurrences of each distinct value, most frequent first (ties broken alphabetically). */
  private def qualitativeCounts(values: Seq[String]): Seq[(String, Int)] =
    values
      .groupBy(identity)
      .map { case (value, occurrences) => (value, occurrences.size) }
      .toList
      .sortBy { case (value, count) => (-count, value) }

  /** Prints a text histogram of the column named by `args(2)`, treated as `args(3)`
    * (`quantitative` or `qualitative`). Blank cells are ignored.
    */
  def histo: Unit = {
    require(args.length >= 4, "usage: GSSJob <input-file> histo <column> <quantitative|qualitative>")
    val (colName, colType) = (args(2), args(3))
    val col = colNum(colName)
    val values = data.tail.flatMap(line => cell(line.split("\t").toIndexedSeq, col))
    if (values.isEmpty) println(s"No non-empty values found in column '$colName'")
    else colType match {
      case "quantitative" => printBars(quantitativeCounts(values.map(_.toDouble)))
      case "qualitative"  => printBars(qualitativeCounts(values))
      case other =>
        sys.error(s"Unknown column type '$other' (expected 'quantitative' or 'qualitative')")
    }
  }

  /** Dictionary to replace strings with numbers.
    *
    * "Age=23&Weight=35" becomes Map(Age -> 23, Weight -> 35). Segments that are not exactly
    * `key=value` are ignored; a later duplicate key overrides an earlier one.
    */
  def buildMap(str: String): Map[String, String] =
    str
      .split("&")
      .map(_.split("="))
      .collect { case Array(key, value) => key -> value }
      .toMap

  /** The x and y cells of `row`, or None when either is missing, null or blank. */
  def getColumns(row: IndexedSeq[String], xcol: Int, ycol: Int): Option[(String, String)] =
    for {
      xval <- cell(row, xcol)
      yval <- cell(row, ycol)
    } yield (xval, yval)

  /** Whether `row` satisfies every criterion in `filter` (column name -> criterion).
    *
    * A criterion of the form `a-b` (two numbers) is an inclusive numeric range; blank or
    * non-numeric cells are never inside a range. Any other criterion is an exact match on
    * the cell text. For the x and y columns the already-parsed numbers `xt._2` / `yt._2`
    * are used for range checks, and an equality criterion also matches when it is
    * numerically equal to that number.
    *
    * @param xt (column index, parsed value) of the x column
    * @param yt (column index, parsed value) of the y column
    */
  def filterValid(row: IndexedSeq[String], filter: Map[String, String], xt: (Int, Double), yt: (Int, Double)): Boolean = {
    val (xcol, x) = xt
    val (ycol, y) = yt
    filter.forall { case (key, criterion) =>
      val col = colNum(key)
      val raw = row.lift(col).getOrElse("")
      val isXY = col == xcol || col == ycol
      val number: Option[Double] =
        if (col == xcol) Some(x)
        else if (col == ycol) Some(y)
        else Try(raw.trim.toDouble).toOption
      criterion match {
        case RangeCriterion(from, to) =>
          number.exists(v => from.toDouble <= v && v <= to.toDouble)
        case _ =>
          // Comparing the parsed Double itself against the criterion String can never be
          // equal, so compare text with text and number with number.
          raw == criterion ||
            (isXY && Try(criterion.trim.toDouble).toOption.exists(c => number.exists(_ == c)))
      }
    }
  }

  /** Min-max scales `x`; returns (scaled values, minimum, max - min). */
  def scale(x: OLS.Data): (OLS.Data, Double, Double) = {
    val min = x.min
    val scaler = x.max - min
    (x.map(i => (i - min) / scaler), min, scaler)
  }

  /** Z-normalizes `x`; returns (normalized values, mean, standard deviation). */
  def normalize(x: OLS.Data): (OLS.Data, Double, Double) = {
    val mu = OLS.mean(x)
    val sigma = OLS.stdev(x)
    (x.map(i => (i - mu) / sigma), mu, sigma)
  }

  /** Runs the regression-style job on columns `args(2)` (x) and `args(3)` (y).
    *
    * `args(4)` (optional) maps y strings to numbers, `args(5)` (optional) holds row filters.
    * Prints the scaling/normalization parameters and the correlation between normalized x
    * and normalized `y - x*x`.
    */
  def job: Unit = {
    require(args.length >= 4, "usage: GSSJob <input-file> job <xColumn> <yColumn> [<replacements> [<filters>]]")
    val (xcol, ycol) = (colNum(args(2)), colNum(args(3))) // we only want columns x & y
    val noEntries = Map.empty[String, String]
    val replacer = args.lift(4).map(buildMap).getOrElse(noEntries) // replace strings with numbers in the data
    val filter = args.lift(5).map(buildMap).getOrElse(noEntries)   // filter criterion based on arbitrary columns
    val dataxy = data
      .tail // skip the header
      .flatMap { line =>
        val row = line.split("\t").toIndexedSeq
        getColumns(row, xcol, ycol)
          .map { case (xval, yval) => (xval.toDouble, replacer.getOrElse(yval, yval).toDouble) }
          .filter { case (x, y) => filterValid(row, filter, (xcol, x), (ycol, y)) }
      }
    val (x, y) = dataxy.toIndexedSeq.unzip
    if (x.isEmpty) println("No rows with valid x and y values passed the filter; nothing to compute.")
    else {
      val (scaledx, xmin, xscaler) = scale(x)
      val (scaledy, ymin, yscaler) = scale(y)
      val ysq = scaledx.zip(scaledy).map { case (sx, sy) => sy - sx * sx }
      val (normx, mux, sigmax) = normalize(scaledx)
      val (normy, muy, sigmay) = normalize(ysq)
      val corr = OLS.corr(normx, normy)
      printf("\n OLS Normalized Poly: \n mux:\t%.3f \n sigmax:\t%.3f \n xmin:\t%.3f \n xscaler:\t%.3f \n muy:\t%.3f \n sigmay:\t%.3f \n ymin:\t%.3f \n yscaler:\t%.3f \n corr:\t%.3f \n rsq:\t%.3f \n ",
        mux, sigmax, xmin, xscaler,
        muy, sigmay, ymin, yscaler,
        corr, corr * corr)
    }
  }
}
/** Minimal descriptive statistics over indexed sequences of doubles. */
object OLS {
  type Data = IndexedSeq[Double]

  /** Arithmetic mean: sum of the samples divided by their count. */
  def mean(d: Data): Double = {
    val total = d.sum
    total / d.size
  }

  /** Mean of the squared deviations from the mean (divides by the sample count). */
  def variance(d: Data): Double = {
    val center = mean(d)
    val squaredDeviations = d.map { sample =>
      val deviation = sample - center
      deviation * deviation
    }
    mean(squaredDeviations)
  }

  /** Square root of [[variance]]. */
  def stdev(d: Data): Double = {
    val spread = variance(d)
    math.sqrt(spread)
  }

  /** Mean of the pairwise products of `x` and `y`; pairs stop at the shorter sequence. */
  def corr(x: Data, y: Data): Double = {
    val products = x.zip(y).map { case (a, b) => a * b }
    mean(products)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment