Skip to content

Instantly share code, notes, and snippets.

@lieuzhenghong
Created September 16, 2019 14:26
Show Gist options
  • Save lieuzhenghong/1a499077f00f4972eee59e2ee9564973 to your computer and use it in GitHub Desktop.
Save lieuzhenghong/1a499077f00f4972eee59e2ee9564973 to your computer and use it in GitHub Desktop.
Scala Spark job that computes per-link sample counts and average velocities from trip data
/** Spark batch job that aggregates trip samples per road link.
  *
  * Reads multi-line JSON trip files matching `s3a://trips/*`, explodes each
  * file's `data` array into one row per element, and keeps the
  * `data.dbResponse.linkID` and `data.absVelocity` fields. For every row
  * where both fields are non-null it groups by `linkID` (cast to a long) and
  * computes the number of rows and the mean `absVelocity`. The result is
  * sorted by `linkID` ascending and written as a single headered CSV under
  * `s3a://results/results_49998`.
  */
object SimpleApp {

  /** Entry point for `spark-submit`.
    *
    * @param args command-line arguments; not read by this job.
    */
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.explode

    val spark = SparkSession.builder.appName("TripAnalysis").getOrCreate()
    import spark.implicits._

    // Always stop the session, even if reading, aggregating or writing fails,
    // so the SparkContext is not left running.
    try {
      val resultsPath = "s3a://results/"
      val paths = "s3a://trips/*"

      // Each input file is one JSON document that may span several lines.
      val tripDF = spark.read.option("multiline", "true").json(paths)

      // "Explode" the data array into individual rows.
      val linksDF = tripDF.select(explode($"data").as("data"))
      val linksDF2 = linksDF.select("data.dbResponse.linkID", "data.absVelocity")

      // Create a temporary view so the filtering/cast can be written in SQL.
      linksDF2.createOrReplaceTempView("times")
      /*
      root
       |-- linkID: string (nullable = true)
       |-- absVelocity: double (nullable = true)
       */

      // A triple-quoted literal is required: an ordinary "..." string cannot
      // span lines. The explicit alias keeps the column name `linkID`, which
      // the groupBy below depends on.
      // NOTE(review): the NULL check runs on the string value; a non-numeric
      // linkID would still cast to NULL and form its own group — confirm
      // whether that can occur in the input.
      val tDF = spark.sql(
        """SELECT CAST(linkID AS LONG) AS linkID, absVelocity
          |FROM times
          |WHERE linkID IS NOT NULL AND absVelocity IS NOT NULL""".stripMargin)

      val groupedDS = tDF.groupBy("linkID")
      val avgsDS = groupedDS.agg(
        "linkID" -> "count",
        "absVelocity" -> "avg"
      ).sort($"linkID".asc)

      // coalesce(1) funnels the output through one partition so a single
      // CSV part file is produced.
      avgsDS.coalesce(1).write
        .option("header", "true")
        .csv(s"${resultsPath}results_49998")
    } finally {
      spark.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment