Created
August 21, 2016 14:58
-
-
Save JRuumis/720fae8b3a421f61689a5202c151d783 to your computer and use it in GitHub Desktop.
every night in my dreams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.log4j.{Level, Logger} | |
import org.apache.spark.ml.classification.RandomForestClassifier | |
import org.apache.spark.ml.feature.{Bucketizer, StringIndexer, VectorAssembler, VectorIndexer} | |
import org.apache.spark.ml.{Pipeline, PipelineStage} | |
import org.apache.spark.sql.functions._ | |
import org.apache.spark.sql.{DataFrame, SparkSession} | |
/** | |
* Created by Janis Rumnieks on 19/08/2016. | |
*/ | |
object Titanic {

  /**
   * Kaggle "Titanic" entry: loads the train/test CSVs, engineers features
   * (salutation from the name, bucketed age/fare/parch/cabin number, blank
   * handling), string-indexes the categorical features, trains a Random
   * Forest on the train set and writes a "PassengerId,Survived" submission
   * file for the test set.
   *
   * @param args optional overrides: args(0)=train csv path, args(1)=test csv
   *             path, args(2)=output csv path. Defaults are the original
   *             hard-coded local paths.
   */
  def main(args: Array[String]): Unit = {

    // --- helpers ----------------------------------------------------------
    // All helpers are null-safe: depending on reader options, Spark's CSV
    // source may deliver missing fields as null rather than "", which would
    // NPE the plain `== ""` checks.

    // Extracts the salutation ("Mr", "Mrs", "Miss", ...) from names shaped
    // like "Surname, Title. Given"; "SalutationUnknown" when absent.
    def salutationFromName(name: String): String = {
      val salutationPattern = """, (\w+)\. """.r
      Option(name)
        .flatMap(salutationPattern.findFirstMatchIn(_))
        .map(_ group 1)
        .getOrElse("SalutationUnknown")
    }
    val salutationFromNameUDF = udf(salutationFromName _)

    // First cabin number from e.g. "C85" or "B57 B59 B63"; 0 when absent.
    def cabinNumFromCabin(cabinName: String): Int = {
      val cabinPattern = """^[A-Z](\d+)( |$)""".r // there can be multiple cabin numbers but we just take the first one
      Option(cabinName)
        .flatMap(cabinPattern.findFirstMatchIn(_))
        .map(_ group 1)
        .getOrElse("0")
        .toInt
    }
    val cabinNumFromCabinUDF = udf(cabinNumFromCabin _)

    // Missing/blank numeric field -> 0.0 (non-numeric text still throws, as before).
    def blankToZero(inVal: String): Double =
      Option(inVal).filter(_.nonEmpty).map(_.toDouble).getOrElse(0.0)
    val blankToZeroUDF = udf(blankToZero _)

    // Missing/blank categorical field -> "NA" so StringIndexer gets a real label.
    def blankToNa(inVal: String): String =
      Option(inVal).filter(_.nonEmpty).getOrElse("NA")
    val blankToNaUDF = udf(blankToNa _)

    // --- Step 1: Spark setup ----------------------------------------------
    println("--- Step 1: set up Spark session")
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)
    val spark = SparkSession
      .builder()
      .appName("Titanic - every night in my dreams...")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", ".")
      .getOrCreate()
    import spark.implicits._

    // --- Step 2: load data ------------------------------------------------
    println("--- Step 2: load DataFrames from csv, do basic transformations")
    // Paths are overridable from the command line; defaults preserve the
    // original hard-coded locations.
    val trainDataFile  = if (args.length > 0) args(0) else """C:\Developer\Kaggle\Titanic\train.csv"""
    val testDataFile   = if (args.length > 1) args(1) else """C:\Developer\Kaggle\Titanic\test.csv"""
    val outputDataFile = if (args.length > 2) args(2) else """C:\Developer\Kaggle\Titanic\janis_output.csv"""
    val trainDataFrameFromSource: DataFrame = spark.read.option("header", true).csv(trainDataFile)
    val testDataFrameFromSource = spark.read.option("header", true).csv(testDataFile)

