@vikas-gonti
Created June 30, 2018 19:16
Get monthly crime count by type
/* Data is available in HDFS file system under /public/crime/csv
You can check properties of files using hadoop fs -ls -h /public/crime/csv
Structure of data (ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location)
File format - text file
Delimiter - ","
Get monthly count of primary crime type, sorted by month in ascending and number of crimes per type in descending order
Store the result in HDFS path /user/<YOUR_USER_ID>/solutions/solution01/crimes_by_type_by_month
Output File Format: TEXT
Output Columns: Month in YYYYMM format, crime count, crime type
Output Delimiter: \t (tab delimited)
Output Compression: gzip */
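The Month column in YYYYMM format has to be built from the Date field. A minimal sketch of that conversion, assuming the Date field looks like "MM/DD/YYYY HH:MM:SS AM/PM" (as in the Chicago crime dataset); `toMonthKey` is a hypothetical helper name, not part of the exercise:

```scala
// Hypothetical helper: derive a YYYYMM month key from the raw Date field.
// Assumes the field looks like "01/15/2017 09:30:00 AM".
def toMonthKey(dateField: String): Int = {
  val date  = dateField.split(" ")(0)   // "01/15/2017"
  val parts = date.split("/")           // Array("01", "15", "2017")
  (parts(2) + parts(0)).toInt           // year + month -> 201701
}
```

Both solutions below perform this same year-then-month concatenation inline.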
/*
spark-shell --master yarn \
--conf spark.ui.port=12345 \
--num-executors 6 \
--executor-cores 2 \
--executor-memory 2G
*/
val crimeData = sc.textFile("/public/crime/csv")
val crimeHeader = crimeData.first
val crimeDataWithoutHeader = crimeData.filter(rec => rec != crimeHeader)
// Field 2 is Date ("MM/DD/YYYY HH:MM:SS AM/PM"), field 5 is Primary Type.
// A plain split on "," is safe for these two fields because they precede
// the comma-containing Location field at the end of each record.
val crimeDataRec = crimeDataWithoutHeader.map(rec => {
  val r = rec.split(",")
  (r(2).split(" ")(0), r(5))
})
val crimeDataDF = crimeDataRec.toDF("Month", "Crime_Type")
crimeDataDF.show
// Spark 1.x: register the DataFrame as a temp table and query via sqlContext.
// substring(Month,7,4) takes the year, substring(Month,1,2) the month -> YYYYMM.
crimeDataDF.registerTempTable("crime")
val sqlResult = sqlContext.sql("select concat(substring(Month,7,4),substring(Month,1,2)) month, " +
  "count(1) crime_count, Crime_Type crime_type from crime " +
  "group by concat(substring(Month,7,4),substring(Month,1,2)), Crime_Type " +
  "order by month, crime_count desc")
// The output path must be absolute; a relative path would land under the user's HDFS home dir.
sqlResult.rdd.
  map(rec => rec.mkString("\t")).
  coalesce(1).
  saveAsTextFile("/user/gontiv/solutions/solution01/crimes_by_type_by_month",
    classOf[org.apache.hadoop.io.compress.GzipCodec])
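As a quick sanity check (a sketch, assuming the job above has completed against the absolute path shown), `sc.textFile` decompresses gzip text transparently, so the result can be re-read in the same spark-shell:

```scala
// Re-read the gzip output; Spark picks the codec from the .gz extension automatically.
val written = sc.textFile("/user/gontiv/solutions/solution01/crimes_by_type_by_month")
// Earliest month first; within a month, largest crime counts first.
written.take(5).foreach(println)
```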
// Core API
val crimeMonthTypePairs = crimeDataWithoutHeader.map(rec => {
  val r = rec.split(",")
  val d = r(2).split(" ")(0)                 // "MM/DD/YYYY"
  val m = d.split("/")(2) + d.split("/")(0)  // "YYYYMM"
  ((m.toInt, r(5)), 1)                       // key: (month, primary type)
})
val crimeDataReduced = crimeMonthTypePairs.reduceByKey(_ + _)
val crimeCountPerMonthPerTypeSorted = crimeDataReduced.
  // Negating the count makes sortByKey yield month ascending, count descending.
  map(rec => ((rec._1._1, -rec._2), rec._1._1 + "\t" + rec._2 + "\t" + rec._1._2)).
  sortByKey().
  map(rec => rec._2)
crimeCountPerMonthPerTypeSorted.coalesce(1).
  saveAsTextFile("/user/gontiv/solutions/solution01_core/crimes_by_type_by_month",
    classOf[org.apache.hadoop.io.compress.GzipCodec])
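Since both approaches should emit identical tab-delimited lines, a cross-check is possible once both jobs have run against the paths above (a sketch, not part of the exercise):

```scala
// Compare the two outputs as unordered sets of lines.
val sqlOut  = sc.textFile("/user/gontiv/solutions/solution01/crimes_by_type_by_month")
val coreOut = sc.textFile("/user/gontiv/solutions/solution01_core/crimes_by_type_by_month")
// Should be 0 if the SQL and Core API solutions agree line-for-line.
sqlOut.subtract(coreOut).count
```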