@vikas-gonti
Created June 30, 2018 20:32
Get top 3 crime types based on number of incidents in RESIDENCE area
/*
Structure of data (ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location)
File format - text file
Delimiter - "," (use a regex when splitting: split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1), as some fields contain commas and are enclosed in double quotes)
Get top 3 crime types based on number of incidents in RESIDENCE area using “Location Description”
Store the result in HDFS path /user/<YOUR_USER_ID>/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA
Output Fields: Crime Type, Number of Incidents
Output File Format: JSON
Output Delimiter: N/A
Output Compression: No
spark-shell --master yarn \
--conf spark.ui.port=12345 \
--num-executors 6 \
--executor-cores 2 \
--executor-memory 2G
*/
val crimeData = sc.textFile("/public/crime/csv")
val crimeDataHeader = crimeData.first
val crimeDataWithoutHeader = crimeData.filter(rec => rec!=crimeDataHeader)
crimeDataWithoutHeader.take(10).foreach(println)
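val crimeDataRec = crimeDataWithoutHeader.map(rec => {
  // Split on commas outside double quotes; -1 keeps trailing empty fields
  val r = rec.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
  // r(5) = Primary Type, r(7) = Location Description
  (r(5), r(7))
})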
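// Illustration only: how the split regex above treats quoted commas.
// The sample line and the names sampleLine / sampleFields are made up for this
// sketch and are not taken from the dataset. The lookahead accepts a comma only
// when an even number of double quotes follows it, so commas inside a quoted
// field are not used as separators. The quotes themselves stay in the field.
val sampleLine = "1,HX100,\"SCHOOL, PUBLIC, BUILDING\",false"
val sampleFields = sampleLine.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
sampleFields.foreach(println)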
//CORE API
val crimeDataRes = crimeDataRec.
  filter(rec => rec._2 == "RESIDENCE").
  map(rec => (rec._1, 1)).
  reduceByKey((t, v) => t + v)
val crimeCountForResidence = sc.parallelize(crimeDataRes.
  map(rec => (rec._2, rec._1)).
  sortByKey(false).
  map(rec => (rec._2, rec._1)).
  take(3))
crimeCountForResidence.toDF("crime_type", "crime_count").
  write.json("/user/gontiv/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA")
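// A possible alternative sketch, not part of the original solution: RDD.takeOrdered
// returns the top 3 without swapping key and value or sorting the whole RDD.
// The name top3ByCount is hypothetical. It assumes crimeDataRes from above is an
// RDD[(String, Int)] of (crime type, incident count); negating the count makes
// the ordering descending.
val top3ByCount = crimeDataRes.takeOrdered(3)(Ordering.by[(String, Int), Int](rec => -rec._2))
top3ByCount.foreach(println)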
//SqlContext (alternative to the Core API version; both write to the same path, so run only one of them)
val crimeDataDF = crimeDataRec.toDF("crime_type", "location")
crimeDataDF.registerTempTable("CrimeData")
val sqlResult = sqlContext.sql("select * from (select crime_type, count(1) crime_count from "+
"CrimeData where location = 'RESIDENCE' group by "+
"crime_type order by crime_count desc) a limit 3")
sqlResult.coalesce(1).
  write.json("/user/gontiv/solutions/solution03/RESIDENCE_AREA_CRIMINAL_TYPE_DATA")