    // Shared feature engineering for both data sets. Requires a "Survived"
    // column to be present (the test set gets a dummy one below).
    def withDerivedFeatures(df: DataFrame): DataFrame = df
      .withColumn("Salutation", salutationFromNameUDF(df("Name")))
      .withColumn("FareNoBlanks", blankToZeroUDF($"Fare"))
      .withColumn("AgeNoBlanks", blankToZeroUDF($"Age"))
      .withColumn("ParentChild", $"Parch".cast("Double"))
      .withColumn("CabinCode", blankToNaUDF($"Cabin".substr(0, 1).cast("String")))
      .withColumn("CabinNum", cabinNumFromCabinUDF($"Cabin").cast("Double"))
      .select("PassengerId", "Survived", "Pclass", "Name", "Salutation", "Sex",
        "Age", "AgeNoBlanks", "SibSp", "Parch", "ParentChild", "Ticket",
        "Fare", "FareNoBlanks", "Cabin", "CabinCode", "CabinNum", "Embarked")

    // todo: join the two data sets and then split when applying to models!!!
    val trainDataFrame: DataFrame = withDerivedFeatures(trainDataFrameFromSource)
    println("train DataFrame:")
    trainDataFrame.show(10, false)

    // The test set has no label; add a constant-0 "Survived" (the original
    // computed PassengerId - PassengerId, which is the same Int 0) so both
    // frames share a schema.
    val testDataFrame: DataFrame =
      withDerivedFeatures(testDataFrameFromSource.withColumn("Survived", lit(0)))
    println("test DataFrame:")
    testDataFrame.show(10, false)

    /// exploratory queries (for buckets mostly)
    //println("All ages:")
    //(trainDataFrame.select("Age") union testDataFrame.select("Age")).distinct().orderBy("Age").show(300)
    //println("All fares:")
    //(trainDataFrame.select("FareRounded") union testDataFrame.select("FareRounded")).distinct().orderBy("FareRounded").show(300)
    //println("cabin numbers:")
    //(trainDataFrame.select("CabinNum") union testDataFrame.select("CabinNum")).distinct().orderBy("CabinNum").show(300)

    // --- bucket definitions -----------------------------------------------
    val ageBuckets = Array(Double.NegativeInfinity, 0.0, 5.0, 10.0, 15.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, Double.PositiveInfinity)
    val fareBuckets = Array(Double.NegativeInfinity, 0.0, 5.0, 10.0, 15.0, 20.0, 30.0, 50.0, 75.0, 100.0, 150.0, 200.0, 250.0, 500.0, Double.PositiveInfinity)
    // Parch, though already discrete, crashes the collect() for the estimate data set unless re-bucketed.
    val parentChildBuckets = Array(Double.NegativeInfinity, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, Double.PositiveInfinity)
    val cabinNumberBuckets = Array(Double.NegativeInfinity, 0.0, 1.0, 10.0, 30.0, 60.0, 90.0, 120.0, Double.PositiveInfinity)

    // One Bucketizer per continuous feature, producing "<name>Bucket" columns.
    def bucketizerFor(in: String, out: String, splits: Array[Double]): Bucketizer =
      new Bucketizer().setInputCol(in).setOutputCol(out).setSplits(splits)
    val featureBucketizers = Array(
      bucketizerFor("AgeNoBlanks", "AgeBucket", ageBuckets),
      bucketizerFor("FareNoBlanks", "FareBucket", fareBuckets),
      bucketizerFor("ParentChild", "ParentChildBucket", parentChildBuckets),
      bucketizerFor("CabinNum", "CabinNumBucket", cabinNumberBuckets))

