This is a quick test of a modified version of the Bloodhound spark script to check it runs on the GBIF Cloudera cluster (CDH 5.16.2).
From the gateway, grab the file from HDFS (skip HTTP for speed), unzip (15-20 mins) and upload to HDFS:
hdfs dfs -getmerge /occurrence-download/prod-downloads/0002504-181003121212138.zip /mnt/auto/misc/bloodhound/data.zip
unzip /mnt/auto/misc/bloodhound/data.zip -d /mnt/auto/misc/bloodhound/data
hdfs dfs -rm /tmp/verbatim.txt
hdfs dfs -rm /tmp/occurrence.txt
hdfs dfs -put /mnt/auto/misc/bloodhound/data/verbatim.txt /tmp/verbatim.txt
hdfs dfs -put /mnt/auto/misc/bloodhound/data/occurrence.txt /tmp/occurrence.txt
Note: There are options GBIF could explore to avoid the above, e.g. by moving this processing into the download Oozie job.
Launch a Spark cluster (using the shell here, but this would be scripted):
spark2-shell --master yarn --num-executors 20 --driver-memory 4g --executor-memory 12g
Run the process. Here I skip the MySQL load as a first test; it could be avoided by producing a file to subsequently load into MySQL. Script runs in X mins.
import sys.process._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
//verbatim DwC terms to pull from the download: the record key, agent fields
//(identifiedBy/recordedBy) and the supporting fields used for display/matching
val verbatimTerms = List(
  "gbifID", "occurrenceID", "dateIdentified", "decimalLatitude",
  "decimalLongitude", "country", "eventDate", "year", "family",
  "identifiedBy", "institutionCode", "collectionCode", "catalogNumber",
  "recordedBy", "scientificName", "typeStatus"
)
//load the big, verbatim tsv file from a DwC-A download and keep only the rows
//that can contribute an agent (identifiedBy or recordedBy present), dropping
//Barcode-of-Life records whose scientificName carries BOLD/BIOUG identifiers
val df1 = spark.read.
  format("csv").
  option("header", "true").
  option("mode", "DROPMALFORMED").
  option("delimiter", "\t").
  option("quote", "\"").
  option("escape", "\"").
  option("treatEmptyValuesAsNulls", "true").
  option("ignoreLeadingWhiteSpace", "true").
  load("/tmp/verbatim.txt").
  select(verbatimTerms.map(col): _*).
  //at least one of the two agent columns must be populated
  filter($"identifiedBy".isNotNull || $"recordedBy".isNotNull).
  //exclude BOLD/BIOUG material (equivalent to three negated where clauses)
  where(!($"scientificName".contains("BOLD:") ||
          $"scientificName".contains("BOLD-") ||
          $"scientificName".contains("BIOUG")))
//interpreted (processed) terms to pull from the occurrence file; gbifID is the
//join key back to the verbatim rows
val processedTerms = List(
  "gbifID", "datasetKey", "countryCode", "dateIdentified", "eventDate"
)
//load the processed occurrence tsv, keeping rows that carry at least one
//interpreted value worth joining; rename the two columns that would clash
//with their verbatim counterparts after the join
val df2 = spark.read.
  format("csv").
  option("header", "true").
  option("mode", "DROPMALFORMED").
  option("delimiter", "\t").
  option("quote", "\"").
  option("escape", "\"").
  option("treatEmptyValuesAsNulls", "true").
  option("ignoreLeadingWhiteSpace", "true").
  load("/tmp/occurrence.txt").
  select(processedTerms.map(col): _*).
  //isNotNull never yields null, so this OR matches coalesce(...).isNotNull
  filter($"countryCode".isNotNull || $"dateIdentified".isNotNull || $"eventDate".isNotNull).
  withColumnRenamed("dateIdentified", "dateIdentified_processed").
  withColumnRenamed("eventDate", "eventDate_processed")
//join verbatim rows to their processed values, dedupe, then sort.
//NOTE(fix): the original ran orderBy BEFORE distinct; distinct repartitions
//and discards ordering, so the sort was wasted work and the written data was
//not actually ordered. Dedupe first, then sort the (smaller) result.
val occurrences = df1.join(df2, Seq("gbifID"), "leftouter").distinct.orderBy($"gbifID")
//persist the joined table for the aggregation passes below
occurrences.
  write.
  mode("overwrite").
  parquet("/tmp/bloodhound-occurrences")
//aggregate occurrence ids per collector string (recordedBy)
val recordedByGroups = occurrences.
  where($"recordedBy".isNotNull).
  groupBy($"recordedBy".as("agents")).
  agg(collect_set($"gbifID").as("gbifIDs_recordedBy"))
//aggregate occurrence ids per determiner string (identifiedBy)
val identifiedByGroups = occurrences.
  where($"identifiedBy".isNotNull).
  groupBy($"identifiedBy".as("agents")).
  agg(collect_set($"gbifID").as("gbifIDs_identifiedBy"))
//merge identifiedBy and recordedBy aggregates on the shared "agents" key.
//NOTE(fix): the original serialised both frames to JSON (toJSON.union) and
//re-parsed them with schema inference just to outer-merge two DataFrames with
//different columns. A full outer join produces the same result — one row per
//agent, with null in gbifIDs_recordedBy / gbifIDs_identifiedBy where the agent
//appears on only one side — without the serialisation cost or inference risk.
val unioned = recordedByGroups.join(identifiedByGroups, Seq("agents"), "fullouter")
//render an array column as a bracketed, comma-separated string, e.g. [1,2,3],
//so MySQL can ingest it from csv
def stringify(c: Column): Column = concat(lit("["), concat_ws(",", c), lit("]"))
//write the merged agent groups as csv with string-encoded id arrays; the
//aliased select keeps the same column names and order as before
unioned.
  select($"agents",
    stringify($"gbifIDs_recordedBy") as "gbifIDs_recordedBy",
    stringify($"gbifIDs_identifiedBy") as "gbifIDs_identifiedBy").
  write.
  mode("overwrite").
  option("header", "true").
  option("quote", "\"").
  option("escape", "\"").
  csv("/tmp/bloodhound-agents-unioned")
//aggregate occurrence ids per family (Taxa)
val familyGroups = occurrences.
  where($"family".isNotNull).
  groupBy($"family").
  agg(collect_set($"gbifID").as("gbifIDs_family"))
//write family groups as csv, with the id array string-encoded as above
familyGroups.
  select($"family", stringify($"gbifIDs_family") as "gbifIDs_family").
  write.
  mode("overwrite").
  option("header", "true").
  option("quote", "\"").
  option("escape", "\"").
  csv("/tmp/bloodhound-family")