@fmendezh
Created October 12, 2016 08:58
Spark REPL Test
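An sbt build and a small Spark driver that embed the Spark REPL (SparkIMain): the driver binds a SQLContext into the interpreter and then runs a GBIF occurrence-validation script supplied as plain source text. The build definition (build.sbt) comes first, followed by the driver.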
name := "spark-validation"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.1" exclude("org.apache.hadoop", "hadoop-yarn-server-web-proxy"),
  "org.apache.spark" % "spark-sql_2.10" % "1.6.1" exclude("org.apache.hadoop", "hadoop-yarn-server-web-proxy"),
  "org.apache.hadoop" % "hadoop-common" % "2.6.0" exclude("org.apache.hadoop", "hadoop-yarn-server-web-proxy"),
  "org.apache.spark" % "spark-hive_2.10" % "1.6.1" exclude("org.apache.hadoop", "hadoop-yarn-server-web-proxy"),
  "org.apache.spark" % "spark-yarn_2.10" % "1.6.1" exclude("org.apache.hadoop", "hadoop-yarn-server-web-proxy"),
  "org.apache.spark" % "spark-repl_2.10" % "1.6.1" exclude("org.apache.hadoop", "hadoop-yarn-server-web-proxy") exclude("org.scala-lang", "scala-library"),
  "org.gbif.validator" % "validator-core" % "0.1-SNAPSHOT" exclude("org.slf4j", "slf4j-log4j12") exclude("org.slf4j", "log4j-over-slf4j") exclude("com.typesafe.akka", "akka-actor_2.11") exclude("org.scala-lang", "scala-library"),
  "org.gbif" % "gbif-api" % "0.48-SNAPSHOT",
  "org.gbif.registry" % "registry-ws-client" % "2.60-SNAPSHOT",
  "org.gbif" % "dwc-api" % "1.17-SNAPSHOT",
  "com.sun.jersey" % "jersey-servlet" % "1.19"
)
/* DataValidation.scala */
import org.apache.spark.repl.SparkIMain
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import scala.tools.nsc.GenericRunnerSettings
object DataValidation {

  def main(args: Array[String]): Unit = {
    val t0 = System.currentTimeMillis()
    // Master and Spark home are hard-coded for a local standalone setup; adjust them to your environment
    val conf = new SparkConf().setAppName("Simple Application").setMaster("spark://fmendez.gbif.org:7077")
      .setSparkHome("/Users/fmendez/Downloads/spark-1.6.1-bin-hadoop2.6/")
    val sc = new SparkContext(conf)
    val sqlContext: SQLContext = new SQLContext(sc)
    // Embedded Spark REPL that will interpret the validation script below
    val interpreter = {
      val settings = new GenericRunnerSettings(println _)
      settings.usejavacp.value = true
      new SparkIMain(settings)
    }
    // Validation script shipped to the embedded interpreter as plain source text
    val methodRef =
      """
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.gbif.dwc.terms.Term
import org.gbif.occurrence.validation.evaluator.OccurrenceEvaluatorFactory
import org.gbif.occurrence.validation.util.TempTermsUtils
import scala.collection.JavaConverters._
val dataFile = "/Users/fmendez/dev/git/gbif/gbif-data-validator/validator-core/src/test/resources/0008759-160822134323880.csvar"
val data = sqlContext.sparkContext.textFile(dataFile, 20).cache()
val header = data.first()
val rawHeader = header.split("\t")
val terms: Array[Term] = TempTermsUtils.buildTermMapping(header.split("\t"))
//This creates a schema from the header
val schema = StructType(rawHeader.map(fieldName ⇒ StructField(fieldName, StringType, true)))
// this ensures that each row has the same number of columns as reported in the header
// RDD[Row] is the data type expected by sqlContext.createDataFrame
val rowData: RDD[Row] = data.zipWithIndex().filter({ case(_, idx) => idx != 0 })
  .map(line => Row.fromSeq(line._1.split("\t").padTo(rawHeader.length, "")))
val ds = sqlContext.createDataFrame(rowData,schema)
// Registers the DataFrame as a temporary table ("view") that can be queried with SQL
ds.registerTempTable("occ")
// Runs a SQL statement against the temporary table
sqlContext.sql("select count(distinct occurrenceid) from occ").collect()
// Partly duplicates the work above: runs the full validation processing over the raw lines
val results = data.zipWithIndex().filter({ case(line, idx) => idx != 0 })
  .map({ case(line, idx) => (idx, terms.zip(line.split("\t")).toMap) })
  .mapPartitions(partition => {
    // one evaluator per partition, so it is never serialized with the closure
    val occEvaluator = new OccurrenceEvaluatorFactory("http://api.gbif.org/v1/").create(rawHeader)
    val newPartition = partition.map({ case(idx, record) =>
      occEvaluator.process(idx, record.asJava) }).toList
    // consumes the iterator, thus calls readMatchingFromDB
    newPartition.iterator
  }).collect()
"""
    interpreter.initializeSynchronous()
    // Makes the driver's SQLContext available to the interpreted script under the name "sqlContext"
    interpreter.bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext)
    val resultFlag = interpreter.interpret(methodRef)
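    // Optionally check the outcome; interpret() returns IR.Success, IR.Error or IR.Incomplete
    if (resultFlag != scala.tools.nsc.interpreter.IR.Success) {
      println("Script interpretation did not succeed: " + resultFlag)
    }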
    val t1 = System.currentTimeMillis()
    println("Elapsed time: " + (t1 - t0) / 1000 + "s")
  }
}
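When the master is a remote standalone cluster, the executors also need the application jar on their classpath. A minimal sketch of how the SparkConf above could be extended, assuming the jar produced by sbt package under sbt's default layout (the jar path is an assumption, not taken from the gist):
import org.apache.spark.SparkConf
// Sketch only: ships the application jar to the executors of the standalone cluster.
// The path assumes sbt's default output for name "spark-validation", version "1.0", Scala 2.10.
val confWithJars = new SparkConf()
  .setAppName("Simple Application")
  .setMaster("spark://fmendez.gbif.org:7077")
  .setJars(Seq("target/scala-2.10/spark-validation_2.10-1.0.jar"))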