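// Reverse-jackknife outlier detection for GBIF occurrences, per species and per
// bioclim variable (gist by @jhnwllr, 2020-08-27).
// Intended for spark-shell, where `spark` is already in scope.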
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

import spark.implicits._
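// percentile_approx is not exposed in this Spark version's Scala DataFrame API,
// so wrap Catalyst's ApproximatePercentile expression in a small helper object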
object PercentileApprox {
  def percentile_approx(col: Column, percentage: Column, accuracy: Column): Column = {
    val expr = new ApproximatePercentile(
      col.expr, percentage.expr, accuracy.expr
    ).toAggregateExpression
    new Column(expr)
  }
  def percentile_approx(col: Column, percentage: Column): Column =
    percentile_approx(col, percentage, lit(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY))
}
import PercentileApprox._
// import bioclim data
val table_name = "bioclim_0.1_extract.tsv"
// val database = "prod_h"
val database = "uat"
// read in bioclim table
val df_bioclim = spark.read.
  option("sep", "\t").
  option("header", "true").
  option("inferSchema", "true").
  csv(table_name).
  withColumn("decimallatitude_bioclim", $"decimallatitude").
  withColumn("decimallongitude_bioclim", $"decimallongitude").
  drop("decimallatitude").
  drop("decimallongitude")
val df_original = spark.sql("SELECT * FROM " + database + ".occurrence").
  filter($"hasgeospatialissues" === false).
  filter($"basisofrecord" =!= "FOSSIL_SPECIMEN").
  filter($"basisofrecord" =!= "UNKNOWN").
  filter($"basisofrecord" =!= "LIVING_SPECIMEN").
  filter($"kingdomkey" === 1 || $"kingdomkey" === 6 || $"kingdomkey" === 5). // Animalia, Plantae, Fungi
  filter($"decimallatitude".isNotNull).
  filter($"decimallongitude".isNotNull).
  withColumn("rounded_decimallatitude", round(col("decimallatitude") * 100 / 10) * 10 / 100).
  withColumn("rounded_decimallongitude", round(col("decimallongitude") * 100 / 10) * 10 / 100).
  select(
    "gbifid",
    "datasetkey",
    "basisofrecord",
    "specieskey",
    "genuskey",
    "familykey",
    "phylumkey",
    "classkey",
    "kingdomkey",
    "species",
    "genus",
    "family",
    "phylum",
    "class",
    "kingdom",
    "decimallatitude",
    "decimallongitude",
    "rounded_decimallatitude",
    "rounded_decimallongitude",
    "coordinateuncertaintyinmeters"
  )
// get distinct coordinates
val df_occ_1 = df_original.
  select("specieskey", "rounded_decimallatitude", "rounded_decimallongitude").
  distinct()

// names of the bioclim/pca columns to loop over (everything except the coordinates)
val bio_vars = df_bioclim.
  drop("decimallatitude_bioclim").
  drop("decimallongitude_bioclim").
  columns
// remove any previous temp output so the per-variable appends below start clean
import org.apache.spark.sql.SaveMode
import sys.process._
"hdfs dfs -rm -r rjack_temp".!

// run the reverse jackknife for all bioclim and pca variables
// (swap bio_vars for bio_vars.slice(0, 3) below to test on a subset)
val df_bioclim_join = df_occ_1.join(df_bioclim,
  df_bioclim("decimallatitude_bioclim") === df_occ_1("rounded_decimallatitude") &&
  df_bioclim("decimallongitude_bioclim") === df_occ_1("rounded_decimallongitude"))
bio_vars.
  foreach(xx => {
    println(xx)

    // distinct (specieskey, value) pairs for the focal variable
    val df_occ_2 = df_bioclim_join.
      withColumn("xx", col(xx)).
      select("specieskey", "xx").
      distinct()

    // number of distinct values per species
    val distinct_counts = df_occ_2.
      groupBy("specieskey").
      count()
    // start reverse jackknife part //
    // create lagged data: sort values within species and pair each value with the next one
    val windowSpec = Window.partitionBy("specieskey").orderBy($"xx")
    val df_1 = df_occ_2.join(distinct_counts, "specieskey").
      filter($"count" >= 10).
      select("specieskey", "xx").
      withColumn("x1", lead("xx", 1) over windowSpec).
      orderBy("specieskey", "xx")
    // per-species constants: mean, sample size and the critical threshold t1
    val df_constants = df_1.
      groupBy("specieskey").
      agg(
        mean(col("xx")).alias("mx"),
        count(lit(1)).alias("n")).
      withColumn("n1", $"n" - 1).
      withColumn("t1", (lit(0.95) * sqrt($"n")) + 0.2)
    val df_2 = df_1.join(df_constants, "specieskey").
      withColumn("y",
        when(col("xx") < col("mx"),
          ($"x1" - $"xx") * ($"mx" - $"xx")).
        otherwise(
          ($"x1" - $"xx") * ($"x1" - $"mx"))
      )
    // mean of y per species
    val df_my = df_2.
      groupBy("specieskey").
      agg(mean($"y").alias("my"))
    val df_3 = df_2.join(df_my, "specieskey")
    // standardised gap statistic z = y / (sqrt(sum((y - my)^2)) / n1),
    // with the sum of squared deviations taken over all values of the species
    val speciesWindow = Window.partitionBy("specieskey")
    val df_z = df_3.
      withColumn("diff", $"y" - $"my").
      withColumn("pow", pow($"diff", lit(2))).
      withColumn("sum", sum($"pow").over(speciesWindow)).
      withColumn("z", $"y" / (sqrt($"sum") / $"n1")).
      withColumnRenamed("specieskey", "specieskey_z").
      withColumnRenamed("xx", "xx_z").
      select("specieskey_z", "xx_z", "z")
    // v holds the candidate outlier values: those whose z exceeds the threshold t1
    val df_4 = df_3.join(df_z,
      df_z("specieskey_z") === df_3("specieskey") &&
      df_z("xx_z") === df_3("xx")).
      drop("xx_z").
      select("specieskey", "xx", "t1", "z").
      withColumn("v",
        when(col("z") > col("t1"), $"xx").
        otherwise(null)
      )
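    // split candidate outliers by the species median: candidates below the median
    // define a lower cutoff, candidates above it define an upper cutoff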
    val df_quantiles = df_4.groupBy($"specieskey").
      agg(percentile_approx($"xx", typedLit(Seq(0.25, 0.5, 0.75))).as("quantiles")).
      select($"*",
        $"quantiles".getItem(0).as("Q1"),
        $"quantiles".getItem(1).as("median_x"),
        $"quantiles".getItem(2).as("Q3")
      ).
      select("specieskey", "median_x")

    val df_5 = df_4.join(df_quantiles, "specieskey").
      withColumn("greater", $"v" > $"median_x").
      withColumn("less", $"v" < $"median_x").
      withColumn("outlier_greater", ($"greater" && $"xx" >= $"v")).
      withColumn("outlier_lesser", ($"less" && $"xx" <= $"v")).
      orderBy("specieskey", "xx")
    val df_lower = df_5.
      filter($"outlier_lesser").
      groupBy("specieskey").
      agg(min("xx").alias("min_xx"))
    val df_upper = df_5.
      filter($"outlier_greater").
      groupBy("specieskey").
      agg(max("xx").alias("max_xx"))

    // records at or beyond the cutoffs are the reported outliers
    val df_outliers_upper = df_5.join(df_upper, "specieskey").
      filter($"xx" >= $"max_xx").
      withColumn("outlier_type", lit("upper")).
      select("specieskey", "xx", "outlier_type")
    val df_outliers_lower = df_5.join(df_lower, "specieskey").
      filter($"xx" <= $"min_xx").
      withColumn("outlier_type", lit("lower")).
      select("specieskey", "xx", "outlier_type")
    val outliers = Seq(df_outliers_upper, df_outliers_lower).
      reduce(_ union _).
      orderBy("specieskey").
      withColumn("focal_bio_var", lit(xx))

    // append this variable's results to the temp output directory
    outliers.
      write.format("csv").
      option("header", "true").
      option("sep", "\t").
      mode(SaveMode.Append).
      save("rjack_temp")
  })