A quick test to explore a Bloodhound process

This is a quick test of a modified version of the Bloodhound spark script to check it runs on the GBIF Cloudera cluster (CDH 5.16.2).

From the gateway, grab the download file from HDFS (skipping HTTP for speed), unzip it (15-20 mins) and upload the extracted files back to HDFS:

hdfs dfs -getmerge /occurrence-download/prod-downloads/0002504-181003121212138.zip /mnt/auto/misc/bloodhound/data.zip
unzip /mnt/auto/misc/bloodhound/data.zip -d /mnt/auto/misc/bloodhound/data

hdfs dfs -rm /tmp/verbatim.txt
hdfs dfs -rm /tmp/occurrence.txt

hdfs dfs -put /mnt/auto/misc/bloodhound/data/verbatim.txt /tmp/verbatim.txt
hdfs dfs -put /mnt/auto/misc/bloodhound/data/occurrence.txt /tmp/occurrence.txt

Note: There are options GBIF could explore to avoid the above, such as moving this processing into the download Oozie job.

Launch a Spark session on the YARN cluster (using the shell here, but this would be scripted):

spark2-shell --master yarn --num-executors 20  --driver-memory 4g --executor-memory 12g

Run the process. As a first test I skip the MySQL load, which could instead be replaced by producing a file to subsequently load into MySQL (a sketch of that alternative is included in the MySQL section below). The script runs in X mins.

import sys.process._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val verbatimTerms = List(
  "gbifID",
  "occurrenceID",
  "dateIdentified",
  "decimalLatitude",
  "decimalLongitude",
  "country",
  "eventDate",
  "year",
  "family",
  "identifiedBy",
  "institutionCode",
  "collectionCode",
  "catalogNumber",
  "recordedBy",
  "scientificName",
  "typeStatus"
)

//load a big, verbatim tsv file from a DwC-A download
val df1 = spark.
    read.
    format("csv").
    option("header", "true").
    option("mode", "DROPMALFORMED").
    option("delimiter", "\t").
    option("quote", "\"").
    option("escape", "\"").
    option("treatEmptyValuesAsNulls", "true").
    option("ignoreLeadingWhiteSpace", "true").
    load("/tmp//verbatim.txt").
    select(verbatimTerms.map(col): _*).
    filter(coalesce($"identifiedBy",$"recordedBy").isNotNull).
    where(!$"scientificName".contains("BOLD:")).
    where(!$"scientificName".contains("BOLD-")).
    where(!$"scientificName".contains("BIOUG"))

//optionally save the DataFrame to disk so we don't have to do the above again
df1.write.mode("overwrite").parquet("/tmp/tmp_verbatim")

//load the saved DataFrame, can later skip the above processes and start from here
val df1 = spark.
    read.
    parquet("/tmp/tmp_verbatim")

val processedTerms = List(
  "gbifID",
  "datasetKey",
  "countryCode",
  "dateIdentified",
  "eventDate"
)

//load the processed occurrence tsv file, keeping only rows with at least one of countryCode, dateIdentified or eventDate
val df2 = spark.
    read.
    format("csv").
    option("header", "true").
    option("mode", "DROPMALFORMED").
    option("delimiter", "\t").
    option("quote", "\"").
    option("escape", "\"").
    option("treatEmptyValuesAsNulls", "true").
    option("ignoreLeadingWhiteSpace", "true").
    load("/tmp/occurrence.txt").
    select(processedTerms.map(col): _*).
    filter(coalesce($"countryCode",$"dateIdentified",$"eventDate").isNotNull)

//optionally save the DataFrame to disk so we don't have to do the above again
df2.withColumnRenamed("dateIdentified","dateIdentified_processed").
    withColumnRenamed("eventDate", "eventDate_processed").
    write.mode("overwrite").parquet("/tmp/tmp_processed")

//load the saved DataFrame, can later skip the above processes and start from here
val df2 = spark.
    read.
    parquet("/tmp/tmp_processed").
    withColumn("eventDate_processed", to_timestamp($"eventDate_processed")).
    withColumn("dateIdentified_processed", to_timestamp($"dateIdentified_processed"))

//join verbatim and processed data on gbifID; the left outer join keeps verbatim records without a processed match
val occurrences = df1.join(df2, Seq("gbifID"), "leftouter").orderBy($"gbifID").distinct

//set some properties for a MySQL connection
val prop = new java.util.Properties
prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
prop.setProperty("user", "root")
prop.setProperty("password", "")

val url = "jdbc:mysql://localhost:3306/bloodhound?serverTimezone=UTC&useSSL=false"

// Best to drop indices then recreate later
// ALTER TABLE `occurrences` DROP KEY `typeStatus_idx`, DROP KEY `index_occurrences_on_datasetKey`;

// SKIP DB writing

//write occurrences data to the database
// occurrences.write.mode("append").jdbc(url, "occurrences", prop)

// Recreate indices
// ALTER TABLE `occurrences` ADD KEY `typeStatus_idx` (`typeStatus`(256)), ADD KEY `index_occurrences_on_datasetKey` (`datasetKey`);
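
// Hedged sketch (not part of the original script) of the alternative mentioned
// earlier: instead of the JDBC write, dump the joined occurrences to a
// tab-separated file in HDFS for a later bulk load into MySQL (e.g. LOAD DATA INFILE).
// The output path is an assumption; kept commented out, like the JDBC write above.
// occurrences.
//     write.
//     mode("overwrite").
//     option("header", "true").
//     option("delimiter", "\t").
//     option("quote", "\"").
//     option("escape", "\"").
//     csv("/tmp/occurrences_for_mysql")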

//aggregate recordedBy
val recordedByGroups = occurrences.
    filter($"recordedBy".isNotNull).
    groupBy($"recordedBy" as "agents").
    agg(collect_set($"gbifID") as "gbifIDs_recordedBy")

//aggregate identifiedBy
val identifiedByGroups = occurrences.
    filter($"identifiedBy".isNotNull).
    groupBy($"identifiedBy" as "agents").
    agg(collect_set($"gbifID") as "gbifIDs_identifiedBy")

//union identifiedBy and recordedBy entries
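//(the two aggregations have different columns, so go via JSON to merge them into one schema with nulls for the missing side)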
val unioned = spark.
    read.
    json(recordedByGroups.toJSON.union(identifiedByGroups.toJSON))

//concatenate arrays into strings
def stringify(c: Column) = concat(lit("["), concat_ws(",", c), lit("]"))

//write aggregated agents to csv files for the Populate Agents script, /bin/populate_agents.rb
unioned.select("agents", "gbifIDs_recordedBy", "gbifIDs_identifiedBy").
    withColumn("gbifIDs_recordedBy", stringify($"gbifIDs_recordedBy")).
    withColumn("gbifIDs_identifiedBy", stringify($"gbifIDs_identifiedBy")).
    write.
    mode("overwrite").
    option("header", "true").
    option("quote", "\"").
    option("escape", "\"").
    csv("agents-unioned-csv")

// Best to drop indices then recreate later, after all jobs are complete
// ALTER TABLE `occurrence_determiners` DROP KEY `agent_idx`, DROP KEY `occurrence_idx`;
// ALTER TABLE `occurrence_recorders` DROP KEY `agent_idx`, DROP KEY `occurrence_idx`;

// Recreate indices
// ALTER TABLE `occurrence_determiners` ADD KEY `agent_idx` (`agent_id`), ADD KEY `occurrence_idx` (`occurrence_id`);
// ALTER TABLE `occurrence_recorders` ADD KEY `agent_idx` (`agent_id`), ADD KEY `occurrence_idx` (`occurrence_id`);

//aggregate families (Taxa)
val familyGroups = occurrences.
    filter($"family".isNotNull).
    groupBy($"family").
    agg(collect_set($"gbifID") as "gbifIDs_family")

//write aggregated Families to csv files for the Populate Taxa script, /bin/populate_taxa.rb
familyGroups.select("family", "gbifIDs_family").
    withColumn("gbifIDs_family", stringify($"gbifIDs_family")).
    write.
    mode("overwrite").
    option("header", "true").
    option("quote", "\"").
    option("escape", "\"").
    csv("/tmp/family.csv")
    
    

import sys.process._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val verbatimTerms = List(
  "gbifID",
  "occurrenceID",
  "dateIdentified",
  "decimalLatitude",
  "decimalLongitude",
  "country",
  "eventDate",
  "year",
  "family",
  "identifiedBy",
  "institutionCode",
  "collectionCode",
  "catalogNumber",
  "recordedBy",
  "scientificName",
  "typeStatus"
)

//load a big, verbatim tsv file from a DwC-A download
val df1 = spark.
    read.
    format("csv").
    option("header", "true").
    option("mode", "DROPMALFORMED").
    option("delimiter", "\t").
    option("quote", "\"").
    option("escape", "\"").
    option("treatEmptyValuesAsNulls", "true").
    option("ignoreLeadingWhiteSpace", "true").
    load("/tmp/verbatim.txt").
    select(verbatimTerms.map(col): _*).
    filter(coalesce($"identifiedBy",$"recordedBy").isNotNull).
    where(!$"scientificName".contains("BOLD:")).
    where(!$"scientificName".contains("BOLD-")).
    where(!$"scientificName".contains("BIOUG"))
    
val processedTerms = List(
  "gbifID",
  "datasetKey",
  "countryCode",
  "dateIdentified",
  "eventDate"
)

val df2 = spark.
    read.
    format("csv").
    option("header", "true").
    option("mode", "DROPMALFORMED").
    option("delimiter", "\t").
    option("quote", "\"").
    option("escape", "\"").
    option("treatEmptyValuesAsNulls", "true").
    option("ignoreLeadingWhiteSpace", "true").
    load("/tmp/occurrence.txt").
    select(processedTerms.map(col): _*).
    filter(coalesce($"countryCode",$"dateIdentified",$"eventDate").isNotNull)    
    
    
    
familyGroups.select("family", "gbifIDs_family").
    withColumn("gbifIDs_family", stringify($"gbifIDs_family")).
    write.
    mode("overwrite").
    option("header", "true").
    option("quote", "\"").
    option("escape", "\"").
    csv("/tmp/family.csv")    

Get the output from HDFS and check that it has content:

hdfs dfs -getmerge /tmp/family.csv /tmp/family.csv
head /tmp/family.csv