CSE 511 (Data Processing at Scale) course project phase 2 task (Fall 2019 Arizona State University)
/**
 * CSE 511 Project Phase 2 (Fall 2019 ASU)
 *
 * This completes the code template linked below:
 * [https://github.com/jiayuasu/CSE512-Project-Hotspot-Analysis-Template]
 *
 * Full dataset is available on Google Drive:
 * [https://drive.google.com/open?id=1bN-U4nknvN5p7jiVHO-wduM7oXR5CBji]
 */
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
object HotcellAnalysis {
  Logger.getLogger("org.spark_project").setLevel(Level.WARN)
  Logger.getLogger("org.apache").setLevel(Level.WARN)
  Logger.getLogger("akka").setLevel(Level.WARN)
  Logger.getLogger("com").setLevel(Level.WARN)

  private val log = Logger.getLogger("CSE511-Hotcell-Analysis")

  def runHotcellAnalysis(spark: SparkSession, pointPath: String): DataFrame = {
    // Load the original data from a data source
    var pickupInfo = spark.read.format("com.databricks.spark.csv")
      .option("delimiter", ";")
      .option("header", "false")
      .load(pointPath)
    pickupInfo.createOrReplaceTempView("nyctaxitrips")
    pickupInfo.show()
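    // NOTE (assumption): columns are referenced positionally in the SQL below.
    // In the course dataset, _c1 is expected to hold the pickup datetime
    // ("yyyy-MM-dd HH:mm:ss") and _c5 the pickup point as "(longitude,latitude)".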
    // Assign cell coordinates based on pickup points
    spark.udf.register("CalculateX", (pickupPoint: String) =>
      HotcellUtils.CalculateCoordinate(pickupPoint, 0)
    )
    spark.udf.register("CalculateY", (pickupPoint: String) =>
      HotcellUtils.CalculateCoordinate(pickupPoint, 1)
    )
    spark.udf.register("CalculateZ", (pickupTime: String) =>
      HotcellUtils.CalculateCoordinate(pickupTime, 2)
    )
    pickupInfo = spark.sql(
      "select CalculateX(nyctaxitrips._c5)," +
        "CalculateY(nyctaxitrips._c5), CalculateZ(nyctaxitrips._c1) from nyctaxitrips"
    )
    val newCoordinateName = Seq("x", "y", "z")
    pickupInfo = pickupInfo.toDF(newCoordinateName: _*)
    pickupInfo.show()
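    // Illustrative example (hypothetical record): with coordinateStep = 0.01, a
    // pickup point "(-73.9857,40.7484)" maps to cell x = -7399, y = 4074 (the floor
    // of longitude/latitude divided by the step), and a pickup time
    // "2009-01-15 18:32:00" maps to z = 15 (its day of month).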
    // Define the min and max of x, y, z
    val minX = -74.50 / HotcellUtils.coordinateStep
    val maxX = -73.70 / HotcellUtils.coordinateStep
    val minY = 40.50 / HotcellUtils.coordinateStep
    val maxY = 40.90 / HotcellUtils.coordinateStep
    val minZ = 1
    val maxZ = 31
    val numCells = (maxX - minX + 1) * (maxY - minY + 1) * (maxZ - minZ + 1)

    val COL_X = "x"
    val COL_Y = "y"
    val COL_Z = "z"
    val COL_COUNT = "count"
    val COL_SUM = "sum"
    val COL_NOAC = "num_of_adj_cells"
    val COL_GSCORE = "g_score"
    val FIRST_DF = "first_df"
    val SECOND_DF = "second_df"
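    // For reference: with coordinateStep = 0.01 this envelope spans roughly
    // 81 x 41 x 31 = 102,951 space-time cells (up to floating-point rounding
    // in the divisions above).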
    /**
     * Filter the coordinates to the area under observation
     * and count the number of pickups in each cell.
     */
    val selectedCellsCountDf = pickupInfo
      .filter(
        col(COL_X) >= minX and col(COL_X) <= maxX
          and col(COL_Y) >= minY and col(COL_Y) <= maxY
          and col(COL_Z) >= minZ and col(COL_Z) <= maxZ
      )
      .groupBy(COL_X, COL_Y, COL_Z)
      .count() // The aggregation step of the groupBy, not the "count" action on the DataFrame.
      .cache()
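    // Cached because this DataFrame is reused: once for the mean/std aggregation
    // below, and twice more as both sides of the self cross join.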
    val (total, sqrTotal) = selectedCellsCountDf.select(COL_COUNT)
      .rdd
      .aggregate((0.0, 0.0))(
        (acc: (Double, Double), r) => {
          val curr = r.getLong(0).toDouble
          (
            acc._1 + curr,
            acc._2 + (curr * curr)
          )
        },
        (x: (Double, Double), y: (Double, Double)) => (
          x._1 + y._1,
          x._2 + y._2
        )
      )
    val mean = total / numCells
    val std = math.sqrt((sqrTotal / numCells) - (mean * mean))
    log.info(s"Mean: $mean")
    log.info(s"Standard Deviation: $std")
    /**
     * Cross join to get all pairs of cells, then keep only
     * the pairs that are adjacent to each other.
     *
     * NOTE: This cross join is one of the most expensive operations here;
     * it joins millions of cell pairs only to filter down to at most 26
     * surrounding cells per cell. Future work could index the cells and
     * find a way to join only the necessary ones.
     */
    val adjacentDf = selectedCellsCountDf.as(FIRST_DF)
      .crossJoin(selectedCellsCountDf.as(SECOND_DF))
      .filter(
        HotcellUtils.is_adjacent_cube(
          col(s"$FIRST_DF.$COL_X"), col(s"$FIRST_DF.$COL_Y"),
          col(s"$FIRST_DF.$COL_Z"), col(s"$SECOND_DF.$COL_X"),
          col(s"$SECOND_DF.$COL_Y"), col(s"$SECOND_DF.$COL_Z")
        )
      )
      .select(
        col(s"$FIRST_DF.$COL_X"), col(s"$FIRST_DF.$COL_Y"),
        col(s"$FIRST_DF.$COL_Z"), col(s"$SECOND_DF.$COL_COUNT")
      )
      .groupBy(COL_X, COL_Y, COL_Z)
      .agg(sum(COL_COUNT) as COL_SUM, count(COL_COUNT) as COL_NOAC)
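    // After this aggregation, for every non-empty cell:
    //   "sum"              = total pickups in the cell itself plus its adjacent cells
    //   "num_of_adj_cells" = how many of those cells (the cell itself included,
    //                        so at most 27) appear in selectedCellsCountDf,
    //                        i.e. had at least one pickup.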
    /**
     * Sorting again after the repartition may not be required, but we keep it
     * so that the order is preserved when the single part file is written;
     * sorting 50 rows is cheap in any case.
     */
    val scoredDf = adjacentDf.withColumn(COL_GSCORE,
        HotcellUtils.g_score(mean, std, numCells)(col(COL_SUM), col(COL_NOAC)))
      .sort(desc(COL_GSCORE))
      .limit(50)
      .repartition(1)
      .sort(desc(COL_GSCORE))
      .select(COL_X, COL_Y, COL_Z, COL_GSCORE)

    scoredDf.select(COL_X, COL_Y, COL_Z)
  }
}
object HotcellUtils {
  // Configuration variable: coordinateStep is the size of each cell on x and y.
  val coordinateStep = 0.01

  def CalculateCoordinate(inputString: String, coordinateOffset: Int): Int = {
    var result = 0
    coordinateOffset match {
      case 0 => result = Math.floor(inputString.split(",")(0).replace("(", "").toDouble / coordinateStep).toInt
      case 1 => result = Math.floor(inputString.split(",")(1).replace(")", "").toDouble / coordinateStep).toInt
      // The temporal cell is the day of the month of the pickup time.
      case 2 =>
        val timestamp = HotcellUtils.timestampParser(inputString)
        result = HotcellUtils.dayOfMonth(timestamp) // Assume every month has 31 days
    }
    result
  }
  def timestampParser(timestampString: String): Timestamp = {
    // "HH" parses hours on a 24-hour clock ("hh" would expect a 12-hour clock).
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val parsedDate = dateFormat.parse(timestampString)
    val timeStamp = new Timestamp(parsedDate.getTime)
    timeStamp
  }

  def dayOfYear(timestamp: Timestamp): Int = {
    val calendar = Calendar.getInstance
    calendar.setTimeInMillis(timestamp.getTime)
    calendar.get(Calendar.DAY_OF_YEAR)
  }

  def dayOfMonth(timestamp: Timestamp): Int = {
    val calendar = Calendar.getInstance
    calendar.setTimeInMillis(timestamp.getTime)
    calendar.get(Calendar.DAY_OF_MONTH)
  }
  def isAdjacentCubeFn(x1: Int, y1: Int, z1: Int,
                       x2: Int, y2: Int, z2: Int): Boolean = {
    // Two cells are adjacent when they differ by at most 1 in every dimension
    // (this also marks a cell as adjacent to itself).
    math.abs(x1 - x2) <= 1 && math.abs(y1 - y2) <= 1 && math.abs(z1 - z2) <= 1
  }
  def gScoreFn(mean: Double, std: Double, numCells: Double)
              (sum: Long, adjacentCount: Long): Double = {
    val ad = adjacentCount.toDouble
    (sum - mean * ad) / (
      std * math.sqrt(
        (numCells * ad - (ad * ad)) / (numCells - 1.0))
      )
  }

  val is_adjacent_cube: UserDefinedFunction =
    udf[Boolean, Int, Int, Int, Int, Int, Int](isAdjacentCubeFn)

  def g_score(mean: Double, std: Double, numCells: Double): UserDefinedFunction =
    udf[Double, Long, Long](gScoreFn(mean, std, numCells))
}
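For context, below is a minimal sketch of how this analysis might be driven end to end. It assumes the imports at the top of the file and a Spark 2.x runtime; the object name HotcellRunner, the local master setting, and the positional arguments (input CSV path, output directory) are illustrative placeholders rather than part of the original template, which ships its own entry point.

object HotcellRunner {
  def main(args: Array[String]): Unit = {
    // args(0): path to the semicolon-delimited trip CSV, args(1): output directory
    val spark = SparkSession.builder()
      .appName("CSE511-Hotcell-Analysis")
      .master("local[*]") // placeholder; normally supplied via spark-submit
      .getOrCreate()

    val hotcells = HotcellAnalysis.runHotcellAnalysis(spark, args(0))

    // Persist the 50 hottest cells as a single CSV part file.
    hotcells.write
      .mode("overwrite")
      .format("com.databricks.spark.csv")
      .option("header", "false")
      .save(args(1))

    spark.stop()
  }
}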