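// Reverse-jackknife outlier detection for GBIF occurrence records.
// For every species with at least 10 distinct values of a bioclim variable,
// flag values whose gaps from the rest of the distribution are extreme, and
// append the flagged (specieskey, value, outlier_type, variable) rows to the
// rjack_temp directory on HDFS. Meant to run in spark-shell, where the
// `spark` session is predefined.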
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
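// DataFrame-side percentile_approx: Spark 2.x exposes percentile_approx only
// in SQL, so wrap the catalyst ApproximatePercentile expression directly
// (org.apache.spark.sql.functions only gained percentile_approx in Spark 3.1)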
object PercentileApprox {
  def percentile_approx(col: Column, percentage: Column, accuracy: Column): Column = {
    val expr = new ApproximatePercentile(
      col.expr, percentage.expr, accuracy.expr
    ).toAggregateExpression
    new Column(expr)
  }
  def percentile_approx(col: Column, percentage: Column): Column = percentile_approx(
    col, percentage, lit(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY)
  )
}
import PercentileApprox._

// import bioclim data
val table_name = "bioclim_0.1_extract.tsv"
// val database = "prod_h"
val database = "uat"

// read in bioclim table
val df_bioclim = spark.read.
  option("sep", "\t").
  option("header", "true").
  option("inferSchema", "true").
  csv(table_name).
  withColumn("decimallatitude_bioclim", $"decimallatitude").
  withColumn("decimallongitude_bioclim", $"decimallongitude").
  drop("decimallatitude").
  drop("decimallongitude")
val df_original = spark.sql("SELECT * FROM " + database + ".occurrence").
  filter($"hasgeospatialissues" === false).
  filter($"basisofrecord" =!= "FOSSIL_SPECIMEN").
  filter($"basisofrecord" =!= "UNKNOWN").
  filter($"basisofrecord" =!= "LIVING_SPECIMEN").
  filter($"kingdomkey" === 1 || $"kingdomkey" === 6 || $"kingdomkey" === 5).
  filter($"decimallatitude".isNotNull).
  filter($"decimallongitude".isNotNull).
  withColumn("rounded_decimallatitude", round(col("decimallatitude") * 100 / 10) * 10 / 100).
  withColumn("rounded_decimallongitude", round(col("decimallongitude") * 100 / 10) * 10 / 100).
  select(
    "gbifid",
    "datasetkey",
    "basisofrecord",
    "specieskey",
    "genuskey",
    "familykey",
    "phylumkey",
    "classkey",
    "kingdomkey",
    "species",
    "genus",
    "family",
    "phylum",
    "class",
    "kingdom",
    "decimallatitude",
    "decimallongitude",
    "rounded_decimallatitude",
    "rounded_decimallongitude",
    "coordinateuncertaintyinmeters"
  )
// get distinct coordinates
val df_occ_1 = df_original.
  select("specieskey", "rounded_decimallatitude", "rounded_decimallongitude").
  distinct()
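// every remaining column of the extract is treated as a variable to run
// through the reverse jackknife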
val bio_vars = df_bioclim.
  drop("decimallatitude_bioclim").
  drop("decimallongitude_bioclim").
  columns
// clear any previous temp output, since the loop below appends to it
import org.apache.spark.sql.SaveMode
import sys.process._
("hdfs dfs -rm -r rjack_temp").!

// add .slice(0, 3) after bio_vars below to test on the first few variables
// run all bioclim and pca variables: attach the bioclim values to every
// distinct rounded occurrence coordinate
val df_bioclim_join = df_occ_1.join(df_bioclim,
  df_bioclim("decimallatitude_bioclim") === df_occ_1("rounded_decimallatitude") &&
  df_bioclim("decimallongitude_bioclim") === df_occ_1("rounded_decimallongitude"))
bio_vars.foreach(xx => {
  println(xx)
  val df_occ_2 = df_bioclim_join.
    withColumn("xx", col(xx)).
    select("specieskey", "xx").
    distinct()
  val distinct_counts = df_occ_2.
    groupBy("specieskey").
    count()
  // start reverse jackknife part //
  // create lagged data
  val windowSpec = Window.partitionBy("specieskey").orderBy($"xx")
  val df_1 = df_occ_2.join(distinct_counts, "specieskey").
    filter($"count" >= 10).
    select("specieskey", "xx").
    withColumn("x1", lead("xx", 1) over windowSpec).
    orderBy("specieskey", "xx")
  val df_constants = df_1.
    groupBy("specieskey").
    agg(
      mean(col("xx")).alias("mx"),
      count(lit(1)).alias("n")).
    withColumn("n1", $"n" - 1).
    withColumn("t1", (lit(0.95) * sqrt($"n")) + 0.2)
  val df_2 = df_1.join(df_constants, "specieskey").
    withColumn("y",
      when(col("xx") < col("mx"),
        ($"x1" - $"xx") * ($"mx" - $"xx")).
      otherwise(
        ($"x1" - $"xx") * ($"x1" - $"mx"))
    )
  val df_my = df_2.
    groupBy("specieskey").
    agg(mean($"y").alias("my"))
  val df_3 = df_2.join(df_my, "specieskey")
  // sum the squared deviations per species (a window sum, so each row keeps
  // its own y); the original per-row groupBy made the sum a no-op
  val speciesWindow = Window.partitionBy("specieskey")
  val df_z = df_3.
    withColumn("diff", $"y" - $"my").
    withColumn("pow", pow($"diff", lit(2))).
    withColumn("ss", sum($"pow") over speciesWindow).
    withColumn("z", $"y" / sqrt($"ss" / $"n1")).
    withColumnRenamed("specieskey", "specieskey_z").
    withColumnRenamed("xx", "xx_z").
    select("specieskey_z", "xx_z", "z")
  val df_4 = df_3.join(df_z,
    df_z("specieskey_z") === df_3("specieskey") &&
    df_z("xx_z") === df_3("xx")).
    drop("xx_z").
    select("specieskey", "xx", "t1", "z").
    withColumn("v",
      when(col("z") > col("t1"), $"xx").
      otherwise(null)
    )
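  // per-species median of the variable via approximate quartiles;
  // only the median is carried forward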
  val df_quantiles = df_4.groupBy($"specieskey").
    agg(percentile_approx($"xx", typedLit(Seq(0.25, 0.5, 0.75))).as("quantiles")).
    select($"*",
      $"quantiles".getItem(0).as("Q1"),
      $"quantiles".getItem(1).as("median_x"),
      $"quantiles".getItem(2).as("Q3")
    ).
    select("specieskey", "median_x")
  val df_5 = df_4.join(df_quantiles, "specieskey").
    withColumn("greater", $"v" > $"median_x").
    withColumn("less", $"v" < $"median_x").
    withColumn("outlier_greater", ($"greater" && $"xx" >= $"v")).
    withColumn("outlier_lesser", ($"less" && $"xx" <= $"v")).
    orderBy("specieskey", "xx")
  val df_lower = df_5.
    filter($"outlier_lesser").
    groupBy("specieskey").
    agg(min("xx").alias("min_xx"))
  val df_upper = df_5.
    filter($"outlier_greater").
    groupBy("specieskey").
    agg(max("xx").alias("max_xx"))
  val df_outliers_upper = df_5.join(df_upper, "specieskey").
    filter($"xx" >= $"max_xx").
    withColumn("outlier_type", lit("upper")).
    select("specieskey", "xx", "outlier_type")
  val df_outliers_lower = df_5.join(df_lower, "specieskey").
    filter($"xx" <= $"min_xx").
    withColumn("outlier_type", lit("lower")).
    select("specieskey", "xx", "outlier_type")
  val outliers = Seq(df_outliers_upper, df_outliers_lower).
    reduce(_ union _).
    orderBy("specieskey").
    withColumn("focal_bio_var", lit(xx))

  // append this variable's results to the temp output on disk
  outliers.
    write.format("csv").
    option("header", "true").
    option("sep", "\t").
    mode(SaveMode.Append).
    save("rjack_temp")
})
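
// Optional check once the loop has finished: read back everything the
// appends above wrote and count flagged rows per variable
val outlier_results = spark.read.
  option("sep", "\t").
  option("header", "true").
  csv("rjack_temp")
outlier_results.groupBy("focal_bio_var").count().show()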