Created
September 22, 2022 21:23
-
-
Save okanyenigun/32140c29181ba4a4ba8da7d1d103fdee to your computer and use it in GitHub Desktop.
Feature transformation and PCA example code in Scala (Spark MLlib).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql.SparkSession
import org.apache.log4j._

// Silence Spark's verbose INFO logging; only errors reach the console.
Logger.getLogger("org").setLevel(Level.ERROR)

// Start (or reuse) a Spark session.
val spark = SparkSession.builder().getOrCreate()

// Load the Titanic CSV with a header row, letting Spark infer column types,
// then drop rows containing nulls so later transformers get clean input.
val reader = spark.read.option("header", "true").option("inferSchema", "true")
val logregdata = reader.format("csv").load("titanic.csv")
val data = logregdata.na.drop()
// data.printSchema   // column names and inferred types
// data.show          // preview the first 20 rows

import org.apache.spark.ml.feature.{MinMaxScaler, OneHotEncoder, StandardScaler, StringIndexer}

// Example (disabled): StringIndexer maps the string column "Sex" to numeric indices.
// val indexer = new StringIndexer().setHandleInvalid("keep").setInputCol("Sex").setOutputCol("SexEnconde")
// val indexed = indexer.fit(data).transform(data)
// val data_encoded = indexed.toDF()
// data_encoded.select("Sex", "SexEnconde").show()
// indexed.show

// Example (disabled): OneHotEncoder expands the index column into a sparse one-hot vector.
// val encoder = new OneHotEncoder().setInputCol("SexEnconde").setOutputCol("SexVec")
// val model = encoder.fit(data_encoded)
// val encoded = model.transform(data_encoded)
// encoded.toDF().select("SexEnconde", "SexVec").show()
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.mllib.linalg._
import org.apache.spark.ml.feature.VectorAssembler

// VectorAssembler requires a numeric input column, so force "Age" to Double.
// NOTE(review): the original built a `udf[Double, String]` for this, but `udf`
// is not in scope without `import org.apache.spark.sql.functions.udf`, so that
// line would not compile in spark-shell. A plain column cast (DoubleType is in
// scope via sql.types._) does the conversion without a UDF, and also works when
// inferSchema has already typed "Age" as numeric. The unused `schema` val
// (which mixed the deprecated mllib VectorUDT with the ml API) was removed.
val df2 = data.withColumn("Age", data("Age").cast(DoubleType))

// Wrap the single "Age" column into the vector column Spark ML transformers expect.
val assembler = new VectorAssembler().
  setInputCols(Array("Age")).
  setOutputCol("AgeVec")
val out = assembler.transform(df2)
/*
// Alternative (disabled): StandardScaler normalizes each feature to unit
// standard deviation (and optionally zero mean).
val scaler = new StandardScaler()
  .setInputCol("AgeVec")
  .setOutputCol("AgeScale")
  .setWithStd(true)
  .setWithMean(false)
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(out)
// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(out)
scaledData.select("AgeVec", "AgeScale").show()
*/

// MinMaxScaler rescales each feature into [min, max] ([0, 1] by default).
val scaler = new MinMaxScaler()
  .setInputCol("AgeVec")
  .setOutputCol("AgeScale")
// Fitting computes the per-feature min/max summary statistics.
val scalerModel = scaler.fit(out)
// Rescale each feature to the configured [min, max] range.
val scaledData = scalerModel.transform(out)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("AgeVec", "AgeScale").show()
// NOTE(review): removed a stray, unmatched `*/` that followed this line in the
// original — the block comment above already closed, so it broke compilation.
// Imports
// NOTE(review): the original line read `:// Imports` — the stray leading colon
// is invalid Scala (and a malformed REPL command in spark-shell); removed.
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

// SparkSession (getOrCreate reuses an already-running session).
val spark = SparkSession.builder().appName("PCA_Example").getOrCreate()

// A tiny 3-row, 5-dimensional dataset to demonstrate PCA.
val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)

// Wrap each vector in a Tuple1 so createDataFrame yields a single "features" column.
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

// Fit a PCA model projecting the 5-D features onto the top 3 principal components.
val pca = (new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df))

// Transform and inspect the projected features.
val pcaDF = pca.transform(df)
val result = pcaDF.select("pcaFeatures")
result.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment