Created
June 30, 2018 19:16
Get monthly crime count by type
/* Data is available in the HDFS file system under /public/crime/csv
You can check file properties using: hadoop fs -ls -h /public/crime/csv
Structure of data: (ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location)
File format - text file
Delimiter - ","
Get the monthly count of each primary crime type, sorted by month in ascending order and by number of crimes per type in descending order
Store the result in the HDFS path /user/<YOUR_USER_ID>/solutions/solution01/crimes_by_type_by_month
Output File Format: TEXT
Output Columns: Month in YYYYMM format, crime count, crime type
Output Delimiter: \t (tab delimited)
Output Compression: gzip */
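/* Both solutions below build the YYYYMM month key by hand from the Date field,
which arrives as "MM/DD/YYYY hh:mm:ss a". A minimal plain-Scala sketch of that
reshuffling (the sample date value is made up for illustration): */

```scala
// Reformat a "MM/DD/YYYY hh:mm:ss a" timestamp into a YYYYMM month key
def toMonthKey(date: String): String = {
  val d = date.split(" ")(0) // keep only the date part, "MM/DD/YYYY"
  val p = d.split("/")       // p(0) = MM, p(1) = DD, p(2) = YYYY
  p(2) + p(0)                // YYYY followed by MM
}
```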
/*
spark-shell --master yarn \
  --conf spark.ui.port=12345 \
  --num-executors 6 \
  --executor-cores 2 \
  --executor-memory 2G
*/
val crimeData = sc.textFile("/public/crime/csv")
// The first record is the header line; filter it out of the RDD
val crimeDataWithHeader = crimeData.first
val crimeDataWithoutHeader = crimeData.filter(rec => rec != crimeDataWithHeader)
// Field 2 is Date ("MM/DD/YYYY hh:mm:ss a"); keep only its date part, plus field 5 (Primary Type)
val crimeDataRec = crimeDataWithoutHeader.map(r => {
  val fields = r.split(",")
  (fields(2).split(" ")(0), fields(5))
})
val crimeDataDF = crimeDataRec.toDF("Month", "Crime_Type")
crimeDataDF.show
// Register the DataFrame so it can be queried via sqlContext
crimeDataDF.registerTempTable("Crime")
// substring(Month,7,4) is the year and substring(Month,1,2) the month of "MM/DD/YYYY"
val sqlResult = sqlContext.sql("select concat(substring(Month,7,4), substring(Month,1,2)) Month, count(1) " +
  "crime_count, Crime_Type from Crime " +
  "group by concat(substring(Month,7,4), substring(Month,1,2)), Crime_Type " +
  "order by Month, crime_count desc")
sqlResult.rdd.
  map(rec => rec.mkString("\t")).
  coalesce(1).
  saveAsTextFile("/user/gontiv/solutions/solution01/crimes_by_type_by_month",
    classOf[org.apache.hadoop.io.compress.GzipCodec])
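/* The SQL above rebuilds YYYYMM with 1-based Hive substring offsets into the
MM/DD/YYYY date part. The equivalent slice in plain Scala (0-based indexing,
sample value made up for illustration): */

```scala
// Hive's substring(Month, 7, 4) ++ substring(Month, 1, 2), written with
// Scala's 0-based, end-exclusive String.substring
val month = "06/30/2018"                                   // sample Month value
val yyyymm = month.substring(6, 10) + month.substring(0, 2) // "2018" + "06"
```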
// Core API
val crimeDataRec = crimeDataWithoutHeader.map(rec => {
  val r = rec.split(",")
  val d = r(2).split(" ")(0)                // date part, "MM/DD/YYYY"
  val m = d.split("/")(2) + d.split("/")(0) // YYYYMM
  ((m.toInt, r(5)), 1)                      // key: (month, crime type)
})
val crimeDataReduced = crimeDataRec.reduceByKey((t, v) => t + v)
// Sort ascending on month and descending on count by negating the count in the key
val crimeCountPerMonthPerTypeSorted = crimeDataReduced.
  map(rec => ((rec._1._1, -rec._2), rec._1._1 + "\t" + rec._2 + "\t" + rec._1._2)).
  sortByKey().
  map(rec => rec._2)
crimeCountPerMonthPerTypeSorted.coalesce(1).
  saveAsTextFile("/user/gontiv/solutions/solution01_core/crimes_by_type_by_month",
    classOf[org.apache.hadoop.io.compress.GzipCodec])
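/* sortByKey orders tuples in ascending order on every component, so the core-API
solution negates the count inside the composite key to get descending order
within each month. The same trick on a plain Scala collection (the crime counts
here are made-up sample data): */

```scala
// Ascending on month, descending on count, via a negated-count composite key
val counts = Seq((201801, "THEFT", 120), (201801, "BATTERY", 95),
                 (201802, "THEFT", 110), (201801, "ASSAULT", 130))
val sorted = counts
  .map { case (month, crimeType, n) => ((month, -n), (month, n, crimeType)) }
  .sortBy(_._1)  // (month asc, -count asc) == (month asc, count desc)
  .map(_._2)
```

All of January's rows come first, largest count leading, before February's.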