/*
spark-shell --master yarn \
--conf spark.ui.port=12345 \
--num-executors 6 \
--executor-cores 2 \
--executor-memory 2G
*/
// Solution using Core API
val crimeData = sc.textFile("/public/crime/csv")
val header = crimeData.first
val crimeDataWithoutHeader = crimeData.filter(criminalRecord => criminalRecord != header)
/*
// Logic to convert a record into tuple
val rec = crimeDataWithoutHeader.first
// Extract date eg: 12/31/2007
// We need only year and month in YYYYMM format, 12/31/2007 -> 200712
// Finally create tuple ((crime_month, crime_type), 1)
val t = {
  val r = rec.split(",")
  val d = r(2).split(" ")(0) // 12/31/2007
  val m = d.split("/")(2) + d.split("/")(0) // 200712
  ((m.toInt, r(5)), 1) // tuple
}
*/
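// A minimal local sketch of the same conversion, runnable without a cluster.
// Assumptions: toMonthTypeCount is a hypothetical helper name, and sampleRec
// is an invented line that only mimics the column positions used here
// (index 2 = date, index 5 = crime type). It is not taken from the dataset.
val sampleRec = "1,AB123456,12/31/2007 11:55:00 PM,001XX W EXAMPLE ST,0000,THEFT,EXAMPLE"
def toMonthTypeCount(rec: String): ((Int, String), Int) = {
  val r = rec.split(",")
  // "12/31/2007 11:55:00 PM" -> "12/31/2007" -> month "12", year "2007"
  val Array(mm, _, yyyy) = r(2).split(" ")(0).split("/")
  (((yyyy + mm).toInt, r(5)), 1)
}
val sampleTuple = toMonthTypeCount(sampleRec)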
val criminalRecordsWithMonthAndType = crimeDataWithoutHeader.
  map(rec => {
    val r = rec.split(",")
    val d = r(2).split(" ")(0) // 12/31/2007
    val m = d.split("/")(2) + d.split("/")(0) // 200712
    ((m.toInt, r(5)), 1)
  })
val crimeCountPerMonthPerType = criminalRecordsWithMonthAndType.
  reduceByKey((total, value) => total + value)
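// ((200707, WEAPONS VIOLATION), count) -> ((200707, -count), "200707\tcount\tWEAPONS VIOLATION")
// The key is (month, -count): negating the count makes the ascending sort
// order each month's crime types by descending count.
// Output line is tab-separated: 200707\tcount\tWEAPONS VIOLATION
val crimeCountPerMonthPerTypeSorted = crimeCountPerMonthPerType.
  map(rec => ((rec._1._1, -rec._2), rec._1._1 + "\t" + rec._2 + "\t" + rec._1._2)).
  sortByKey().
  map(rec => rec._2)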
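// A small sketch of how the (month, -count) key orders records, using a plain
// Scala List instead of an RDD. sampleCounts holds invented values for
// illustration only. sortBy on a tuple compares the month first, then the
// negated count, which mirrors the composite key passed to sortByKey above.
val sampleCounts = List(
  ((200702, "THEFT"), 3),
  ((200701, "BATTERY"), 5),
  ((200701, "THEFT"), 9))
val sampleSorted = sampleCounts.
  sortBy(rec => (rec._1._1, -rec._2)).
  map(rec => rec._1._1 + "\t" + rec._2 + "\t" + rec._1._2)
// sampleSorted: 200701 rows first (count 9, then 5), followed by the 200702 row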
crimeCountPerMonthPerTypeSorted.
  coalesce(1).
  saveAsTextFile("/user/dgadiraju/solutions/solution01/crimes_by_type_by_month",
    classOf[org.apache.hadoop.io.compress.GzipCodec])
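// Optional sanity check (a sketch, not part of the original solution): confirm
// that the written lines are still ordered by month ascending, then count
// descending, since the data passes through coalesce(1) after the sort.
// isSortedByMonthThenCountDesc is a hypothetical helper name. It assumes the
// tab-separated "month<TAB>count<TAB>type" layout produced above.
def isSortedByMonthThenCountDesc(lines: Seq[String]): Boolean = {
  val keys = lines.map(line => {
    val f = line.split("\t")
    (f(0).toInt, -f(1).toInt)
  })
  // Every adjacent pair of keys must be in non-decreasing order
  keys.zip(keys.drop(1)).forall { case (a, b) => Ordering[(Int, Int)].lteq(a, b) }
}
// Usage in spark-shell once the job has finished (gzip part files are read as text):
// isSortedByMonthThenCountDesc(
//   sc.textFile("/user/dgadiraju/solutions/solution01/crimes_by_type_by_month").collect().toSeq)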

ngvinay commented May 28, 2018

coalesce brings data from different partitions to a single partition. Doesn't that change the order of rows we sorted earlier?
