Skip to content

Instantly share code, notes, and snippets.

@lieuzhenghong
Created September 9, 2019 16:12
Show Gist options
  • Save lieuzhenghong/461ec3980dbb989d82707ffff239b825 to your computer and use it in GitHub Desktop.
Save lieuzhenghong/461ec3980dbb989d82707ffff239b825 to your computer and use it in GitHub Desktop.
/** Spark batch job that aggregates per-link velocity statistics from trip JSON files.
  *
  * Reads every JSON document matching `s3a://trips/*`, flattens each document's
  * `data` array into one row per element, keeps the `dbResponse.linkID` and
  * `absVelocity` fields, drops rows where either is null, and writes the per-`linkID`
  * row count and mean `absVelocity` as CSV (with a header row) to `s3a://results/results`.
  */
object SimpleApp {

  /** Entry point: runs the aggregation and always stops the Spark session afterwards.
    *
    * @param args command-line arguments; not used by this job
    */
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.explode

    val spark = SparkSession.builder.appName("TripAnalysis").getOrCreate()

    // try/finally guarantees the session is stopped even if reading, aggregating or
    // writing throws; previously a failure skipped spark.stop() entirely.
    try {
      import spark.implicits._

      val resultsPath = "s3a://results/"
      val tripsPath = "s3a://trips/*"

      // "multiline" lets a single JSON record span several lines of a file.
      val trips = spark.read.option("multiline", "true").json(tripsPath)

      // One output row per element of the `data` array, then project the two
      // nested fields; the resulting columns are named `linkID` and `absVelocity`.
      val linkVelocities = trips
        .select(explode($"data").as("data"))
        .select("data.dbResponse.linkID", "data.absVelocity")

      // Keep only rows where both fields are present. Same predicate as the former
      // temp-view SQL query, without registering a session-wide view.
      val complete = linkVelocities
        .filter($"linkID".isNotNull && $"absVelocity".isNotNull)

      // Output columns: linkID, count(linkID), avg(absVelocity).
      val perLinkStats = complete
        .groupBy("linkID")
        .agg(
          "linkID" -> "count",
          "absVelocity" -> "avg"
        )

      perLinkStats.write
        .option("header", "true")
        .csv(resultsPath + "results")
    } finally {
      spark.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment