Skip to content

Instantly share code, notes, and snippets.

@krishnanraman
Last active August 29, 2015 13:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krishnanraman/8700540 to your computer and use it in GitHub Desktop.
Save krishnanraman/8700540 to your computer and use it in GitHub Desktop.
/**
 * A data-mining job sourcing from General Social Survey (GSS) data.
 *
 * The input is a tab-separated file whose first line is the column header.
 *
 * Command line:
 *   - `args(0)`: input file
 *   - `args(1)`: task, either `histo` or `job`
 *   - for `histo`: `args(2)` column name, `args(3)` `quantitative` or `qualitative`
 *   - for `job`: `args(2)` x column, `args(3)` y column,
 *     `args(4)` replacements (`text=number&text=number`),
 *     `args(5)` filters (`column=value&column=lo-hi`)
 *
 * NOTE: this object extends `App`, so every `val` below is initialised in
 * source order as part of the program body. Anything the tasks rely on must
 * therefore be declared ABOVE the `task match` dispatch.
 */
object GSSJob extends App {
  private val usage =
    "usage: GSSJob <input-file> histo <column> <quantitative|qualitative>\n" +
    "       GSSJob <input-file> job <x-column> <y-column> <replacements> <filters>"

  require(args.length >= 2, usage)

  val in = args(0) // input file
  val task = args(1) // job, histo etc.
  val data = src
  val columnHeader = data.headOption.getOrElse(
    throw new IllegalArgumentException("Input file '" + in + "' is empty"))

  // Header split once up front: colNum is called for every row and filter key.
  private val headerColumns = columnHeader.split("\t").toIndexedSeq

  // A numeric range criterion such as "18-65" (the whole criterion must match).
  private val RangeCriterion = """(\d+(?:\.\d+)?)-(\d+(?:\.\d+)?)""".r

  task match {
    case "job" => job
    case "histo" => histo
    case other =>
      throw new IllegalArgumentException("Unknown task '" + other + "'\n" + usage)
  }

  /**
   * Reads every line of the input file.
   *
   * The file is read eagerly so that the underlying handle can be closed
   * before returning; the result is still exposed as a `Stream` to keep the
   * original type.
   */
  def src: Stream[String] = {
    val source = io.Source.fromFile(in)
    try source.getLines().toVector.toStream
    finally source.close()
  }

  /**
   * Index of the first header column whose name contains `colName`
   * (substring match, not equality).
   *
   * @throws NoSuchElementException if no header column contains `colName`
   */
  def colNum(colName: String): Int = {
    val idx = headerColumns.indexWhere(_.contains(colName))
    if (idx < 0)
      throw new NoSuchElementException(
        "No column containing '" + colName + "' in the header of " + in)
    idx
  }

  /**
   * Prints a text histogram of one column (`args(2)`), treating it as
   * `quantitative` (10 equal-width buckets) or `qualitative` (one bar per
   * distinct value, most frequent first) according to `args(3)`.
   * Blank cells are ignored.
   */
  def histo: Unit = {
    require(args.length >= 4, usage)
    val (colName, colType) = (args(2), args(3))
    val col = colNum(colName)

    // Validate the column type before touching the data.
    val summarize: Seq[String] => Seq[(String, Int)] = colType match {
      case "quantitative" =>
        (values: Seq[String]) => quantitativeCounts(values.map(_.toDouble))
      case "qualitative" =>
        (values: Seq[String]) => qualitativeCounts(values)
      case other =>
        throw new IllegalArgumentException(
          "Unknown column type '" + other + "', expected quantitative or qualitative")
    }

    val rows = data.tail.flatMap { line =>
      // limit -1 keeps trailing empty fields; lift guards against short rows.
      line.split("\t", -1).toIndexedSeq.lift(col).filter(_.trim.nonEmpty)
    }

    if (rows.isEmpty) println("No non-empty values found in column '" + colName + "'")
    else printBars(summarize(rows))
  }

