Apache Spark in the Playground

"My data files are too large."

"I have many different files and I need to join them together."

Why/What Is Apache Spark?

https://spark.apache.org/

  • a fast and general-purpose cluster computing system
  • a computational middle layer that sits between large (or many) source data files and scripted programming languages
  • system APIs in Java, Scala, Python and R
  • bundled libraries include SQL and DataFrames, Streaming, MLlib (machine learning), GraphX (graph structures)

Installing Apache Spark

Homebrew on a Mac

 $ brew install apache-spark

File Formats

  • Big Data file formats:
    • carry the data schema in the files (self-described)
    • "on-the-wire" – can pass between machines
    • avro: row-level compression, good for writes
    • parquet: column-level compression, good for reads (a quick write sketch for both formats follows this list)
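
As a rough sketch of what that difference looks like in practice (assuming a DataFrame named df is already loaded, as in the examples further down; the avro write also needs the spark-avro package loaded at launch, shown below):

 scala> df.write.mode("overwrite").format("parquet").save("data_parquet") // column-oriented, good for analytical reads
 scala> df.write.mode("overwrite").format("avro").save("data_avro")       // row-oriented, good for fast writes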

Playground Sand

Using Apache Spark

  • querying in "SQL"

  • combining disparate datasets as dataframes

  • saving and reading avro files

  • saving as csv files

    $ spark-shell --driver-memory 12G
    

Kill it with ctrl-c

Loading jar files (e.g. the MySQL connector) and packages (e.g. Avro) at launch

 $ spark-shell --jars /usr/local/opt/mysql-connector-java/libexec/mysql-connector-java-8.0.19.jar --packages org.apache.spark:spark-avro_2.11:2.4.5 --driver-memory 12G

 scala> import sys.process._
        import org.apache.spark.sql.Column
        import org.apache.spark.sql.types._
        import org.apache.spark.sql.functions._
        import org.apache.spark.sql.avro._

Declare a variable and read a tsv

 scala> val df1 = spark.
              read.
              format("csv").
              option("header", "true").
              option("mode", "DROPMALFORMED").
              option("delimiter", "\t").
              option("quote", "\"").
              option("escape", "\"").
              option("treatEmptyValuesAsNulls", "true").
              option("ignoreLeadingWhiteSpace", "true").
              load("/Users/dshorthouse/Downloads/GBIF/0072934-200221144449610/verbatim.txt")

Show the column names

 scala> df1.columns

Show all the sorted column names in stdout

 scala> df1.columns.sorted.foreach(println)

Count the rows

 scala> df1.count

Show the first 100 species (and don't truncate the column)

 scala> df1.
          select($"scientificName").
          distinct.
          orderBy($"scientificName").
          show(100, false)

Show some habitats, skipping null cells (the default column truncation is fine)

 scala> df1.
          select($"habitat").
          filter($"habitat".isNotNull).
          distinct.
          show(100)
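
The bullets above mentioned querying in "SQL"; the same kind of filter can also be written as a plain SQL query by registering the DataFrame as a temporary view. A minimal sketch (the view name "verbatim" here is just a choice):

 scala> df1.createOrReplaceTempView("verbatim")

 scala> spark.sql("""
          SELECT DISTINCT habitat
          FROM verbatim
          WHERE habitat IS NOT NULL
        """).show(100)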

Make a list of columns we want to select

 scala> val verbatimTerms = List(
              "gbifID",
              "occurrenceID",
              "decimalLatitude",
              "decimalLongitude",
              "country",
              "institutionCode",
              "collectionCode",
              "catalogNumber",
              "scientificName",
              "habitat"
            )

Use map to turn that list into columns for the select, filter out null habitats, and save as Avro in a folder called "verbatim"

 scala> df1.
          select(verbatimTerms.map(col): _*).
          filter($"habitat".isNotNull).
          write.
          mode("overwrite").
          format("avro").
          save("verbatim")

Exit from Spark, launch it again and then load that Avro output

 scala> val df1 = spark.
              read.
              format("avro").
              load("verbatim")

Load another tsv file and keep only a few columns

 scala> val processedTerms = List(
              "gbifID",
              "datasetKey",
              "mediaType"
           )

 scala> val df2 = spark.
               read.
               format("csv").
               option("header", "true").
               option("mode", "DROPMALFORMED").
               option("delimiter", "\t").
               option("quote", "\"").
               option("escape", "\"").
               option("treatEmptyValuesAsNulls", "true").
               option("ignoreLeadingWhiteSpace", "true").
               load("/Users/dshorthouse/Downloads/GBIF/0072934-200221144449610/occurrence.txt").
               select(processedTerms.map(col): _*).
               withColumnRenamed("mediaType", "hasImage").
               withColumn("hasImage", when($"hasImage".contains("StillImage"), 1).otherwise(0))

Let's join them!

 scala> val occurrences = df1.
              join(df2, Seq("gbifID"), "leftouter").orderBy($"gbifID").
              distinct

Show the columns like before

 scala> occurrences.columns.sorted.foreach(println)

How many records?

 scala> occurrences.count

Write the joined data to a single csv file

 scala> occurrences.
          repartition(1).
          write.
          mode("overwrite").
          option("header", "true").
          option("quote", "\"").
          option("escape", "\"").
          csv("occurrences")

Or, we can write to MySQL

 scala> val prop = new java.util.Properties
           prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
           prop.setProperty("user", "root")
           prop.setProperty("password", "12345")

 scala> val url = "jdbc:mysql://localhost:3306/database_name?serverTimezone=UTC&useSSL=false"
 scala> occurrences.write.mode("append").jdbc(url, "table_name", prop)
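
To verify the write, the table can be read back into a DataFrame with the same connection properties (a quick sketch; "table_name" is the placeholder used above):

 scala> val check = spark.read.jdbc(url, "table_name", prop)
 scala> check.count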