A quick test to explore a bloodhound process

This is a quick test of a modified version of the Bloodhound Spark script to check that it runs on the GBIF Cloudera cluster (CDH 5.16.2).

From the gateway, grab the download file from HDFS (skipping HTTP for speed), unzip it (15-20 mins), and upload the two data files back to HDFS:

hdfs dfs -getmerge /occurrence-download/prod-downloads/0002504-181003121212138.zip /mnt/auto/misc/bloodhound/data.zip
unzip /mnt/auto/misc/bloodhound/data.zip -d /mnt/auto/misc/bloodhound/data

hdfs dfs -rm /tmp/verbatim.txt
hdfs dfs -rm /tmp/occurrence.txt

hdfs dfs -put /mnt/auto/misc/bloodhound/data/verbatim.txt /tmp/verbatim.txt
hdfs dfs -put /mnt/auto/misc/bloodhound/data/occurrence.txt /tmp/occurrence.txt
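
A quick optional check that both files landed:

hdfs dfs -ls /tmp/verbatim.txt /tmp/occurrence.txt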

Note: there are options GBIF could explore to avoid the above, such as moving this processing into the download Oozie job.

Launch a Spark session (using the shell here, though this would normally be scripted; see the sketch below):

spark2-shell --master yarn --num-executors 20  --driver-memory 4g --executor-memory 12g
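
A scripted equivalent might save the Scala below to a file and preload it into the shell via the REPL's -i option, e.g. (bloodhound.scala is a hypothetical filename):

spark2-shell --master yarn --num-executors 20 --driver-memory 4g --executor-memory 12g -i bloodhound.scala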

Run the process. As a first test I skip the MySQL load here; it could be replaced by producing a file to subsequently load into MySQL (see the sketch at the end). The script runs in X mins.

import sys.process._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val verbatimTerms = List(
  "gbifID",
  "occurrenceID",
  "dateIdentified",
  "decimalLatitude",
  "decimalLongitude",
  "country",
  "eventDate",
  "year",
  "family",
  "identifiedBy",
  "institutionCode",
  "collectionCode",
  "catalogNumber",
  "recordedBy",
  "scientificName",
  "typeStatus"
)

//load a big, verbatim tsv file from a DwC-A download
val df1 = spark.
    read.
    format("csv").
    option("header", "true").
    option("mode", "DROPMALFORMED").
    option("delimiter", "\t").
    option("quote", "\"").
    option("escape", "\"").
    option("treatEmptyValuesAsNulls", "true").
    option("ignoreLeadingWhiteSpace", "true").
    load("/tmp/verbatim.txt").
    select(verbatimTerms.map(col): _*).
    filter(coalesce($"identifiedBy",$"recordedBy").isNotNull).
    where(!$"scientificName".contains("BOLD:")).
    where(!$"scientificName".contains("BOLD-")).
    where(!$"scientificName".contains("BIOUG"))

val processedTerms = List(
  "gbifID",
  "datasetKey",
  "countryCode",
  "dateIdentified",
  "eventDate"
)

//load the processed (interpreted) occurrence file from the same download,
//renaming its dates so they don't collide with the verbatim columns in the join
val df2 = spark.
    read.
    format("csv").
    option("header", "true").
    option("mode", "DROPMALFORMED").
    option("delimiter", "\t").
    option("quote", "\"").
    option("escape", "\"").
    option("treatEmptyValuesAsNulls", "true").
    option("ignoreLeadingWhiteSpace", "true").
    load("/tmp/occurrence.txt").
    select(processedTerms.map(col): _*).
    filter(coalesce($"countryCode",$"dateIdentified",$"eventDate").isNotNull).
    withColumnRenamed("dateIdentified","dateIdentified_processed").
    withColumnRenamed("eventDate", "eventDate_processed")

//left join the processed fields onto the verbatim records by gbifID
val occurrences = df1.join(df2, Seq("gbifID"), "leftouter").orderBy($"gbifID").distinct

//snapshot the joined occurrences to parquet
occurrences.
    write.
    mode("overwrite").
    parquet("/tmp/bloodhound-occurrences")

//aggregate recordedBy
val recordedByGroups = occurrences.
    filter($"recordedBy".isNotNull).
    groupBy($"recordedBy" as "agents").
    agg(collect_set($"gbifID") as "gbifIDs_recordedBy")

//aggregate identifiedBy
val identifiedByGroups = occurrences.
    filter($"identifiedBy".isNotNull).
    groupBy($"identifiedBy" as "agents").
    agg(collect_set($"gbifID") as "gbifIDs_identifiedBy")
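
//optionally eyeball a few aggregated agent strings before the union, e.g.
//identifiedByGroups.select($"agents").show(5, false)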

//union identifiedBy and recordedBy entries; the toJSON round trip aligns the two
//differing schemas, so columns missing from either side come back as nulls
val unioned = spark.
    read.
    json(recordedByGroups.toJSON.union(identifiedByGroups.toJSON))

//concatenate arrays into strings
def stringify(c: Column) = concat(lit("["), concat_ws(",", c), lit("]"))
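
//e.g. an array column holding [123, 456] becomes the single string "[123,456]",
//which survives the CSV round trip as one cell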

unioned.select("agents", "gbifIDs_recordedBy", "gbifIDs_identifiedBy").
    withColumn("gbifIDs_recordedBy", stringify($"gbifIDs_recordedBy")).
    withColumn("gbifIDs_identifiedBy", stringify($"gbifIDs_identifiedBy")).
    write.
    mode("overwrite").
    option("header", "true").
    option("quote", "\"").
    option("escape", "\"").
    csv("/tmp/bloodhound-agents-unioned")

//aggregate families (Taxa)
val familyGroups = occurrences.
    filter($"family".isNotNull).
    groupBy($"family").
    agg(collect_set($"gbifID") as "gbifIDs_family")

familyGroups.select("family", "gbifIDs_family").
    withColumn("gbifIDs_family", stringify($"gbifIDs_family")).
    write.
    mode("overwrite").
    option("header", "true").
    option("quote", "\"").
    option("escape", "\"").
    csv("/tmp/bloodhound-family")   