Skip to content

Instantly share code, notes, and snippets.

@supremum76
Created April 7, 2021 19:41
Show Gist options
  • Save supremum76/601ca1676b799309f2f2ccdf9cc0e85c to your computer and use it in GitHub Desktop.
Save supremum76/601ca1676b799309f2f2ccdf9cc0e85c to your computer and use it in GitHub Desktop.
import org.apache.spark
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{FloatType, StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, percentile_approx, collect_list, desc, concat_ws, row_number}
import org.apache.spark.sql.functions.{broadcast}
/** Spark batch job that builds a per-district summary of Boston crime data and saves it as Parquet.
  *
  * Command-line arguments (all required):
  *  1. path to the crimes CSV file (read with a header row);
  *  2. path to the offense-codes CSV file (read with a header row; columns `CODE` and `NAME` are used);
  *  3. output path for the Parquet result.
  *
  * Output columns, one row per `DISTRICT`:
  *  - `crimes_total`         – number of crime records in the district;
  *  - `crimes_monthly`       – approximate median of the district's (YEAR, MONTH) crime totals;
  *  - `frequent_crime_types` – up to three most frequent offense names, most frequent first, joined with ", ";
  *  - `lat`, `lng`           – mean of the `Lat` / `Long` columns (cast to float before averaging).
  *
  * An explicit `main` is used instead of extending `scala.App`: the Spark documentation warns that
  * `App` subclasses may not work correctly (their fields are initialised lazily via `delayedInit`).
  */
object BostonCrimesMapSQL {

  def main(args: Array[String]): Unit = {
    // Local import keeps the extra column functions next to their only user.
    import org.apache.spark.sql.functions.{col, regexp_replace, sort_array, split, struct}

    require(
      args.length >= 3,
      "Usage: BostonCrimesMapSQL <crimesFilePath> <offenceCodesFilePath> <resultFilePath>")

    //--------------- FILES PATH ----------------------------------------------------------------------------------------
    val crimesFilePath = args(0)
    val offenceCodesFilePath = args(1)
    val resultFilePath = args(2)

    //--------------- SPARK INIT ----------------------------------------------------------------------------------------
    val conf: spark.SparkConf = new spark.SparkConf().setAppName("BostonCrimesMap")
    val sparkSession = spark.sql.SparkSession.builder
      .config(conf)
      .getOrCreate()
    sparkSession.sparkContext.setLogLevel("ERROR")

    try {
      //-----------------------------------------------------------------------------------------------------------------
      val ds = sparkSession.read.option("header", "true").csv(crimesFilePath)

      // The offense-code dictionary contains duplicate codes: keep one code each, taking any of its names.
      // OFFENSE_NAME values are sometimes wrapped in double quotes: keep only the part of the name before the
      // first separator (whitespace followed by '-'), then strip the double quotes. These column functions
      // propagate nulls instead of throwing on a null name.
      val offenseCodes = sparkSession.read.option("header", "true").csv(offenceCodesFilePath)
        .withColumnRenamed("CODE", "OFFENSE_CODE")
        .withColumnRenamed("NAME", "OFFENSE_NAME")
        .dropDuplicates("OFFENSE_CODE")
        .select(
          col("OFFENSE_CODE"),
          regexp_replace(split(col("OFFENSE_NAME"), "\\s[-]", 2).getItem(0), "\"", "")
            .as("OFFENSE_NAME"))

      //-----------------------------------------------------------------------------------------------------------------
      val districtCrimesTotal = ds.groupBy("DISTRICT").count()
        .withColumnRenamed("count", "crimes_total")
      val districtMonthCrimesTotal = ds.groupBy("DISTRICT", "YEAR", "MONTH").count()
        .withColumnRenamed("count", "crimes_total")

      //-----------------------------------------------------------------------------------------------------------------
      // Approximate median (accuracy 10000) of the per-month crime totals of each district.
      val districtCrimesMonthly = districtMonthCrimesTotal
        .groupBy("DISTRICT")
        .agg(
          percentile_approx(districtMonthCrimesTotal("crimes_total"), lit(0.5), lit(10000))
            .as("crimes_monthly"))

      //-----------------------------------------------------------------------------------------------------------------
      // Top 3 offense names per district. Ties on count are broken by name so the selection is deterministic.
      // collect_list gives no ordering guarantee after a shuffle, so (rank, name) pairs are collected,
      // sorted by rank, and only the names are kept: the most frequent type comes first.
      val districtFrequentByCrimeTypes = ds
        .join(broadcast(offenseCodes), "OFFENSE_CODE")
        .groupBy("DISTRICT", "OFFENSE_NAME").count()
        .withColumn("row", row_number()
          .over(Window.partitionBy("DISTRICT").orderBy(desc("count"), col("OFFENSE_NAME"))))
        .where(col("row") <= 3)
        .groupBy("DISTRICT")
        .agg(
          concat_ws(
            ", ",
            sort_array(collect_list(struct(col("row"), col("OFFENSE_NAME")))).getField("OFFENSE_NAME"))
            .as("frequent_crime_types"))

      //-----------------------------------------------------------------------------------------------------------------
      // NOTE(review): every parsed coordinate is averaged; if the data uses placeholder coordinates for
      // unknown locations they would skew the mean — confirm against the dataset.
      val districtCrimesAvg = ds
        .withColumn("Lat", ds("Lat").cast(FloatType))
        .withColumn("Long", ds("Long").cast(FloatType))
        .groupBy("DISTRICT")
        .mean("Lat", "Long")
        .withColumnRenamed("avg(Lat)", "lat")
        .withColumnRenamed("avg(Long)", "lng")

      //-----------------------------------------------------------------------------------------------------------------
      // Inner joins: only districts present in every partial result end up in the output.
      val unitedBostonCrimesMap = districtCrimesTotal
        .join(districtCrimesMonthly, "DISTRICT")
        .join(districtFrequentByCrimeTypes, "DISTRICT")
        .join(districtCrimesAvg, "DISTRICT")

      //-----------------------------------------------------------------------------------------------------------------
      // Save result to file of parquet format
      unitedBostonCrimesMap.write.parquet(resultFilePath)
    } finally {
      sparkSession.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment