"My data files are too large."
"I have many different files and I need to join them together."
- a fast and general-purpose cluster computing system
- a middle, computational "sandwich" layer between large or numerous source data files and (scripted) programming languages
- system APIs in Java, Scala, Python and R
- bundled libraries include SQL and DataFrames, Streaming, MLlib (machine learning), GraphX (graph structures)
$ brew install apache-spark
- Big Data file formats:
- carry the data schema in the files (self-describing)
- "on-the-wire" – can pass between machines
- avro: row-level compression, good for writes
- parquet: column-level compression, good for reads
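For comparison, the same DataFrame can be written in either format. A minimal sketch, assuming a DataFrame named df and that the spark-avro package is on the classpath (loaded later with --packages); the folder names are placeholders:
scala> df.write.mode("overwrite").format("avro").save("records_avro")   // row-oriented, favours bulk writes
scala> df.write.mode("overwrite").parquet("records_parquet")            // column-oriented, favours analytical reads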
- Brassicaceae specimen records aggregated on GBIF, https://gbif.org
- https://doi.org/10.15468/dl.qfzuwz
- 1.7M occurrence records
- querying in "SQL" (a sketch follows this list)
- combining disparate datasets as dataframes
- saving and reading avro files
- saving as csv files
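Querying in SQL is not otherwise shown in the walk-through below; a minimal sketch, assuming a DataFrame already loaded as df1 (declared later):
scala> df1.createOrReplaceTempView("verbatim")
scala> spark.sql("SELECT DISTINCT scientificName FROM verbatim ORDER BY scientificName").show(100, false)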
$ spark-shell --driver-memory 12G
Kill it with ctrl-c
Pulling in jar files (e.g. the MySQL connector) and packages (e.g. Avro) on launch
$ spark-shell --jars /usr/local/opt/mysql-connector-java/libexec/mysql-connector-java-8.0.19.jar --packages org.apache.spark:spark-avro_2.11:2.4.5 --driver-memory 12G
scala> import sys.process._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro._
Declare a variable and read a tsv file
scala> val df1 = spark.
read.
format("csv").
option("header", "true").
option("mode", "DROPMALFORMED").
option("delimiter", "\t").
option("quote", "\"").
option("escape", "\"").
option("treatEmptyValuesAsNulls", "true").
option("ignoreLeadingWhiteSpace", "true").
load("/Users/dshorthouse/Downloads/GBIF/0072934-200221144449610/verbatim.txt")
Show the column names
scala> df1.columns
Show all the sorted column names in stdout
scala> df1.columns.sorted.foreach(println)
Count the rows
scala> df1.count
Show the first 100 species (and don't truncate the column)
scala> df1.
select($"scientificName").
distinct.
orderBy($"scientificName").
show(100, false)
Show some habitats, skipping null cells (the default column truncation is fine)
scala> df1.
select($"habitat").
filter($"habitat".isNotNull).
distinct.
show(100)
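From here it is easy to narrow to habitats of interest; a rough sketch where the search term "prairie" is only an illustration:
scala> df1.
select($"habitat").
filter($"habitat".isNotNull && lower($"habitat").contains("prairie")).
distinct.
show(100, false)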
Make a list of columns we want to select
scala> val verbatimTerms = List(
"gbifID",
"occurrenceID",
"decimalLatitude",
"decimalLongitude",
"country",
"institutionCode",
"collectionCode",
"catalogNumber",
"scientificName",
"habitat"
)
Use map to select those columns, filter out null habitats, and save as Avro in a folder called "verbatim"
scala> df1.
select(verbatimTerms.map(col): _*).
filter($"habitat".isNotNull).
write.
mode("overwrite").
format("avro").
save("verbatim")
Exit from Spark, launch it again and then load that Avro output
scala> val df1 = spark.
read.
format("avro").
load("verbatim")
Load another tsv file, keeping a few columns
scala> val processedTerms = List(
"gbifID",
"datasetKey",
"mediaType"
)
scala> val df2 = spark.
read.
format("csv").
option("header", "true").
option("mode", "DROPMALFORMED").
option("delimiter", "\t").
option("quote", "\"").
option("escape", "\"").
option("treatEmptyValuesAsNulls", "true").
option("ignoreLeadingWhiteSpace", "true").
load("/Users/dshorthouse/Downloads/GBIF/0072934-200221144449610/occurrence.txt").
select(processedTerms.map(col): _*).
withColumnRenamed("mediaType", "hasImage").
withColumn("hasImage", when($"hasImage".contains("StillImage"), 1).otherwise(0))
Let's join them!
scala> val occurrences = df1.
join(df2, Seq("gbifID"), "leftouter").orderBy($"gbifID").
distinct
Show the columns like before
scala> occurrences.columns.sorted.foreach(println)
How many records?
scala> occurrences.count
Write the joined data to a single csv file
scala> occurrences.
repartition(1).
write.
mode("overwrite").
option("header", "true").
option("quote", "\"").
option("escape", "\"").
csv("occurrences")
Or, we can write to MySQL
scala> val prop = new java.util.Properties
prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
prop.setProperty("user", "root")
prop.setProperty("password", "12345")
scala> val url = "jdbc:mysql://localhost:3306/database_name?serverTimezone=UTC&useSSL=false"
scala> occurrences.write.mode("append").jdbc(url, "table_name", prop)
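Reading the table back into a DataFrame reuses the same url and properties (table_name is the same placeholder as above):
scala> val fromDb = spark.read.jdbc(url, "table_name", prop)
scala> fromDb.count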