  /** Counts `values` into 10 equal-width buckets spanning [min, max]; `values` must be non-empty. */
  private def quantitativeCounts(values: Seq[Double]): Seq[(String, Int)] = {
    val bucketCount = 10
    val (minx, maxx) = (values.min, values.max)
    val width = (maxx - minx) / bucketCount
    // Bounds are computed by multiplication (no accumulated drift) and the last
    // one is pinned to maxx, so every value in [minx, maxx] falls in a bucket.
    val bounds = (0 to bucketCount).map(i => if (i == bucketCount) maxx else minx + i * width)
    val buckets = bounds.zip(bounds.tail)

    val counts = values.foldLeft(Vector.fill(bucketCount)(0)) { (acc, item) =>
      // Intervals are inclusive on both ends; a value on a shared boundary
      // goes to the lower bucket because indexWhere returns the first match.
      val idx = buckets.indexWhere { case (lo, hi) => lo <= item && item <= hi }
      if (idx == -1) { println("Invalid " + item); acc } // e.g. NaN
      else acc.updated(idx, acc(idx) + 1)
    }

    // Rounding to one decimal is for display only; classification uses exact bounds.
    def shown(bound: Double): Double = math.round(bound * 10).toInt / 10.0
    buckets.map { case (lo, hi) => "[" + shown(lo) + " - " + shown(hi) + "]" }.zip(counts)
  }

  /** Counts occurrences of each distinct value, most frequent first. */
  private def qualitativeCounts(values: Seq[String]): Seq[(String, Int)] =
    values
      .groupBy(identity)
      .map { case (value, hits) => (value, hits.size) }
      .toList
      .sortBy(_._2)
      .reverse

  /** Prints one right-aligned label, a bar of up to 60 '#' and the count per entry; `bars` must be non-empty. */
  private def printBars(bars: Seq[(String, Int)]): Unit = {
    val labelWidth = bars.map(_._1.length).max
    val maxCount = bars.map(_._2).max
    bars.foreach { case (label, count) =>
      val barLength = if (maxCount == 0) 0 else (count * 60.0 / maxCount).toInt
      printf("%" + labelWidth + "s: %s\t%d\n", label, "#" * barLength, count)
    }
  }

  // Dictionary to replace strings with numbers
  // "Age=23&Weight=35" becomes Map( Age->23, Weight->35 )
  // Entries that do not split into exactly one key and one value are ignored;
  // when a key is repeated the last occurrence wins.
  def buildMap(str: String): Map[String, String] =
    str
      .split("&")
      .map(_.split("="))
      .collect { case Array(key, value) => key -> value }
      .toMap

  /**
   * Extracts the x and y cells of a row.
   *
   * @return the pair, or `None` when either cell is missing, null or blank
   */
  def getColumns(row: IndexedSeq[String], xcol: Int, ycol: Int): Option[(String, String)] = {
    def cell(col: Int): Option[String] =
      row.lift(col).filter(v => v != null && v.trim.nonEmpty)
    for {
      xval <- cell(xcol)
      yval <- cell(ycol)
    } yield (xval, yval)
  }

  /**
   * Checks a row against every filter criterion (all must hold; an empty
   * filter accepts every row).
   *
   * A criterion of the form `lo-hi` is an inclusive numeric range; anything
   * else is an equality test. For the x and y columns the already-parsed
   * numbers in `xt` / `yt` are compared numerically; other columns compare
   * the raw cell text. Blank or non-numeric cells never satisfy a range.
   *
   * @param xt (column index, parsed value) of x
   * @param yt (column index, parsed value) of y
   */
  def filterValid(row: IndexedSeq[String], filter: Map[String, String], xt: (Int, Double), yt: (Int, Double)): Boolean = {
    val (xcol, x) = xt
    val (ycol, y) = yt
    filter.forall { case (key, criterion) =>
      val col = colNum(key)
      val parsed: Option[Double] =
        if (col == xcol) Some(x) else if (col == ycol) Some(y) else None
      val raw = row.lift(col).getOrElse("")
      criterion match {
        case RangeCriterion(lo, hi) =>
          parsed.orElse(parseDouble(raw)).exists(v => lo.toDouble <= v && v <= hi.toDouble)
        case _ =>
          parsed match {
            // A Double never equals a String, so compare numerically here.
            case Some(v) => parseDouble(criterion).exists(_ == v)
            case None => raw == criterion
          }
      }
    }
  }

