Created June 30, 2018 20:32
Get top 3 crime types based on number of incidents in RESIDENCE area
/*
Structure of data: ID, Case Number, Date, Block, IUCR, Primary Type, Description, Location Description, Arrest, Domestic, Beat, District, Ward, Community Area, FBI Code, X Coordinate, Y Coordinate, Year, Updated On, Latitude, Longitude, Location
File format: text file
Delimiter: "," (split with the regex split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1), because some fields contain commas and are enclosed in double quotes)
Get the top 3 crime types by number of incidents in the RESIDENCE area, using "Location Description"
Store the result in HDFS path /user/<YOUR_USER_ID>/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA
Output fields: Crime Type, Number of Incidents
Output file format: JSON
Output delimiter: N/A
Output compression: No

spark-shell --master yarn \
  --conf spark.ui.port=12345 \
  --num-executors 6 \
  --executor-cores 2 \
  --executor-memory 2G
*/
// Read the raw CSV text and drop the header line
val crimeData = sc.textFile("/public/crime/csv")
val crimeDataHeader = crimeData.first
val crimeDataWithoutHeader = crimeData.filter(rec => rec != crimeDataHeader)
crimeDataWithoutHeader.take(10).foreach(println)

// Keep (Primary Type, Location Description); the regex splits only on commas
// outside double-quoted fields, and -1 keeps trailing empty fields
val crimeDataRec = crimeDataWithoutHeader.map(rec => {
  val r = rec.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
  (r(5), r(7))
})
// Core API: count incidents per crime type in RESIDENCE, then keep the top 3
val crimeDataRes = crimeDataRec.
  filter(rec => rec._2 == "RESIDENCE").
  map(rec => (rec._1, 1)).
  reduceByKey((total, value) => total + value)
val crimeCountForResidence = sc.parallelize(
  crimeDataRes.sortBy(rec => rec._2, ascending = false).take(3)
)
// toDF relies on the implicits that spark-shell imports automatically;
// the path needs the leading "/" or it resolves relative to the HDFS home directory
crimeCountForResidence.toDF("crime_type", "crime_count").
  coalesce(1).
  write.json("/user/gontiv/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA")
// SQLContext: alternative to the core API above. It writes to the same path,
// so overwrite mode is used to replace the earlier output
val crimeDataDF = crimeDataRec.toDF("crime_type", "location")
crimeDataDF.registerTempTable("CrimeData")
val sqlResult = sqlContext.sql(
  "select crime_type, count(1) crime_count from CrimeData " +
  "where location = 'RESIDENCE' " +
  "group by crime_type order by crime_count desc limit 3")
sqlResult.coalesce(1).
  write.mode("overwrite").
  json("/user/gontiv/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA")
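The quote-aware split is the step most likely to go wrong, so it helps to check it without a cluster. The following is a minimal plain-Scala sketch (no Spark needed). The sample record is hypothetical: its values are made up and only follow the 22-column layout from the header comment, with the last (Location) field quoted and containing a comma. A naive `split(",")` breaks that field in two, while the regex keeps it whole.

```scala
// Same delimiter regex as the script: split on a comma only when an even
// number of double quotes follows it, i.e. the comma is outside a quoted field.
val quoteAwareComma = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"

// Hypothetical record (made-up values) in the 22-column layout;
// the last field is quoted and contains a comma.
val sample =
  "1,HX000001,06/30/2018 08:00:00 PM,001XX W MAIN ST,0820,THEFT,$500 AND UNDER," +
  "RESIDENCE,false,false,0111,001,42,32,06,1176000,1900000,2018," +
  "07/01/2018 03:00:00 PM,41.88,-87.62,\"(41.88, -87.62)\""

val naive = sample.split(",", -1)                // also splits inside the quotes
val fields = sample.split(quoteAwareComma, -1)   // keeps the quoted field whole

println(s"naive: ${naive.length} fields, quote-aware: ${fields.length} fields")
println(s"Primary Type = ${fields(5)}, Location Description = ${fields(7)}")
```

With the quote-aware split, indexes 5 and 7 still point at Primary Type and Location Description. The naive split only happens to work for those two indexes here because the extra comma sits in the last column.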
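The core-API counting logic (filter on RESIDENCE, map to (type, 1), reduce by key, sort by count descending, take 3) can be mirrored on ordinary Scala collections to check it locally. This is a sketch under assumptions: the records are hypothetical stand-ins for `crimeDataRec`, and `groupBy` plus `size` plays the role of `reduceByKey`. One deliberate difference is that ties are broken by crime type name so the result is deterministic. Sorting by count alone, as `sortByKey(false)` does, gives no such guarantee.

```scala
// Hypothetical (Primary Type, Location Description) pairs; values are made up.
val records: Seq[(String, String)] = Seq(
  ("BATTERY", "RESIDENCE"),
  ("THEFT", "RESIDENCE"),
  ("BATTERY", "RESIDENCE"),
  ("THEFT", "STREET"),
  ("CRIMINAL DAMAGE", "RESIDENCE"),
  ("BATTERY", "RESIDENCE"),
  ("THEFT", "RESIDENCE"),
  ("ASSAULT", "RESIDENCE"),
  ("CRIMINAL DAMAGE", "RESIDENCE"),
  ("CRIMINAL DAMAGE", "RESIDENCE")
)

// filter + map to (type, 1) + reduceByKey, expressed with collections
val counts: Map[String, Int] =
  records
    .filter { case (_, location) => location == "RESIDENCE" }
    .groupBy { case (crimeType, _) => crimeType }
    .map { case (crimeType, rows) => crimeType -> rows.size }

// Count descending, ties broken by name, keep the first 3
val top3: Seq[(String, Int)] =
  counts.toSeq.sortBy { case (crimeType, n) => (-n, crimeType) }.take(3)

top3.foreach { case (crimeType, n) => println(s"$crimeType\t$n") }
```

On the RDD itself, `crimeDataRes.top(3)(Ordering.by[(String, Int), Int](_._2))` is another option. It keeps only the best three per partition instead of sorting the whole data set.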