Predicting flight delays for a given year by training on the previous year's flight and weather data, and comparing Decision Tree, Logistic Regression, and Linear SVM models built with Spark MLlib.
import org.apache.spark.rdd._
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
case class DelayRec(Year: String,
                    Month: String,
                    DayOfMonth: String,
                    DayOfWeek: String,
                    CRSDepTime: String,
                    DepDelay: String,
                    Origin: String,
                    Distance: String,
                    Cancelled: String) {
  // Holiday dates (MM/dd/yyyy) used by the days_from_nearest_holiday feature
  val holidays = List(
    "01/01/2015", "01/15/2015", "02/19/2015", "05/28/2015", "06/07/2015", "07/04/2015",
    "09/03/2015", "10/08/2015", "11/11/2015", "11/22/2015", "12/25/2015",
    "01/01/2016", "01/21/2016", "02/18/2016", "05/22/2016", "05/26/2016", "07/04/2016",
    "09/01/2016", "10/13/2016", "11/11/2016", "11/27/2016", "12/25/2016")
  // Build a (yyyyMMdd date key, feature vector) pair; the first value is the
  // raw departure delay, which later becomes the classification label
  def gen_features: (String, Array[Double]) = {
    val values = Array(
      DepDelay.toDouble,
      Month.toDouble,
      DayOfMonth.toDouble,
      DayOfWeek.toDouble,
      get_hour(CRSDepTime).toDouble,
      Distance.toDouble,
      days_from_nearest_holiday(Year.toInt, Month.toInt, DayOfMonth.toInt)
    )
    (to_date(Year.toInt, Month.toInt, DayOfMonth.toInt), values)
  }
  // Extract the hour from an HHMM departure time, zero-padded to four digits
  def get_hour(DepTime: String): String = "%04d".format(DepTime.toInt).take(2)
  // Format a date as yyyyMMdd; this also serves as the join key against the weather data
  def to_date(Year: Int, Month: Int, Day: Int) = "%04d%02d%02d".format(Year, Month, Day)
  // Minimum absolute distance, in days, between the flight date and any holiday
  def days_from_nearest_holiday(Year: Int, Month: Int, Day: Int): Int = {
    val sampleDate = new DateTime(Year, Month, Day, 0, 0, 0, 0)
    holidays.foldLeft(3000) { (r, c) =>
      val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
      val dist = Math.abs(org.joda.time.Days.daysBetween(holiday, sampleDate).getDays)
      math.min(r, dist)
    }
  }
}
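// A quick sanity check of the feature generator on a single hand-built record
// (the values below are illustrative only, not taken from the input files):
// Jan 2, 2015 is one day from the 01/01/2015 holiday, so the expected output is
// 20150102 -> 25.0,1.0,2.0,5.0,9.0,802.0,1.0
val sampleRec = DelayRec("2015", "1", "2", "5", "0930", "25", "ORD", "802", "0")
val (sampleDate, sampleFeatures) = sampleRec.gen_features
println(sampleDate + " -> " + sampleFeatures.mkString(","))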
// Preprocess a flight-delay CSV into DelayRec records: parse each line,
// drop the header row, and keep only non-cancelled flights departing from ORD
def prepFlightDelays(infile: String): RDD[DelayRec] = {
  val data = sc.textFile(infile)
  data.map { line =>
    val reader = new CSVReader(new StringReader(line))
    // Columns used: 0=Year, 1=Month, 2=DayOfMonth, 3=DayOfWeek, 5=CRSDepTime,
    // 15=DepDelay, 16=Origin, 18=Distance, 21=Cancelled
    reader.readAll().asScala.toList.map(rec =>
      DelayRec(rec(0), rec(1), rec(2), rec(3), rec(5), rec(15), rec(16), rec(18), rec(21)))
  }.map(list => list(0))
   .filter(rec => rec.Year != "Year")     // drop the header row
   .filter(rec => rec.Cancelled == "0")   // keep non-cancelled flights only
   .filter(rec => rec.Origin == "ORD")    // keep departures from Chicago O'Hare only
}
// Build feature arrays for the training (2015) and test (2016) years
val data_2015 = prepFlightDelays("alpha.csv").map(rec => rec.gen_features._2)
val data_2016 = prepFlightDelays("beta.csv").map(rec => rec.gen_features._2)
data_2015.take(5).map(x => x mkString ",").foreach(println)
__________________________________________________________________________________________________________
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.StandardScaler
// Label a flight as delayed (1.0) if its departure delay is at least 15 minutes;
// the remaining values form the feature vector
def parseData(vals: Array[Double]): LabeledPoint = {
  LabeledPoint(if (vals(0) >= 15) 1.0 else 0.0, Vectors.dense(vals.drop(1)))
}
// Prepare training set
val parsedTrainData = data_2015.map(parseData)
parsedTrainData.cache
// Standardize features to zero mean and unit variance using statistics from the training year
val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(x => x.features))
val scaledTrainData = parsedTrainData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
scaledTrainData.cache
// Prepare test/validation set
val parsedTestData = data_2016.map(parseData)
parsedTestData.cache
val scaledTestData = parsedTestData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
scaledTestData.cache
scaledTrainData.take(3).map(x => (x.label, x.features)).foreach(println)
___________________________________________________________________________________________________________________
// Compute confusion-matrix counts and evaluation metrics.
// Each input pair is (prediction, label), as produced by the prediction blocks below.
def eval_metrics(labelsAndPreds: RDD[(Double, Double)]): (Array[Double], Array[Double]) = {
  val tp = labelsAndPreds.filter(r => r._1 == 1 && r._2 == 1).count.toDouble
  val tn = labelsAndPreds.filter(r => r._1 == 0 && r._2 == 0).count.toDouble
  val fp = labelsAndPreds.filter(r => r._1 == 1 && r._2 == 0).count.toDouble
  val fn = labelsAndPreds.filter(r => r._1 == 0 && r._2 == 1).count.toDouble
  val precision = tp / (tp + fp)
  val recall = tp / (tp + fn)
  val F_measure = 2 * precision * recall / (precision + recall)
  val accuracy = (tp + tn) / (tp + tn + fp + fn)
  (Array(tp, tn, fp, fn), Array(precision, recall, F_measure, accuracy))
}
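// A small in-memory sanity check of eval_metrics on illustrative pairs:
// two true positives, one false positive, one false negative and one true
// negative should give precision and recall of about 0.67 and accuracy 0.60.
val checkPairs = sc.parallelize(Seq((1.0, 1.0), (1.0, 0.0), (0.0, 1.0), (0.0, 0.0), (1.0, 1.0)))
println(eval_metrics(checkPairs)._2.mkString(","))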
__________________________________________________________________________________________________________________
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
// Train a logistic regression classifier on the scaled 2015 features
val model_lr = LogisticRegressionWithSGD.train(scaledTrainData, numIterations = 100)
// Predict on the scaled 2016 data and evaluate
val labelsAndPreds_lr = scaledTestData.map { point =>
  val pred = model_lr.predict(point.features)
  (pred, point.label)
}
val m_lr = eval_metrics(labelsAndPreds_lr)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_lr(0), m_lr(1), m_lr(2), m_lr(3)))
_______________________________________________________________________________________________________________
import org.apache.spark.mllib.classification.SVMWithSGD
// Train a linear SVM on the scaled 2015 features
val svmAlg = new SVMWithSGD()
svmAlg.optimizer
  .setNumIterations(100)
  .setRegParam(1.0)
  .setStepSize(1.0)
val model_svm = svmAlg.run(scaledTrainData)
// Predict on the scaled 2016 data and evaluate
val labelsAndPreds_svm = scaledTestData.map { point =>
  val pred = model_svm.predict(point.features)
  (pred, point.label)
}
val m_svm = eval_metrics(labelsAndPreds_svm)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_svm(0), m_svm(1), m_svm(2), m_svm(3)))
__________________________________________________________________________________________________________________________
import org.apache.spark.mllib.tree.DecisionTree
// Train a decision tree classifier; trees do not require feature scaling,
// so the unscaled features are used here
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()   // treat all features as continuous
val impurity = "gini"
val maxDepth = 10
val maxBins = 100
val model_dt = DecisionTree.trainClassifier(parsedTrainData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)
// Predict on the unscaled 2016 data and evaluate
val labelsAndPreds_dt = parsedTestData.map { point =>
  val pred = model_dt.predict(point.features)
  (pred, point.label)
}
val m_dt = eval_metrics(labelsAndPreds_dt)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_dt(0), m_dt(1), m_dt(2), m_dt(3)))
________________________________________________________________________________________________________________________
// With weather data: enrich the flight features with daily weather observations
import org.apache.spark.SparkContext._
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io._
// Preprocess a flight-delay file and a weather file, joining them by date
def preprocess_spark(delay_file: String, weather_file: String): RDD[Array[Double]] = {
  // Read flight delay data, keyed by date (yyyyMMdd)
  val delayRecs = prepFlightDelays(delay_file).map { rec =>
    val features = rec.gen_features
    (features._1, features._2)
  }
  // Read weather data into RDDs; each line is (station, date, metric, value, ...)
  val station_inx = 0
  val date_inx = 1
  val metric_inx = 2
  val value_inx = 3
  def filterMap(wdata: RDD[Array[String]], metric: String): RDD[(String, Double)] = {
    wdata.filter(vals => vals(metric_inx) == metric).map(vals => (vals(date_inx), vals(value_inx).toDouble))
  }
  // Keep only the weather station for Chicago O'Hare
  val wdata = sc.textFile(weather_file).map(line => line.split(","))
                .filter(vals => vals(station_inx) == "USW00094846")
  val w_tmin = filterMap(wdata, "TMIN")   // minimum temperature
  val w_tmax = filterMap(wdata, "TMAX")   // maximum temperature
  val w_prcp = filterMap(wdata, "PRCP")   // precipitation
  val w_snow = filterMap(wdata, "SNOW")   // snowfall
  val w_awnd = filterMap(wdata, "AWND")   // average wind speed
  // Join each weather metric onto the delay features by date, appending the
  // metric value to the feature array; the final map drops the date key
  delayRecs.join(w_tmin).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
           .join(w_tmax).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
           .join(w_prcp).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
           .join(w_snow).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
           .join(w_awnd).map(vals => vals._2._1 ++ Array(vals._2._2))
}
// Build weather-enriched feature arrays for the training (2015) and test (2016) years
val data_2015 = preprocess_spark("alpha.csv", "2015.csv")
val data_2016 = preprocess_spark("beta.csv", "2016.csv")
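// Each joined row now holds 12 values: the 7 flight-derived values from
// gen_features followed by the TMIN, TMAX, PRCP, SNOW and AWND readings
// for that date.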
data_2015.take(5).map(x => x mkString ",").foreach(println)
________________________________________________________________________________________________
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.StandardScaler
def parseData(vals: Array[Double]): LabeledPoint = {
LabeledPoint(if (vals(0)>=15) 1.0 else 0.0, Vectors.dense(vals.drop(1)))
}
// Prepare training set
val parsedTrainData = data_2015.map(parseData)
val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(x => x.features))
val scaledTrainData = parsedTrainData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
parsedTrainData.cache
scaledTrainData.cache
// Prepare test/validation set
val parsedTestData = data_2016.map(parseData)
val scaledTestData = parsedTestData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
parsedTestData.cache
scaledTestData.cache
scaledTrainData.take(3).map(x => (x.label, x.features)).foreach(println)
_________________________________________________________________________________________________________________________
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.optimization.L1Updater
// Train a linear SVM on the scaled, weather-enriched 2015 features
val svmAlg = new SVMWithSGD()
svmAlg.optimizer
  .setNumIterations(100)
  .setRegParam(1.0)
  .setStepSize(1.0)
val model_svm = svmAlg.run(scaledTrainData)
// Predict on the scaled 2016 data and evaluate
val labelsAndPreds_svm = scaledTestData.map { point =>
  val pred = model_svm.predict(point.features)
  (pred, point.label)
}
val m_svm = eval_metrics(labelsAndPreds_svm)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_svm(0), m_svm(1), m_svm(2), m_svm(3)))
___________________________________________________________________________________________________________________________
import org.apache.spark.mllib.tree.DecisionTree
// Train a decision tree on the weather-enriched features (scaling is not
// required for trees, but the scaled data is reused here)
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()   // treat all features as continuous
val impurity = "gini"
val maxDepth = 10
val maxBins = 100
val model_dt = DecisionTree.trainClassifier(scaledTrainData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)
// Predict on the scaled 2016 data and evaluate
val labelsAndPreds_dt = scaledTestData.map { point =>
  val pred = model_dt.predict(point.features)
  (pred, point.label)
}
val m_dt = eval_metrics(labelsAndPreds_dt)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_dt(0), m_dt(1), m_dt(2), m_dt(3)))