/**
 * CSE 511 (Data Processing at Scale) course project, Phase 2 task (Fall 2019, Arizona State University).
 * CSE 511 Project Phase 2 (Fall 2019, ASU).
 * This is a completion of the code template mentioned below:
 * [link missing]
 * The full dataset is available on Google Drive:
 * [link missing]
 */
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Calendar
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
// Hot-cell analysis job: maps taxi pickups to (x, y, z) cells, counts pickups per cell,
// aggregates counts over neighbouring cells and attaches a g-score to each cell.
// NOTE(review): this object looks truncated in several places (unterminated calls, missing
// closing braces, stray "*" lines that were presumably block comments). The comments below
// describe only what is visible; confirm against the complete file.
object HotcellAnalysis {
// Logger for this job.
private val log = Logger.getLogger("CSE511-Hotcell-Analysis")
// Runs the analysis on the given Spark session.
//   spark     - session used to register the UDFs and run the SQL below.
//   pointPath - not referenced in the visible lines; presumably the input CSV path (TODO confirm).
// Returns a DataFrame; the returned expression is not visible in this view.
def runHotcellAnalysis(spark: SparkSession, pointPath: String): DataFrame = {
// Load the original data from a data source
// NOTE(review): the right-hand side below appears truncated; only the format-name argument
// and two reader options (";" delimiter, no header row) remain.
var pickupInfo ="com.databricks.spark.csv")
.option("delimiter", ";")
.option("header", "false")
// Assign cell coordinates based on pickup points
// Each UDF delegates to HotcellUtils.CalculateCoordinate with offset 0 (x), 1 (y) or 2 (z).
// NOTE(review): the closing parentheses of the three register calls are not visible.
spark.udf.register("CalculateX", (pickupPoint: String) => ((
HotcellUtils.CalculateCoordinate(pickupPoint, 0)
spark.udf.register("CalculateY", (pickupPoint: String) => ((
HotcellUtils.CalculateCoordinate(pickupPoint, 1)
spark.udf.register("CalculateZ", (pickupTime: String) => ((
HotcellUtils.CalculateCoordinate(pickupTime, 2)
// x and y are derived from column _c5 and z from column _c1 of the view "nyctaxitrips".
// NOTE(review): the registration of that view is not visible here.
pickupInfo = spark.sql(
"select CalculateX(nyctaxitrips._c5)," +
"CalculateY(nyctaxitrips._c5), CalculateZ(nyctaxitrips._c1) from nyctaxitrips"
// Rename the three computed columns to x, y, z.
val newCoordinateName = Seq("x", "y", "z")
pickupInfo = pickupInfo.toDF(newCoordinateName: _*)
// Define the min and max of x, y, z
// x/y bounds are coordinates divided by HotcellUtils.coordinateStep, i.e. expressed in cell units.
val minX = -74.50 / HotcellUtils.coordinateStep
val maxX = -73.70 / HotcellUtils.coordinateStep
val minY = 40.50 / HotcellUtils.coordinateStep
val maxY = 40.90 / HotcellUtils.coordinateStep
val minZ = 1
val maxZ = 31
// Size of the full grid: product of the inclusive extents along x, y and z.
val numCells = (maxX - minX + 1) * (maxY - minY + 1) * (maxZ - minZ + 1)
// Column names and DataFrame aliases used below.
val COL_X = "x"
val COL_Y = "y"
val COL_Z = "z"
val COL_COUNT = "count"
val COL_SUM = "sum"
val COL_NOAC = "num_of_adj_cells"
val COL_GSCORE = "g_score"
val FIRST_DF = "first_df"
val SECOND_DF = "second_df"
// NOTE(review): the "*" lines below (and similar ones further down) look like block comments
// whose /* and */ delimiters were lost.
* Filter the coordinates to the area
* under observation and count the number
* of pickups for the given cell.
// NOTE(review): the predicate on the next three lines is presumably the argument of a
// filter call that is not visible.
val selectedCellsCountDf = pickupInfo
col(COL_X) >= minX and col(COL_X) <= maxX
and col(COL_Y) >= minY and col(COL_Y) <= maxY
and col(COL_Z) >= minZ and col(COL_Z) <= maxZ
.groupBy(COL_X, COL_Y, COL_Z)
.count() // This is the aggregation step for the groupBy and not the "count" action on the [[DataFrame]].
// Accumulates (sum, sum of squares) of the Long at index 0 of each row.
// NOTE(review): the collection that aggregate is called on is not visible.
val (total, sqrTotal) =
.aggregate((0.0, 0.0))(
// Per-row step: add the value and its square to the running pair.
(acc: (Double, Double), r) => {
val curr = r.getLong(0).toDouble
acc._1 + curr,
acc._2 + (curr * curr)
// Combine step: add two partial (sum, sum of squares) pairs component-wise.
(x: (Double, Double), y: (Double, Double)) => (
x._1 + y._1,
x._2 + y._2
// Mean and standard deviation divide by numCells (the full grid size), not by the number
// of rows that were aggregated.
val mean = total / numCells
// NOTE(review): the two string literals trailing the next statement look like the arguments
// of logging calls whose receivers were lost.
val std = math.sqrt((sqrTotal / numCells) - (mean * mean))"Mean: $mean")"Standard Deviation: $std")
* Cross join to get all the cells and the filter
* the cells which are adjacent to the each other.
* NOTE: This cross join operation is one of the expensive operation
* which joins millions of cells together just to filter upto 26 surronding
* cells. Future work can be done to index these cells and find out a way to
* join only the necessary cells.
// The visible tail groups by x, y, z and produces the sum of the neighbouring counts ("sum")
// and the number of contributing rows ("num_of_adj_cells").
// NOTE(review): the join/filter/select calls that the column lists below belong to are not visible.
val adjacentDf =
col(s"$FIRST_DF.$COL_X"), col(s"$FIRST_DF.$COL_Y"),
col(s"$FIRST_DF.$COL_Z"), col(s"$SECOND_DF.$COL_X"),
col(s"$SECOND_DF.$COL_Y"), col(s"$SECOND_DF.$COL_Z")
col(s"$FIRST_DF.$COL_X"), col(s"$FIRST_DF.$COL_Y"),
col(s"$FIRST_DF.$COL_Z"), col(s"$SECOND_DF.$COL_COUNT")
.groupBy(COL_X, COL_Y, COL_Z)
.agg(sum(COL_COUNT) as COL_SUM, count(COL_COUNT) as COL_NOAC)
* Sorting after repartition may not be required.
* But we have added the line as we the order is maintained
* when there is only one part file to write, and sorting
* 50 elements is not computationally heavy in the given case.
// Adds the "g_score" column, computed by HotcellUtils.g_score from the "sum" and
// "num_of_adj_cells" columns using the mean, std and numCells derived above.
// NOTE(review): the ordering/limit steps and the returned expression are not visible.
val scoredDf = adjacentDf.withColumn(COL_GSCORE,
HotcellUtils.g_score(mean, std, numCells)(col(COL_SUM), col(COL_NOAC)))
/**
 * Helpers for the hot-cell analysis: mapping raw trip fields to grid-cell coordinates,
 * testing cell adjacency and computing the g-score of a cell, plus Spark UDF wrappers.
 */
object HotcellUtils {
  /** Size of each cell along x and y, in the units of the input coordinates. */
  val coordinateStep = 0.01

  /**
   * Converts a raw field into one cell coordinate.
   *
   * @param inputString      for offsets 0 and 1 a point written as "(x,y)"; for offset 2 a
   *                         timestamp in "yyyy-MM-dd HH:mm:ss" form
   * @param coordinateOffset 0 for x, 1 for y, 2 for z
   * @return the x or y value divided by [[coordinateStep]] and floored, or for z the day of
   *         the month of the timestamp
   * @throws MatchError if `coordinateOffset` is not 0, 1 or 2
   */
  def CalculateCoordinate(inputString: String, coordinateOffset: Int): Int =
    coordinateOffset match {
      case 0 =>
        Math.floor(inputString.split(",")(0).replace("(", "").toDouble / coordinateStep).toInt
      case 1 =>
        Math.floor(inputString.split(",")(1).replace(")", "").toDouble / coordinateStep).toInt
      case 2 =>
        // z is the day of the month, so every month is treated as spanning days 1..31.
        dayOfMonth(timestampParser(inputString))
    }

  /**
   * Parses a "yyyy-MM-dd HH:mm:ss" string into a [[java.sql.Timestamp]] using the JVM default
   * time zone.
   *
   * @throws java.text.ParseException if the string does not match the pattern
   */
  def timestampParser(timestampString: String): Timestamp = {
    // "HH" is the 24-hour field (0-23). The 12-hour field "hh" reads "12:xx:xx" as 00:xx:xx,
    // which silently shifts every trip recorded between noon and 1 pm to just after midnight.
    // A new formatter is created per call because SimpleDateFormat is not thread-safe.
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    new Timestamp(dateFormat.parse(timestampString).getTime)
  }

  /** Reads a single `java.util.Calendar` field of the timestamp in the JVM default time zone. */
  private def calendarField(timestamp: Timestamp, field: Int): Int = {
    val calendar = Calendar.getInstance
    calendar.setTimeInMillis(timestamp.getTime)
    calendar.get(field)
  }

  /** Day of the year (1-based) of the timestamp. */
  def dayOfYear(timestamp: Timestamp): Int =
    calendarField(timestamp, Calendar.DAY_OF_YEAR)

  /** Day of the month (1-based) of the timestamp. */
  def dayOfMonth(timestamp: Timestamp): Int =
    calendarField(timestamp, Calendar.DAY_OF_MONTH)

  /**
   * Tells whether two cells are neighbours in the space-time cube.
   *
   * @return true when the cells differ by at most 1 along each of x, y and z; a cell is
   *         therefore considered adjacent to itself
   */
  def isAdjacentCubeFn(x1: Int, y1: Int, z1: Int,
                       x2: Int, y2: Int, z2: Int): Boolean =
    math.abs(x1 - x2) <= 1 &&
      math.abs(y1 - y2) <= 1 &&
      math.abs(z1 - z2) <= 1

  /**
   * Computes the g-score of a cell:
   * (sum - mean * k) / (std * sqrt((numCells * k - k * k) / (numCells - 1))), with k = adjacentCount.
   *
   * @param mean          mean pickup count over all cells
   * @param std           standard deviation of the pickup count over all cells
   * @param numCells      total number of cells in the grid
   * @param sum           summed pickup count over the cell's neighbourhood
   * @param adjacentCount number of cells that contributed to `sum`
   * @return the score; the division follows floating-point rules, so a zero denominator
   *         gives an infinite or NaN result rather than an exception
   */
  def gScoreFn(mean: Double, std: Double, numCells: Double)
              (sum: Long, adjacentCount: Long): Double = {
    val ad = adjacentCount.toDouble
    val numerator = sum - mean * ad
    val denominator = std * math.sqrt((numCells * ad - (ad * ad)) / (numCells - 1.0))
    numerator / denominator
  }

  /** Spark UDF wrapper around [[isAdjacentCubeFn]]. */
  val is_adjacent_cube: UserDefinedFunction =
    udf[Boolean, Int, Int, Int, Int, Int, Int](isAdjacentCubeFn)

  /** Builds a Spark UDF that applies [[gScoreFn]] with the given global statistics. */
  def g_score(mean: Double, std: Double, numCells: Double): UserDefinedFunction =
    udf[Double, Long, Long](gScoreFn(mean, std, numCells))
}