    // --- Step 3: index features -------------------------------------------
    println("--- Step 3: index DataFrame features")
    // Parch needs to be excluded, else RandomForestClassifier fails.
    val indexableFeatureNames = Array("Survived", "Pclass", "Salutation", "Sex", "AgeBucket", "SibSp", "ParentChildBucket", "FareBucket", "Embarked" /*,"CabinNumBucket","CabinCode"*/)
    println(s"features to be indexed: ${indexableFeatureNames mkString(",")}")
    val featureIndexers: Array[StringIndexer] = indexableFeatureNames map { featureName =>
      new StringIndexer().setInputCol(featureName).setOutputCol(featureName + "Indexed")
    }

    // Order matters: bucketizers before indexers (indexers consume bucket columns).
    val allFeatureTransformers: Array[PipelineStage] = featureBucketizers ++ featureIndexers
    val featureIndexerPipeline = new Pipeline().setStages(allFeatureTransformers)

    // NOTE(review): the indexer pipeline is fitted separately on train and
    // test, so the same label can receive different indices in the two sets.
    // Fitting once on the union (see todo above) would be sounder — left
    // as-is to preserve behaviour.
    val trainDataFrameIndexed = featureIndexerPipeline.fit(trainDataFrame).transform(trainDataFrame)
    println("indexed train DataFrame:")
    trainDataFrameIndexed.show(10, false)
    val testDataFrameIndexed = featureIndexerPipeline.fit(testDataFrame).transform(testDataFrame)
    println("indexed test DataFrame:")
    testDataFrameIndexed.show(10, false)

    // --- Step 4: assemble feature vector ----------------------------------
    println("--- Step 4: create indexed feature vectors")
    val vectorInputCols = indexableFeatureNames.tail map (_ + "Indexed") // .tail drops the label, Survived
    println(s"""indexed feature names for vector: ${vectorInputCols mkString(",")}""")
    val titanicFeatureAssembler = new VectorAssembler()
      .setInputCols(vectorInputCols)
      .setOutputCol("features")
    val trainDataFrameVectored = titanicFeatureAssembler.transform(trainDataFrameIndexed)
    val testDataFrameVectored = titanicFeatureAssembler.transform(testDataFrameIndexed)

    // --- Step 5: vector indexing and Random Forest ------------------------
    println("--- Step 5: vector indexing and Random Forest")
    val titanicFeatureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(100)
      .fit(trainDataFrameVectored)
    val titanicRandomForestClassifier = new RandomForestClassifier()
      .setLabelCol("SurvivedIndexed")
      .setFeaturesCol("indexedFeatures")
      .setNumTrees(10000)
      //.setMaxBins(350)
    val titanicPipeline = new Pipeline()
      .setStages(Array(titanicFeatureIndexer, titanicRandomForestClassifier))
    val titanicModel = titanicPipeline.fit(trainDataFrameVectored)
    val titanicPredictions = titanicModel.transform(testDataFrameVectored)
    println(s"VOLUMES: trainDataFrame: ${trainDataFrame.count()} testDataFrame: ${testDataFrame.count()} testDataFrameIndexed: ${testDataFrameIndexed.count()} testDataFrameVectored: ${testDataFrameVectored.count()}, titanicPredictions: ${titanicPredictions.count()}")
    titanicPredictions.show(10, false)
    titanicPredictions.select("PassengerId", "prediction").show(20)
    val survivors = titanicPredictions.select("PassengerId", "prediction").collect()

    // --- Step 6: write submission file ------------------------------------
    println("--- Step 6: write to file")
    import java.io._
    val pw = new PrintWriter(new File(outputDataFile))
    try {
      pw.write("PassengerId,Survived\n")
      survivors foreach (row => pw.write(s"${row.getString(0)},${row.getDouble(1).toInt}\n"))
    } finally {
      pw.close() // always close — the original leaked the handle on any failure
    }
    spark.stop()
    println("Done!")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment