Created
April 7, 2021 19:41
-
-
Save supremum76/601ca1676b799309f2f2ccdf9cc0e85c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark | |
import org.apache.spark.sql.Row | |
import org.apache.spark.sql.types.{FloatType, StringType, StructField, StructType} | |
import org.apache.spark.sql.expressions.Window | |
import org.apache.spark.sql.functions.{lit, percentile_approx, collect_list, desc, concat_ws, row_number} | |
import org.apache.spark.sql.functions.{broadcast} | |
object BostonCrimesMapSQL {
  /** Builds a per-district summary of Boston crime data and writes it as Parquet.
    *
    * Output columns per DISTRICT:
    *   - crimes_total          total number of crimes
    *   - crimes_monthly        approximate median of monthly crime counts
    *   - frequent_crime_types  top-3 crime type names, comma-separated
    *   - lat / lng             mean coordinates of the district's crimes
    *
    * Command-line arguments:
    *   args(0) = crimes CSV path
    *   args(1) = offense-codes CSV path
    *   args(2) = output Parquet path
    */
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
    require(args.length >= 3,
      "usage: BostonCrimesMapSQL <crimesCsv> <offenseCodesCsv> <outputParquet>")

    //--------------- FILE PATHS --------------------------------------------------
    val crimesFilePath       = args(0)
    val offenceCodesFilePath = args(1)
    val resultFilePath       = args(2)

    //--------------- SPARK INIT --------------------------------------------------
    // A single SparkSession is sufficient: it owns the SparkContext, so there is
    // no need to build a SparkConf/SparkContext by hand first (the original code
    // also set two different app names; only one can take effect).
    val sparkSession = spark.sql.SparkSession.builder
      .appName("BostonCrimesMap")
      .getOrCreate()
    sparkSession.sparkContext.setLogLevel("ERROR")

    try {
      val ds = sparkSession.read.option("header", "true").csv(crimesFilePath)

      // The offense-code reference contains duplicate codes; keep one row per
      // code, taking an arbitrary one of the names.
      val offenseCodes = sparkSession.createDataFrame(
        sparkSession.read.option("header", "true").csv(offenceCodesFilePath)
          .withColumnRenamed("CODE", "OFFENSE_CODE")
          .withColumnRenamed("NAME", "OFFENSE_NAME")
          .dropDuplicates("OFFENSE_CODE")
          .rdd
          .map(row => Row(
            row.getAs[String]("OFFENSE_CODE"),
            // OFFENSE_NAME values are sometimes wrapped in double quotes.
            // Strip the quotes and keep only the part of the name up to the
            // first " -" separator.
            row.getAs[String]("OFFENSE_NAME")
              .split("\\s[-]", 2)
              .head
              .replaceAll("\"", "")
          )),
        StructType(Seq(
          StructField("OFFENSE_CODE", StringType),
          StructField("OFFENSE_NAME", StringType)))
      )

      //------------------------------------------------------------------------
      // Total crime counts: per district, and per district/year/month.
      val districtCrimesTotal = ds.groupBy("DISTRICT").count()
        .withColumnRenamed("count", "crimes_total")
      val districtMonthCrimesTotal = ds.groupBy("DISTRICT", "YEAR", "MONTH").count()
        .withColumnRenamed("count", "crimes_total")

      //------------------------------------------------------------------------
      // Approximate median of the monthly crime count for each district
      // (accuracy parameter 10000 trades memory for precision).
      val districtCrimesMonthly = districtMonthCrimesTotal
        .groupBy("DISTRICT")
        .agg(
          percentile_approx(districtMonthCrimesTotal("crimes_total"), lit(0.5), lit(10000))
            .as("crimes_monthly"))

      //------------------------------------------------------------------------
      // Top-3 most frequent crime types per district, rendered as one string.
      val frequentByCrimeTypes = ds
        .join(broadcast(offenseCodes), "OFFENSE_CODE") // small lookup table -> broadcast join
        .groupBy("DISTRICT", "OFFENSE_NAME").count()
        .withColumn("row", row_number()
          .over(Window.partitionBy("DISTRICT").orderBy(desc("count"))))
        .where("row <= 3") // keep the top 3 per district
        .drop("row")
        .groupBy("DISTRICT")
        .agg(collect_list("OFFENSE_NAME"))
      val districtFrequentByCrimeTypes = frequentByCrimeTypes
        .select(
          frequentByCrimeTypes("DISTRICT"),
          concat_ws(", ", frequentByCrimeTypes("collect_list(OFFENSE_NAME)"))
            .as("frequent_crime_types"))

      //------------------------------------------------------------------------
      // Mean coordinates of each district's crimes.
      val districtCrimesAvg = ds
        .withColumn("Lat", ds("Lat").cast(FloatType))
        .withColumn("Long", ds("Long").cast(FloatType))
        .groupBy("DISTRICT")
        .mean("Lat", "Long")
        .withColumnRenamed("avg(Lat)", "lat")
        .withColumnRenamed("avg(Long)", "lng")

      //------------------------------------------------------------------------
      // Combine all per-district metrics and save the result in Parquet format.
      districtCrimesTotal
        .join(districtCrimesMonthly, "DISTRICT")
        .join(districtFrequentByCrimeTypes, "DISTRICT")
        .join(districtCrimesAvg, "DISTRICT")
        .write.parquet(resultFilePath)
    } finally {
      // Release the Spark context even if the job fails.
      sparkSession.stop()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment