The data for these experiments come from a Raman spectral imaging project conducted with Dr. Thomas Huser at the Center for Biophotonics Science and Technology at UC Davis.
Hyperspectral Analysis in Spark
// read in the data
val datapath = "/Volumes/WORKDISK/WorkData/Hyperspectral/raw/"
val rawdata = sc.textFile(datapath + "*.csv")
// get the first line (rows are expected as x,y,z,wavenumber,intensity)
rawdata.first
// parse the data into columns
val csvdata = rawdata.map(_.split(","))
// convert to values
val numdata = csvdata.map(_.map(_.toDouble))
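// optional sketch: _.toDouble throws on header or malformed lines; if the raw
// CSV export might contain such rows, this hedged variant silently drops them
// (assumption: losing unparseable rows is acceptable for this dataset)
val safedata = csvdata.flatMap(row => scala.util.Try(row.map(_.toDouble)).toOption)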
// make a nice case class
case class spec_point(x: Double, y: Double, z: Double, wavenumber: Double, intensity: Double) {
  def getPos() = (x, y, z)
  def getVal() = (wavenumber, intensity)
}
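// quick sanity check of the case class (the values here are hypothetical)
val demo = spec_point(0.0, 1.0, 2.0, 1003.5, 42.0)
demo.getPos // (0.0, 1.0, 2.0)
demo.getVal // (1003.5, 42.0)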
// wrap the data in the case class
val specdata = numdata.map(pt => spec_point(pt(0), pt(1), pt(2), pt(3), pt(4)))
// key each spectrum point by its (x,y,z) position
val ptspec = specdata.map(spt => (spt.getPos, spt))
// now for the Spark SQL code (Spark 1.0-era API)
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext._
// register the data as a table
specdata.registerAsTable("Spectra")
// run a simple sql command
val avgInt = sql("SELECT AVG(intensity) FROM Spectra")
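// sql(...) in this API returns a SchemaRDD; materialize the single aggregate row
avgInt.collect.foreach(println)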
// save as parquet
specdata.saveAsParquetFile("/Volumes/WORKDISK/WorkData/Hyperspectral/spec.pqt")
// starting from the parquet file directly
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext._
val pqspec = sqlContext.parquetFile("/Volumes/WORKDISK/WorkData/Hyperspectral/spec.pqt")
pqspec.registerAsTable("PQSpectra")
// get an average image with Spark SQL (group by the voxel position)
val avgimg = sql("SELECT x,y,z,AVG(intensity) FROM PQSpectra GROUP BY x,y,z")
val imgarr = avgimg.collect
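// peek at a few averaged voxels; each Row holds (x, y, z, avg(intensity))
imgarr.take(5).foreach(println)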
// get an average image without SQL, as a plain RDD computation
val ptspec = pqspec.map(_.map(_.asInstanceOf[Double])).map(spt => ((spt(0), spt(1), spt(2)), (spt(3), spt(4))))
// organize it as an image: group all (wavenumber, intensity) pairs per voxel
val imgData = ptspec.groupByKey
// average the intensities (the second tuple element) at each voxel
val nAvgImg = imgData.mapValues(cspec => cspec.map(_._2).sum / cspec.size)
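As a quick check, the RDD-based averages can be sampled locally and compared against the SQL result; a minimal sketch:
// pull a handful of voxel averages to the driver for inspection
nAvgImg.take(5).foreach { case (pos, avg) => println(pos + " -> " + avg) }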
// sorting command for spectra
import scala.util.Sorting.stableSort
// organize it as an image (ptspec here is the ((x,y,z), spec_point) pairing from the first file)
val imgData = ptspec.groupByKey
val imgSpec = imgData.mapValues { rawspec =>
  // sort each voxel's points by wavenumber, then keep just the intensities
  val outspec = rawspec.toArray
  stableSort(outspec, (e1: spec_point, e2: spec_point) => e1.wavenumber < e2.wavenumber)
  outspec.map(_.intensity)
}
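// hedged check: the MLlib step below assumes every full spectrum has the same
// length (584 samples); this lists the distinct spectrum lengths that occur
imgSpec.map(_._2.length).distinct.collect.foreach(println)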
/**
 * Using MLlib
 */
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
val imgSV = imgSpec.mapValues(ptarr => Vectors.dense(ptarr))
// keep only full-length spectra (584 samples) and cache for iterative training
val kvData = imgSV.map(_._2).filter(_.size == 584).cache
// train the model with 4 clusters and up to 1000 iterations
val kModel = KMeans.train(kvData, 4, 1000)
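// inspect the fit: within-set sum of squared errors and the learned centers
val wssse = kModel.computeCost(kvData)
println("WSSSE: " + wssse + ", centers: " + kModel.clusterCenters.length)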
// create an image from the clusters; predict only on full-length spectra,
// since the distance computation requires matching dimensionality
val imgClusters = imgSV.filter(_._2.size == 584).mapValues(cvec => kModel.predict(cvec))
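// sketch: write the cluster labels back out as csv-like text
// (the output directory name here is a hypothetical choice)
imgClusters.map { case ((x, y, z), c) => x + "," + y + "," + z + "," + c }
  .saveAsTextFile("clusters_out")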
/**
 * Calculate Principal Components
 */
import org.apache.spark.mllib.linalg.{Matrix, Matrices}
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val rm = new RowMatrix(kvData)
// pca for the first 3 components
val prinComp = rm.computePrincipalComponents(3)
// calculate the projections onto those components
val projs: RowMatrix = rm.multiply(prinComp)
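The projections live in a distributed RowMatrix; a minimal sketch for pulling a few projected spectra back to the driver:
// each row is now a 3-element vector of PCA coordinates for one spectrum
projs.rows.take(3).foreach(println)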