// Feature transformation and PCA with Spark (Scala)
import org.apache.spark.sql.SparkSession
import org.apache.log4j._
// Reduce log verbosity
Logger.getLogger("org").setLevel(Level.ERROR)
// Start a Spark session
val spark = SparkSession.builder().getOrCreate()
// Read the data, inferring column types from the CSV
val logregdata = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .format("csv")
  .load("titanic.csv")
// Drop rows with missing values
val data = logregdata.na.drop()
//feature types
//data.printSchema
//first 20 rows
//data.show
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder, StandardScaler, MinMaxScaler}
// StringIndexer: map the string column "Sex" to numeric category indices
//val indexer = new StringIndexer().setHandleInvalid("keep").setInputCol("Sex").setOutputCol("SexIndex")
//val indexed = indexer.fit(data).transform(data)
//indexed.select("Sex", "SexIndex").show()
// OneHotEncoder: expand the category index into a sparse binary vector
//val encoder = new OneHotEncoder().setInputCol("SexIndex").setOutputCol("SexVec")
//val encoded = encoder.fit(indexed).transform(indexed)
//encoded.select("SexIndex", "SexVec").show()
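// A minimal runnable sketch chaining the two steps above in a Pipeline
// (assumes the "Sex" column from titanic.csv; stage and column names here
// are illustrative, not from the original):
import org.apache.spark.ml.Pipeline
val sexIndexer = new StringIndexer().setHandleInvalid("keep").setInputCol("Sex").setOutputCol("SexIndex")
val sexEncoder = new OneHotEncoder().setInputCol("SexIndex").setOutputCol("SexVec")
val encodePipeline = new Pipeline().setStages(Array(sexIndexer, sexEncoder))
val sexEncoded = encodePipeline.fit(data).transform(data)
sexEncoded.select("Sex", "SexIndex", "SexVec").show(5)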
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.mllib.linalg._
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.feature.VectorAssembler
// Spark ML scalers expect a vector-typed input column, so first make sure
// "Age" is a Double, then assemble it into a vector with VectorAssembler
// (this schema is unused below; kept from an earlier approach)
//val schema = new StructType().add("features", new VectorUDT())
val toDouble = udf[Double, String](_.toDouble)
val df2 = data.withColumn("Age", toDouble(data("Age")))
val assembler = new VectorAssembler()
  .setInputCols(Array("Age"))
  .setOutputCol("AgeVec")
val out = assembler.transform(df2)
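// VectorAssembler can also merge several numeric columns into one feature
// vector; a sketch assuming the usual Titanic columns "SibSp", "Parch", and
// "Fare" are present and numeric (column names are an assumption):
val multiAssembler = new VectorAssembler()
  .setInputCols(Array("Age", "SibSp", "Parch", "Fare"))
  .setOutputCol("features")
val assembled = multiAssembler.transform(df2)
assembled.select("features").show(5, truncate = false)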
/*
val scaler = new StandardScaler()
.setInputCol("AgeVec")
.setOutputCol("AgeScale")
.setWithStd(true)
.setWithMean(false)
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(out)
// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(out)
scaledData.select("AgeVec","AgeScale").show()
*/
val scaler = new MinMaxScaler()
.setInputCol("AgeVec")
.setOutputCol("AgeScale")
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(out)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(out)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("AgeVec", "AgeScale").show()
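// The fitted MinMaxScalerModel also exposes the per-feature statistics it
// observed during fit; a quick look (originalMin/originalMax are vectors):
println(s"Observed min: ${scalerModel.originalMin}, observed max: ${scalerModel.originalMax}")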
// PCA example
// Imports
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors
// SparkSession
val spark = SparkSession.builder().appName("PCA_Example").getOrCreate()
// Create some Data
val data = Array(
  // sparse: a size-5 vector with nonzero entries at indices 1 and 3
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
// Perform the operation
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)
// Transform and check out the results
val pcaDF = pca.transform(df)
val result = pcaDF.select("pcaFeatures")
result.show()
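// The fitted PCAModel also reports how much variance each principal
// component explains, plus the loading matrix; a quick inspection:
println(s"Explained variance: ${pca.explainedVariance}")
println(s"Principal components:\n${pca.pc}")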