Gist by @JRuumis, created August 21, 2016.
every night in my dreams
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{Bucketizer, StringIndexer, VectorAssembler, VectorIndexer}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Created by Janis Rumnieks on 19/08/2016.
 */
object Titanic {
  def main(args: Array[String]): Unit = {

    // helpers
    def salutationFromName(name: String): String = {
      val salutationPattern = """, (\w+)\. """.r
      salutationPattern.findFirstMatchIn(name).map(_ group 1).getOrElse("SalutationUnknown")
    }
    val salutationFromNameUDF = udf(salutationFromName _)

    def cabinNumFromCabin(cabinName: String): Int = {
      val cabinPattern = """^[A-Z](\d+)( |$)""".r // there can be multiple cabin numbers but we just take the first one
      if (cabinName == null) 0 // blank Cabin fields can arrive as null from the csv reader; guard against an NPE in the regex match
      else cabinPattern.findFirstMatchIn(cabinName).map(_ group 1).getOrElse("0").toInt
    }
    val cabinNumFromCabinUDF = udf(cabinNumFromCabin _)

    def blankToZero(inVal: String): Double = if (inVal == null || inVal == "") 0.0 else inVal.toDouble
    val blankToZeroUDF = udf(blankToZero _)

    def blankToNa(inVal: String): String = if (inVal == null || inVal == "") "NA" else inVal
    val blankToNaUDF = udf(blankToNa _)
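
    // example behaviour of the helpers (inputs taken from the usual Kaggle Titanic data):
    //   salutationFromName("Braund, Mr. Owen Harris")  -> "Mr"
    //   cabinNumFromCabin("C85")                       -> 85
    //   blankToZero("")                                -> 0.0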

    // Spark Setup
    println("--- Step 1: set up Spark session")
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .appName("Titanic - every night in my dreams...")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", ".")
      .getOrCreate()
    import spark.implicits._

    // load data frames
    println("--- Step 2: load DataFrames from csv, do basic transformations")
    val trainDataFile = """C:\Developer\Kaggle\Titanic\train.csv"""
    val testDataFile = """C:\Developer\Kaggle\Titanic\test.csv"""
    val trainDataFrameFromSource: DataFrame = spark.read.option("header", true).csv(trainDataFile)
    val testDataFrameFromSource = spark.read.option("header", true).csv(testDataFile)

    // todo: join the two data sets and then split when applying to models (see the commented sketch below)
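    // A minimal sketch of the todo above (hypothetical names, not wired in): union the two
    // prepared frames with a marker column so the indexers are fitted once over both sets,
    // then split back apart. This keeps index values consistent between train and test.
    //   val combined = trainPrepared.withColumn("isTrain", lit(true))
    //     .union(testPrepared.withColumn("isTrain", lit(false)))
    //   val indexed  = featureIndexerPipeline.fit(combined).transform(combined)
    //   val trainIdx = indexed.filter($"isTrain")
    //   val testIdx  = indexed.filter(!$"isTrain")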
    val trainDataFrame: DataFrame = trainDataFrameFromSource
      .withColumn("Salutation", salutationFromNameUDF(trainDataFrameFromSource("Name")))
      .withColumn("FareNoBlanks", blankToZeroUDF($"Fare"))
      .withColumn("AgeNoBlanks", blankToZeroUDF($"Age"))
      .withColumn("ParentChild", $"Parch".cast("Double"))
      .withColumn("CabinCode", blankToNaUDF($"Cabin".substr(1, 1).cast("String"))) // substr is 1-based: the first letter is the deck code
      .withColumn("CabinNum", cabinNumFromCabinUDF($"Cabin").cast("Double"))
      .select("PassengerId", "Survived", "Pclass", "Name", "Salutation", "Sex", "Age", "AgeNoBlanks", "SibSp", "Parch",
        "ParentChild", "Ticket", "Fare", "FareNoBlanks", "Cabin", "CabinCode", "CabinNum", "Embarked")
    println("train DataFrame:")
    trainDataFrame.show(10, false)
    val testDataFrame: DataFrame = testDataFrameFromSource
      .withColumn("Survived", lit(0)) // dummy label so the test set has the same columns as the train set
      .withColumn("Salutation", salutationFromNameUDF(testDataFrameFromSource("Name")))
      .withColumn("FareNoBlanks", blankToZeroUDF($"Fare"))
      .withColumn("AgeNoBlanks", blankToZeroUDF($"Age"))
      .withColumn("ParentChild", $"Parch".cast("Double"))
      .withColumn("CabinCode", blankToNaUDF($"Cabin".substr(1, 1).cast("String")))
      .withColumn("CabinNum", cabinNumFromCabinUDF($"Cabin").cast("Double"))
      .select("PassengerId", "Survived", "Pclass", "Name", "Salutation", "Sex", "Age", "AgeNoBlanks", "SibSp", "Parch",
        "ParentChild", "Ticket", "Fare", "FareNoBlanks", "Cabin", "CabinCode", "CabinNum", "Embarked")
    println("test DataFrame:")
    testDataFrame.show(10, false)

    // exploratory queries (for buckets mostly)
    //println("All ages:")
    //(trainDataFrame.select("Age") union testDataFrame.select("Age")).distinct().orderBy("Age").show(300)
    //println("All fares:")
    //(trainDataFrame.select("FareNoBlanks") union testDataFrame.select("FareNoBlanks")).distinct().orderBy("FareNoBlanks").show(300)
    //println("Parch (train/test): ")
    //(trainDataFrame.select("Parch")).distinct().orderBy("Parch").show(300)
    //(testDataFrame.select("Parch")).distinct().orderBy("Parch").show(300)
    //println("cabin numbers:")
    //(trainDataFrame.select("CabinNum") union testDataFrame.select("CabinNum")).distinct().orderBy("CabinNum").show(300)
    //println("tickets:")
    //(trainDataFrame.select("Ticket") union testDataFrame.select("Ticket")).distinct().orderBy("Ticket").show(300)

    // bucketizers
    val ageBuckets = Array(Double.NegativeInfinity, 0.0, 5.0, 10.0, 15.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, Double.PositiveInfinity)
    val fareBuckets = Array(Double.NegativeInfinity, 0.0, 5.0, 10.0, 15.0, 20.0, 30.0, 50.0, 75.0, 100.0, 150.0, 200.0, 250.0, 500.0, Double.PositiveInfinity)
    // coarser buckets performed worse:
    //val ageBuckets = Array(Double.NegativeInfinity, 0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, Double.PositiveInfinity)
    //val fareBuckets = Array(Double.NegativeInfinity, 0.0, 10.0, 20.0, 30.0, 50.0, 100.0, 150.0, 200.0, Double.PositiveInfinity)
    val parentChildBuckets = Array(Double.NegativeInfinity, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, Double.PositiveInfinity) // this should not be necessary since Parch is already discrete, but passing it through unbucketed crashes the collect() on the estimate data set
    val cabinNumberBuckets = Array(Double.NegativeInfinity, 0.0, 1.0, 10.0, 30.0, 60.0, 90.0, 120.0, Double.PositiveInfinity)

    val ageBucketizer: Bucketizer = new Bucketizer().setInputCol("AgeNoBlanks").setOutputCol("AgeBucket").setSplits(ageBuckets)
    val fareBucketizer: Bucketizer = new Bucketizer().setInputCol("FareNoBlanks").setOutputCol("FareBucket").setSplits(fareBuckets)
    val parentChildBucketizer: Bucketizer = new Bucketizer().setInputCol("ParentChild").setOutputCol("ParentChildBucket").setSplits(parentChildBuckets)
    val cabinNumberBucketizer: Bucketizer = new Bucketizer().setInputCol("CabinNum").setOutputCol("CabinNumBucket").setSplits(cabinNumberBuckets)
    val featureBucketizers = Array(ageBucketizer, fareBucketizer, parentChildBucketizer, cabinNumberBucketizer)
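
    // Bucketizer maps a value to the index of its half-open split interval; e.g. with
    // the ageBuckets above, AgeNoBlanks = 27.0 falls in [20.0, 30.0) and becomes AgeBucket = 5.0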

    // data frame indexers
    println("--- Step 3: index DataFrame features")
    val indexableFeatureNames = Array("Survived", "Pclass", "Salutation", "Sex", "AgeBucket", "SibSp", "ParentChildBucket", "FareBucket", "Embarked" /*,"CabinNumBucket","CabinCode"*/) // Parch needs to be excluded, else RandomForestClassifier fails
    println(s"features to be indexed: ${indexableFeatureNames mkString(",")}")
    val featureIndexers: Array[StringIndexer] = indexableFeatureNames map { featureName =>
      new StringIndexer().setInputCol(featureName).setOutputCol(featureName + "Indexed")
    }
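
    // StringIndexer assigns 0.0 to the most frequent label, 1.0 to the next, and so on;
    // e.g. for Sex this should yield male -> 0.0, female -> 1.0 on the usual Titanic data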

    // data frame transformation
    val allFeatureTransformers: Array[PipelineStage] = featureBucketizers ++ featureIndexers // note that order is important! bucketizers must run before indexers
    val featureIndexerPipeline = new Pipeline().setStages(allFeatureTransformers)
    //val featureIndexerPipeline = new Pipeline().setStages(featureBucketizers)

    val trainDataFrameIndexModel = featureIndexerPipeline.fit(trainDataFrame)
    val trainDataFrameIndexed = trainDataFrameIndexModel.transform(trainDataFrame)
    println("indexed train DataFrame:")
    trainDataFrameIndexed.show(10, false)

    val testDataFrameIndexModel = featureIndexerPipeline.fit(testDataFrame)
    val testDataFrameIndexed = testDataFrameIndexModel.transform(testDataFrame)
    println("indexed test DataFrame:")
    testDataFrameIndexed.show(10, false)
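
    // caveat: fitting the indexer pipeline separately on the test set means a label's index
    // can differ between train and test (StringIndexer orders by frequency) - another reason
    // for the union-then-split todo above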

    // create vectors for indexed features
    println("--- Step 4: create indexed feature vectors")
    val vectorInputCols = indexableFeatureNames.tail map (featureName => featureName + "Indexed") // .tail is to exclude Survived
    println(s"""indexed feature names for vector: ${vectorInputCols mkString(",")}""")
    val titanicFeatureAssembler = new VectorAssembler()
      .setInputCols(vectorInputCols)
      .setOutputCol("features")
    val trainDataFrameVectored = titanicFeatureAssembler.transform(trainDataFrameIndexed)
    val testDataFrameVectored = titanicFeatureAssembler.transform(testDataFrameIndexed)
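
    // VectorAssembler just concatenates the input columns into one vector per row, e.g.
    // (PclassIndexed, SalutationIndexed, SexIndexed, ...) -> features = [0.0, 2.0, 1.0, ...]
    // (illustrative values, not taken from the actual data)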

    // index feature vector and apply Random Forest
    println("--- Step 5: vector indexing and Random Forest")
    val titanicFeatureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(100)
      .fit(trainDataFrameVectored)
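
    // VectorIndexer flags every vector slot with at most 100 distinct values as categorical,
    // so the forest can split on category membership instead of treating them as continuous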
    val titanicRandomForestClassifier = new RandomForestClassifier()
      .setLabelCol("SurvivedIndexed")
      .setFeaturesCol("indexedFeatures")
      .setNumTrees(10000)
      //.setMaxBins(350)

    val titanicPipeline = new Pipeline()
      .setStages(Array(titanicFeatureIndexer, titanicRandomForestClassifier))
    val titanicModel = titanicPipeline.fit(trainDataFrameVectored)
    val titanicPredictions = titanicModel.transform(testDataFrameVectored)
println(s"VOLUMES: trainDataFrame: ${trainDataFrame.count()} testDataFrame: ${testDataFrame.count()} testDataFrameIndexed: ${testDataFrameIndexed.count()} testDataFrameVectored: ${testDataFrameVectored.count()}, titanicPredictions: ${titanicPredictions.count()}")
titanicPredictions.show(10,false)
titanicPredictions.select("PassengerId","prediction").show(20)
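
    // A hypothetical local accuracy check, not part of the original flow: labels for the
    // Kaggle test set are unknown, so hold out part of the train set and score it with
    // MulticlassClassificationEvaluator instead:
    //   import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    //   val Array(fitPart, holdout) = trainDataFrameVectored.randomSplit(Array(0.8, 0.2), seed = 42)
    //   val holdoutPredictions = titanicPipeline.fit(fitPart).transform(holdout)
    //   val accuracy = new MulticlassClassificationEvaluator()
    //     .setLabelCol("SurvivedIndexed").setPredictionCol("prediction")
    //     .setMetricName("accuracy").evaluate(holdoutPredictions)
    //   println(s"holdout accuracy: $accuracy")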
    val survivors = titanicPredictions.select("PassengerId", "prediction").collect()
    //val survivors = titanicPredictions.collect()

    // write to file
    println("--- Step 6: write to file")
    import java.io._
    val pw = new PrintWriter(new File("""C:\Developer\Kaggle\Titanic\janis_output.csv"""))
    try {
      pw.write("PassengerId,Survived\n")
      survivors foreach (row => pw.write(s"${row.getString(0)},${row.getDouble(1).toInt}\n"))
    } finally pw.close() // close even if a write fails

    println("Done!")
  }
}