  /** Parses a trimmed string as a Double, yielding `None` when it is not numeric. */
  private def parseDouble(s: String): Option[Double] =
    scala.util.Try(s.trim.toDouble).toOption

  /**
   * Min-max scaling: maps each value to `(value - min) / (max - min)`.
   * When all values are equal the divisor is zero and the scaled values are NaN.
   *
   * @return (scaled values, min, max - min)
   */
  def scale(x: OLS.Data): (OLS.Data, Double, Double) = {
    val min = x.min
    val scaler = x.max - min
    (x.map(i => (i - min) / scaler), min, scaler)
  }

  /**
   * Standardises values to `(value - mean) / stdev` using `OLS.mean` and `OLS.stdev`.
   * A zero standard deviation yields NaN values.
   *
   * @return (standardised values, mean, stdev)
   */
  def normalize(x: OLS.Data): (OLS.Data, Double, Double) = {
    val mu = OLS.mean(x)
    val sigma = OLS.stdev(x)
    (x.map(i => (i - mu) / sigma), mu, sigma)
  }

  /**
   * Correlates the scaled x column with `y - x*x` (computed on the scaled
   * values) and prints the scaling/normalisation parameters, the correlation
   * and its square.
   *
   * Non-numeric x or y cells (after replacement) raise `NumberFormatException`.
   */
  def job: Unit = {
    require(args.length >= 6, usage)
    val (xcol, ycol) = (colNum(args(2)), colNum(args(3))) // we only want columns x & y
    val replacer = buildMap(args(4)) // replace strings with numbers in the data
    val filter = buildMap(args(5)) // filter criterion based on arbitrary columns

    val dataxy = data
      .tail // skip the column header
      .flatMap { line =>
        // limit -1 keeps trailing empty fields so column indexes stay valid.
        val row = line.split("\t", -1).toIndexedSeq
        getColumns(row, xcol, ycol)
          .map { case (xval, yval) => (xval.toDouble, replacer.getOrElse(yval, yval).toDouble) }
          .filter { case (x, y) => filterValid(row, filter, (xcol, x), (ycol, y)) }
      }

    val (x, y) = dataxy.toIndexedSeq.unzip
    if (x.isEmpty) {
      println("No rows left after filtering; nothing to compute.")
    } else {
      val (scaledx, xmin, xscaler) = scale(x)
      val (scaledy, ymin, yscaler) = scale(y)
      val ysq = scaledx.zip(scaledy).map { case (sx, sy) => sy - sx * sx }
      val (normx, mux, sigmax) = normalize(scaledx)
      val (normy, muy, sigmay) = normalize(ysq)
      val corr = OLS.corr(normx, normy)
      printf("\n OLS Normalized Poly: \n mux:\t%.3f \n sigmax:\t%.3f \n xmin:\t%.3f \n xscaler:\t%.3f \n muy:\t%.3f \n sigmay:\t%.3f \n ymin:\t%.3f \n yscaler:\t%.3f \n corr:\t%.3f \n rsq:\t%.3f \n ",
        mux, sigmax, xmin, xscaler,
        muy, sigmay, ymin, yscaler,
        corr, corr * corr)
    }
  }
}
/** Small statistics helpers over indexed sequences of doubles. */
object OLS {
  type Data = IndexedSeq[Double]

  /** Arithmetic mean: the sum divided by the element count (NaN for empty input). */
  def mean(d: Data): Double = {
    val total = d.sum
    total / d.size
  }

  /** Mean of the squared deviations from the mean (divides by n, not n - 1). */
  def variance(d: Data): Double = {
    val centre = mean(d)
    val squaredDeviations = d.map { value =>
      val deviation = value - centre
      deviation * deviation
    }
    mean(squaredDeviations)
  }

  /** Square root of [[variance]]. */
  def stdev(d: Data): Double = {
    val spread = variance(d)
    math.sqrt(spread)
  }

  /**
   * Mean of the pairwise products of `x` and `y`.
   * Pairs are formed with `zip`, so surplus elements of the longer input are ignored.
   */
  def corr(x: Data, y: Data): Double = {
    val products = x.zip(y).map { case (a, b) => a * b }
    mean(products)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment