# Hyperspectral Analysis in Spark

## Hyperspectral Data

The data for these experiments are taken from a Raman spectral imaging project conducted with Dr. Thomas Huser at the Center for Biophotonics Science and Technology at UC Davis. Each row of the raw CSV files holds a spatial position (x, y, z), a Raman wavenumber, and the measured intensity at that point. The snippets below are meant for the Spark shell, where `sc` is the ready-made SparkContext.

```scala
// read in the data
val datapath = "/Volumes/WORKDISK/WorkData/Hyperspectral/raw/"
val rawdata = sc.textFile(datapath + "*.csv")
// peek at the first line
rawdata.first
// parse each line into columns
val csvdata = rawdata.map(_.split(","))
// convert the columns to numeric values
val numdata = csvdata.map(_.map(_.toDouble))
// a case class to give the columns meaningful names
case class spec_point(x: Double, y: Double, z: Double, wavenumber: Double, intensity: Double) {
  def getPos() = (x, y, z)
  def getVal() = (wavenumber, intensity)
}
// wrap the data in the case class
val specdata = numdata.map(pt => spec_point(pt(0), pt(1), pt(2), pt(3), pt(4)))
// key each point by its spatial position
val ptspec = specdata.map(spt => (spt.getPos, spt))
```
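A quick sanity check along these lines confirms the parse before going further; `take` avoids pulling the whole RDD back to the driver:

```scala
// peek at a few parsed points and count the total number of measurements
specdata.take(3).foreach(println)
specdata.count
```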
```scala
// now for the Spark SQL code
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
// brings in the implicit conversion from RDDs of case classes to SchemaRDDs
import sqlContext._
// register the data as a table
specdata.registerAsTable("Spectra")
// run a simple SQL query
val avgInt = sql("SELECT AVG(intensity) FROM Spectra")
// save as a Parquet file
specdata.saveAsParquetFile("/Volumes/WORKDISK/WorkData/Hyperspectral/spec.pqt")
```
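`sql` only defines the query; an action like `collect` is needed to actually execute it and see the single-row result:

```scala
// sql() returns a SchemaRDD (an RDD of Rows)
avgInt.collect.foreach(println)
```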
```scala
// starting from the Parquet file directly (e.g. in a fresh shell)
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext._
val pqspec = sqlContext.parquetFile("/Volumes/WORKDISK/WorkData/Hyperspectral/spec.pqt")
pqspec.registerAsTable("PQSpectra")
// get an average image (mean intensity at each position) with Spark SQL
val avgimg = sql("SELECT x, y, z, AVG(intensity) FROM PQSpectra GROUP BY x, y, z")
val imgarr = avgimg.collect
```
```scala
// get an average image without SQL, directly from the Parquet rows
val pqpts = pqspec.map(_.map(_.asInstanceOf[Double]))
                  .map(spt => ((spt(0), spt(1), spt(2)), (spt(3), spt(4))))
// organize it as an image: group the (wavenumber, intensity) pairs by position
val pqimg = pqpts.groupByKey
// average the intensities at each position
val nAvgImg = pqimg.mapValues(cspec => cspec.map(_._2).sum / cspec.size)
```
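One way to inspect the averaged image outside of Spark is to flatten it back to text; the output path here is only illustrative:

```scala
// write one "x,y,z,avg" line per position
nAvgImg.map { case ((x, y, z), avg) => s"$x,$y,$z,$avg" }
  .saveAsTextFile("/Volumes/WORKDISK/WorkData/Hyperspectral/avgimg")
```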
```scala
// sorting utility for the spectra
import scala.util.Sorting.stableSort
// organize the full spec_point records as an image (ptspec from the first section)
val imgData = ptspec.groupByKey
// sort each position's points by wavenumber and keep just the intensities
val imgSpec = imgData.mapValues { rawspec =>
  val outspec = rawspec.toArray
  stableSort(outspec, (e1: spec_point, e2: spec_point) => e1.wavenumber < e2.wavenumber)
  outspec.map(_.intensity)
}
```
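The MLlib section below keeps only spectra with 584 samples; a quick length count is one way to verify that number for a given dataset:

```scala
// tally the spectra by length; complete spectra should all share one length
imgSpec.map(_._2.length).countByValue.foreach(println)
```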
## Using MLlib
```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
// convert each spectrum into an MLlib dense vector
val imgSV = imgSpec.mapValues(ptarr => Vectors.dense(ptarr))
// keep only the complete spectra (584 wavenumber samples) and cache for the iterative training
val kvData = imgSV.map(_._2).filter(_.size == 584).cache
// train the model with 4 clusters and up to 1000 iterations
val kModel = KMeans.train(kvData, 4, 1000)
// create a cluster image by assigning each (complete) spectrum to its nearest cluster
val imgClusters = imgSV.filter(_._2.size == 584).mapValues(cvec => kModel.predict(cvec))
```
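As a rough quality check, MLlib's `KMeansModel` can report the within-cluster sum of squares, and counting the assignments shows how the positions split across the four clusters:

```scala
// within-cluster sum of squared distances (lower is tighter)
kModel.computeCost(kvData)
// number of positions assigned to each cluster
imgClusters.map(_._2).countByValue.foreach(println)
```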
## Calculating Principal Components
```scala
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
// treat each spectrum as a row of a distributed matrix
val rm = new RowMatrix(kvData)
// PCA for the first 3 components
val prinComp: Matrix = rm.computePrincipalComponents(3)
// calculate the projections of each spectrum onto the components
val projs: RowMatrix = rm.multiply(prinComp)
```
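A few of the resulting 3-component projections can then be brought back to the driver, for example for plotting:

```scala
// each row is one spectrum reduced to its 3 principal-component scores
projs.rows.take(5).foreach(println)
```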