Evaluating flight delays for a given year using data from various influencing sources of the previous year, and modelling with a decision tree, logistic regression, and a linear SVM.
import org.apache.spark.rdd._
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io._
import org.joda.time.{DateTime, Days}
import org.joda.time.format.DateTimeFormat
// One flight record, holding only the fields needed for feature generation
case class DelayRec(Year: String,
                    Month: String,
                    DayOfMonth: String,
                    DayOfWeek: String,
                    CRSDepTime: String,
                    DepDelay: String,
                    Origin: String,
                    Distance: String,
                    Cancelled: String) {

  // Holiday dates for 2015 and 2016, used for the distance-to-holiday feature
  val holidays = List("01/01/2015", "01/15/2015", "02/19/2015", "05/28/2015", "06/07/2015", "07/04/2015",
                      "09/03/2015", "10/08/2015", "11/11/2015", "11/22/2015", "12/25/2015",
                      "01/01/2016", "01/21/2016", "02/18/2016", "05/22/2016", "05/26/2016", "07/04/2016",
                      "09/01/2016", "10/13/2016", "11/11/2016", "11/27/2016", "12/25/2016")

  // Returns (flight date as "yyyyMMdd", feature array); the departure delay is
  // kept as the first element so it can later be turned into the label
  def gen_features: (String, Array[Double]) = {
    val values = Array(
      DepDelay.toDouble,
      Month.toDouble,
      DayOfMonth.toDouble,
      DayOfWeek.toDouble,
      get_hour(CRSDepTime).toDouble,
      Distance.toDouble,
      days_from_nearest_holiday(Year.toInt, Month.toInt, DayOfMonth.toInt)
    )
    (to_date(Year.toInt, Month.toInt, DayOfMonth.toInt), values)
  }

  // Scheduled departure hour, e.g. "0730" -> "07"
  def get_hour(DepTime: String): String = "%04d".format(DepTime.toInt).take(2)

  def to_date(Year: Int, Month: Int, Day: Int) = "%04d%02d%02d".format(Year, Month, Day)

  // Absolute distance in days to the closest holiday in the list above
  def days_from_nearest_holiday(Year: Int, Month: Int, Day: Int): Int = {
    val sampleDate = new DateTime(Year, Month, Day, 0, 0, 0, 0)
    holidays.foldLeft(3000) { (r, c) =>
      val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
      val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
      math.min(r, distance)
    }
  }
}
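// Quick illustrative check of gen_features (not part of the original gist;
// the field values below are made up):
val sampleRec = DelayRec("2015", "1", "2", "5", "0730", "20", "ORD", "925", "0")
println(sampleRec.gen_features._1)               // 20150102
println(sampleRec.gen_features._2.mkString(",")) // 20.0,1.0,2.0,5.0,7.0,925.0,1.0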
// Preprocessing: parse one CSV file of flight records into DelayRec objects,
// dropping the header row, cancelled flights, and flights not departing from ORD.
// Column indices follow the airline on-time performance CSV layout.
def prepFlightDelays(infile: String): RDD[DelayRec] = {
  val data = sc.textFile(infile)

  data.map { line =>
    val reader = new CSVReader(new StringReader(line))
    reader.readAll().asScala.toList.map(rec =>
      DelayRec(rec(0), rec(1), rec(2), rec(3), rec(5), rec(15), rec(16), rec(18), rec(21)))
  }.map(list => list(0))
   .filter(rec => rec.Year != "Year")      // skip the header row
   .filter(rec => rec.Cancelled == "0")    // keep only flights that departed
   .filter(rec => rec.Origin == "ORD")     // keep only Chicago O'Hare departures
}
val data_2015 = prepFlightDelays("alpha.csv").map(rec => rec.gen_features._2)
val data_2016 = prepFlightDelays("beta.csv").map(rec => rec.gen_features._2)
data_2015.take(5).map(x => x.mkString(",")).foreach(println)

__________________________________________________________________________________________________________
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.StandardScaler

// Label a flight as delayed (1.0) if its departure delay is 15 minutes or more
def parseData(vals: Array[Double]): LabeledPoint = {
  LabeledPoint(if (vals(0) >= 15) 1.0 else 0.0, Vectors.dense(vals.drop(1)))
}

// Prepare training set; the scaler is fit on the 2015 data only
val parsedTrainData = data_2015.map(parseData)
parsedTrainData.cache
val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(x => x.features))
val scaledTrainData = parsedTrainData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
scaledTrainData.cache

// Prepare test/validation set, scaled with the training-set scaler
val parsedTestData = data_2016.map(parseData)
parsedTestData.cache
val scaledTestData = parsedTestData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
scaledTestData.cache

scaledTrainData.take(3).map(x => (x.label, x.features)).foreach(println)
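// Illustrative sanity check (not in the original): after standardization each
// training feature should have mean ~0 and variance ~1.
import org.apache.spark.mllib.stat.Statistics
val trainSummary = Statistics.colStats(scaledTrainData.map(_.features))
println(trainSummary.mean)
println(trainSummary.variance)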
___________________________________________________________________________________________________________________

// Function to compute evaluation metrics.
// Note: the input pairs are (prediction, label), matching how they are built
// at each call site below; fp and fn assume that ordering.
def eval_metrics(labelsAndPreds: RDD[(Double, Double)]): Tuple2[Array[Double], Array[Double]] = {
  val tp = labelsAndPreds.filter(r => r._1 == 1 && r._2 == 1).count.toDouble
  val tn = labelsAndPreds.filter(r => r._1 == 0 && r._2 == 0).count.toDouble
  val fp = labelsAndPreds.filter(r => r._1 == 1 && r._2 == 0).count.toDouble
  val fn = labelsAndPreds.filter(r => r._1 == 0 && r._2 == 1).count.toDouble

  val precision = tp / (tp + fp)
  val recall = tp / (tp + fn)
  val F_measure = 2 * precision * recall / (precision + recall)
  val accuracy = (tp + tn) / (tp + tn + fp + fn)
  (Array(tp, tn, fp, fn), Array(precision, recall, F_measure, accuracy))
}
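// Tiny sanity check for eval_metrics (illustrative, not in the original):
// 2 TP, 1 TN, 1 FP gives precision 2/3, recall 1.0, F1 0.8, accuracy 0.75.
val check = sc.parallelize(Seq((1.0, 1.0), (1.0, 1.0), (0.0, 0.0), (1.0, 0.0)))
println(eval_metrics(check)._2.mkString(","))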
__________________________________________________________________________________________________________________

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Build the logistic regression model on the scaled 2015 features
val model_lr = LogisticRegressionWithSGD.train(scaledTrainData, numIterations = 100)

// Predict on the 2016 data
val labelsAndPreds_lr = scaledTestData.map { point =>
  val pred = model_lr.predict(point.features)
  (pred, point.label)
}
val m_lr = eval_metrics(labelsAndPreds_lr)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_lr(0), m_lr(1), m_lr(2), m_lr(3)))
_______________________________________________________________________________________________________________

import org.apache.spark.mllib.classification.SVMWithSGD

// Build the linear SVM model
val svmAlg = new SVMWithSGD()
svmAlg.optimizer.setNumIterations(100)
  .setRegParam(1.0)
  .setStepSize(1.0)
val model_svm = svmAlg.run(scaledTrainData)

// Predict
val labelsAndPreds_svm = scaledTestData.map { point =>
  val pred = model_svm.predict(point.features)
  (pred, point.label)
}
val m_svm = eval_metrics(labelsAndPreds_svm)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_svm(0), m_svm(1), m_svm(2), m_svm(3)))
__________________________________________________________________________________________________________________________

import org.apache.spark.mllib.tree.DecisionTree

// Build the decision tree model; trees do not need feature scaling, so the
// unscaled features are used here
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()  // all features treated as continuous
val impurity = "gini"
val maxDepth = 10
val maxBins = 100
val model_dt = DecisionTree.trainClassifier(parsedTrainData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)

// Predict
val labelsAndPreds_dt = parsedTestData.map { point =>
  val pred = model_dt.predict(point.features)
  (pred, point.label)
}
val m_dt = eval_metrics(labelsAndPreds_dt)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_dt(0), m_dt(1), m_dt(2), m_dt(3)))
________________________________________________________________________________________________________________________

// With weather data
import org.apache.spark.SparkContext._
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io._

// Preprocessing for a given pair of files: join each flight's features with
// that day's weather measurements
def preprocess_spark(delay_file: String, weather_file: String): RDD[Array[Double]] = {
  // Read flight delay data as (date, features) pairs
  val delayRecs = prepFlightDelays(delay_file).map { rec =>
    val features = rec.gen_features
    (features._1, features._2)
  }

  // Read weather data into RDDs (rows of station, date, metric, value)
  val station_inx = 0
  val date_inx = 1
  val metric_inx = 2
  val value_inx = 3

  def filterMap(wdata: RDD[Array[String]], metric: String): RDD[(String, Double)] = {
    wdata.filter(vals => vals(metric_inx) == metric).map(vals => (vals(date_inx), vals(value_inx).toDouble))
  }

  val wdata = sc.textFile(weather_file).map(line => line.split(","))
                .filter(vals => vals(station_inx) == "USW00094846")  // Chicago O'Hare weather station

  val w_tmin = filterMap(wdata, "TMIN")  // minimum temperature
  val w_tmax = filterMap(wdata, "TMAX")  // maximum temperature
  val w_prcp = filterMap(wdata, "PRCP")  // precipitation
  val w_snow = filterMap(wdata, "SNOW")  // snowfall
  val w_awnd = filterMap(wdata, "AWND")  // average wind speed

  // Append each weather metric to the feature array, joining on date
  delayRecs.join(w_tmin).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
           .join(w_tmax).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
           .join(w_prcp).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
           .join(w_snow).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
           .join(w_awnd).map(vals => vals._2._1 ++ Array(vals._2._2))
}

val data_2015 = preprocess_spark("alpha.csv", "2015.csv")
val data_2016 = preprocess_spark("beta.csv", "2016.csv")
data_2015.take(5).map(x => x.mkString(",")).foreach(println)
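// Illustrative check (not in the original): the five inner joins silently drop
// any date that is missing a weather metric, so the record counts are worth
// comparing against the weather-free pipeline above.
println("2015 records after weather join: " + data_2015.count)
println("2016 records after weather join: " + data_2016.count)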
________________________________________________________________________________________________

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.StandardScaler

// Same labeling rule as before: delayed means a departure delay of 15+ minutes
def parseData(vals: Array[Double]): LabeledPoint = {
  LabeledPoint(if (vals(0) >= 15) 1.0 else 0.0, Vectors.dense(vals.drop(1)))
}

// Prepare training set
val parsedTrainData = data_2015.map(parseData)
val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(x => x.features))
val scaledTrainData = parsedTrainData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
parsedTrainData.cache
scaledTrainData.cache

// Prepare test/validation set
val parsedTestData = data_2016.map(parseData)
val scaledTestData = parsedTestData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
parsedTestData.cache
scaledTestData.cache

scaledTrainData.take(3).map(x => (x.label, x.features)).foreach(println)
_________________________________________________________________________________________________________________________

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.optimization.L1Updater

// Build the SVM model on the weather-augmented features
val svmAlg = new SVMWithSGD()
svmAlg.optimizer.setNumIterations(100)
  .setRegParam(1.0)
  .setStepSize(1.0)
val model_svm = svmAlg.run(scaledTrainData)

// Predict
val labelsAndPreds_svm = scaledTestData.map { point =>
  val pred = model_svm.predict(point.features)
  (pred, point.label)
}
val m_svm = eval_metrics(labelsAndPreds_svm)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_svm(0), m_svm(1), m_svm(2), m_svm(3)))
___________________________________________________________________________________________________________________________

import org.apache.spark.mllib.tree.DecisionTree

// Build the decision tree model on the weather-augmented features (scaling is
// unnecessary for trees but harmless)
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 10
val maxBins = 100
val model_dt = DecisionTree.trainClassifier(scaledTrainData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)

// Predict
val labelsAndPreds_dt = scaledTestData.map { point =>
  val pred = model_dt.predict(point.features)
  (pred, point.label)
}
val m_dt = eval_metrics(labelsAndPreds_dt)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_dt(0), m_dt(1), m_dt(2), m_dt(3)))