@PratikBarhate
Last active November 25, 2021 20:31
CSE 511 (Data Processing at Scale) course project phase 2 task (Fall 2019 Arizona State University)
/**
* CSE 511 Project Phase 2 (Fall 2019 ASU)
*
* This is a completed version of the code template linked below:
* [https://github.com/jiayuasu/CSE512-Project-Hotspot-Analysis-Template]
*
* Full dataset is available on Google Drive:
* [https://drive.google.com/open?id=1bN-U4nknvN5p7jiVHO-wduM7oXR5CBji]
*/
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Calendar
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
object HotcellAnalysis {
Logger.getLogger("org.spark_project").setLevel(Level.WARN)
Logger.getLogger("org.apache").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
Logger.getLogger("com").setLevel(Level.WARN)
private val log = Logger.getLogger("CSE511-Hotcell-Analysis")
  def runHotcellAnalysis(spark: SparkSession, pointPath: String): DataFrame = {
    // Load the original data from the data source.
    var pickupInfo = spark.read.format("com.databricks.spark.csv")
      .option("delimiter", ";")
      .option("header", "false")
      .load(pointPath)
    pickupInfo.createOrReplaceTempView("nyctaxitrips")
    pickupInfo.show()

    // Assign cell coordinates based on pickup points.
    spark.udf.register("CalculateX", (pickupPoint: String) =>
      HotcellUtils.CalculateCoordinate(pickupPoint, 0))
    spark.udf.register("CalculateY", (pickupPoint: String) =>
      HotcellUtils.CalculateCoordinate(pickupPoint, 1))
    spark.udf.register("CalculateZ", (pickupTime: String) =>
      HotcellUtils.CalculateCoordinate(pickupTime, 2))
    pickupInfo = spark.sql(
      "select CalculateX(nyctaxitrips._c5)," +
        "CalculateY(nyctaxitrips._c5), CalculateZ(nyctaxitrips._c1) from nyctaxitrips"
    )
    val newCoordinateName = Seq("x", "y", "z")
    pickupInfo = pickupInfo.toDF(newCoordinateName: _*)
    pickupInfo.show()
    // Define the min and max of x, y, z.
    val minX = -74.50 / HotcellUtils.coordinateStep
    val maxX = -73.70 / HotcellUtils.coordinateStep
    val minY = 40.50 / HotcellUtils.coordinateStep
    val maxY = 40.90 / HotcellUtils.coordinateStep
    val minZ = 1
    val maxZ = 31
    val numCells = (maxX - minX + 1) * (maxY - minY + 1) * (maxZ - minZ + 1)

    val COL_X = "x"
    val COL_Y = "y"
    val COL_Z = "z"
    val COL_COUNT = "count"
    val COL_SUM = "sum"
    val COL_NOAC = "num_of_adj_cells"
    val COL_GSCORE = "g_score"
    val FIRST_DF = "first_df"
    val SECOND_DF = "second_df"
    /**
     * Filter the coordinates to the area under observation
     * and count the number of pickups in each cell.
     */
    val selectedCellsCountDf = pickupInfo
      .filter(
        col(COL_X) >= minX and col(COL_X) <= maxX
          and col(COL_Y) >= minY and col(COL_Y) <= maxY
          and col(COL_Z) >= minZ and col(COL_Z) <= maxZ
      )
      .groupBy(COL_X, COL_Y, COL_Z)
      .count() // The groupBy aggregation, not the "count" action on the DataFrame.
      .cache()
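    /**
     * One pass over the per-cell counts computes both the total (sum of x)
     * and the sum of squares (sum of x^2), so mean and standard deviation
     * follow without a second scan. Cells with zero pickups are absent from
     * the DataFrame but would contribute zero to both sums, which is why
     * dividing by numCells (all cells, empty ones included) below is correct.
     */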
    val (total, sqrTotal) = selectedCellsCountDf.select(COL_COUNT)
      .rdd
      .aggregate((0.0, 0.0))(
        (acc: (Double, Double), r) => {
          val curr = r.getLong(0).toDouble
          (acc._1 + curr, acc._2 + (curr * curr))
        },
        (x: (Double, Double), y: (Double, Double)) =>
          (x._1 + y._1, x._2 + y._2)
      )
    val mean = total / numCells
    val std = math.sqrt((sqrTotal / numCells) - (mean * mean))
    log.info(s"Mean: $mean")
    log.info(s"Standard Deviation: $std")
    /**
     * Cross join every pair of cells, then filter down to the pairs
     * that are adjacent to each other.
     *
     * NOTE: The cross join is the most expensive operation here: it pairs
     * millions of cells only to keep at most 26 surrounding cells for each
     * one. Future work could index the cells so that only the necessary
     * pairs are joined.
     */
    val adjacentDf = selectedCellsCountDf.as(FIRST_DF)
      .crossJoin(selectedCellsCountDf.as(SECOND_DF))
      .filter(
        HotcellUtils.is_adjacent_cube(
          col(s"$FIRST_DF.$COL_X"), col(s"$FIRST_DF.$COL_Y"),
          col(s"$FIRST_DF.$COL_Z"), col(s"$SECOND_DF.$COL_X"),
          col(s"$SECOND_DF.$COL_Y"), col(s"$SECOND_DF.$COL_Z")
        )
      )
      .select(
        col(s"$FIRST_DF.$COL_X"), col(s"$FIRST_DF.$COL_Y"),
        col(s"$FIRST_DF.$COL_Z"), col(s"$SECOND_DF.$COL_COUNT")
      )
      .groupBy(COL_X, COL_Y, COL_Z)
      .agg(sum(COL_COUNT) as COL_SUM, count(COL_COUNT) as COL_NOAC)
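    /**
     * Note: isAdjacentCubeFn treats a cell as adjacent to itself (all three
     * deltas are zero), so COL_SUM includes the cell's own pickup count and
     * COL_NOAC counts the cell itself among its neighbours. This matches the
     * Getis-Ord G* statistic, whose weights include the focal cell.
     */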
    /**
     * Sorting again after the repartition may not be strictly required,
     * but it guarantees the order within the single part file that is
     * written, and sorting 50 rows is not computationally heavy.
     */
    val scoredDf = adjacentDf.withColumn(COL_GSCORE,
        HotcellUtils.g_score(mean, std, numCells)(col(COL_SUM), col(COL_NOAC)))
      .sort(desc(COL_GSCORE))
      .limit(50)
      .repartition(1)
      .sort(desc(COL_GSCORE))
      .select(COL_X, COL_Y, COL_Z, COL_GSCORE)
    scoredDf.select(COL_X, COL_Y, COL_Z)
  }
}
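/**
 * A minimal sketch of how the analysis could be invoked. The object name,
 * the local master setting, and the command-line argument layout are
 * assumptions for illustration, not part of the course template.
 */
object HotcellRunner {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CSE511-Hotcell-Analysis")
      .master("local[*]") // assumption: run locally while testing
      .getOrCreate()
    // args(0): path to the pickup-point CSV; args(1): output directory.
    val hottestCells = HotcellAnalysis.runHotcellAnalysis(spark, args(0))
    hottestCells.write.option("header", "false").csv(args(1))
    spark.stop()
  }
}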
object HotcellUtils {
  // Coordinate step is the size of each cell on x and y.
  val coordinateStep = 0.01

  def CalculateCoordinate(inputString: String, coordinateOffset: Int): Int = {
    coordinateOffset match {
      case 0 => Math.floor(inputString.split(",")(0).replace("(", "").toDouble / coordinateStep).toInt
      case 1 => Math.floor(inputString.split(",")(1).replace(")", "").toDouble / coordinateStep).toInt
      // The temporal (z) coordinate is the day of the month; assume every month has 31 days.
      case 2 =>
        val timestamp = HotcellUtils.timestampParser(inputString)
        HotcellUtils.dayOfMonth(timestamp)
    }
  }
  def timestampParser(timestampString: String): Timestamp = {
    // "HH" parses the 24-hour clock; "hh" (12-hour) would misread
    // afternoon hours in timestamps such as "2009-01-01 13:00:00".
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val parsedDate = dateFormat.parse(timestampString)
    new Timestamp(parsedDate.getTime)
  }
  def dayOfYear(timestamp: Timestamp): Int = {
    val calendar = Calendar.getInstance
    calendar.setTimeInMillis(timestamp.getTime)
    calendar.get(Calendar.DAY_OF_YEAR)
  }

  def dayOfMonth(timestamp: Timestamp): Int = {
    val calendar = Calendar.getInstance
    calendar.setTimeInMillis(timestamp.getTime)
    calendar.get(Calendar.DAY_OF_MONTH)
  }
  def isAdjacentCubeFn(x1: Int, y1: Int, z1: Int,
                       x2: Int, y2: Int, z2: Int): Boolean =
    math.abs(x1 - x2) <= 1 && math.abs(y1 - y2) <= 1 && math.abs(z1 - z2) <= 1
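  /**
   * gScoreFn below computes the Getis-Ord G* statistic with binary weights:
   *
   *   G*_i = (sum_j w_ij * x_j  -  mean * sum_j w_ij)
   *          / (std * sqrt((n * sum_j w_ij^2 - (sum_j w_ij)^2) / (n - 1)))
   *
   * With w_ij in {0, 1}, sum_j w_ij equals the number of adjacent cells
   * (adjacentCount) and also equals sum_j w_ij^2, so `sum` plays the role of
   * sum_j w_ij * x_j and n is the total number of cells (numCells).
   */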
  def gScoreFn(mean: Double, std: Double, numCells: Double)
              (sum: Long, adjacentCount: Long): Double = {
    val ad = adjacentCount.toDouble
    (sum - mean * ad) / (
      std * math.sqrt((numCells * ad - (ad * ad)) / (numCells - 1.0))
    )
  }
  val is_adjacent_cube: UserDefinedFunction =
    udf[Boolean, Int, Int, Int, Int, Int, Int](isAdjacentCubeFn)

  def g_score(mean: Double, std: Double, numCells: Double): UserDefinedFunction =
    udf[Double, Long, Long](gScoreFn(mean, std, numCells))
}
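/**
 * Worked example of the coordinate mapping, assuming the pickup point is
 * formatted as "(longitude,latitude)", which is what the parsing above
 * expects (the values are illustrative, not taken from specific rows):
 *
 *   CalculateCoordinate("(-73.991957,40.721567)", 0) == -7400 // floor(-7399.1957)
 *   CalculateCoordinate("(-73.991957,40.721567)", 1) == 4072  // floor(4072.1567)
 *   CalculateCoordinate("2009-01-01 00:23:00", 2)    == 1     // day of month
 */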