@sujee
Created September 2, 2014 20:42
named RDD
override def runJob(sc: SparkContext, config: Config): Any = {
  val fileName = config.getString("input.file")
  logger.info("### fileName : " + fileName)

  var rdd = this.namedRdds.get[String](fileName)
  logger.info("### rdd load 1 : " + rdd)

  if (rdd.isDefined) {
    logger.info("### rdd %s isDefined".format(fileName))
  } else {
    logger.info("### rdd %s doesn't exist... loading".format(fileName))
    val newRDD = sc.textFile(fileName)
    newRDD.cache() // cache() is persist(MEMORY_ONLY); a separate persist() call is redundant
    this.namedRdds.update(fileName, newRDD)
    logger.info("### rdd %s updated".format(fileName))
  }

  rdd = this.namedRdds.get[String](fileName)
  logger.info("### rdd load 2 : " + rdd)

  // get returns an Option[RDD[String]], so unwrap it before counting
  val count = rdd.get.count()
  logger.info("### count : " + count)
  count
}
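
As an aside, the get / update dance above can usually be collapsed into a single call: the job-server NamedRdds trait also exposes getOrElseCreate, which runs the generator only when nothing is cached under the given name. A minimal sketch, assuming the same NamedRddSupport mixin and the same input.file config key as above (the object name NamedRddCountJob is invented for illustration):

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import spark.jobserver.{NamedRddSupport, SparkJob, SparkJobValid, SparkJobValidation}

object NamedRddCountJob extends SparkJob with NamedRddSupport {
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  override def runJob(sc: SparkContext, config: Config): Any = {
    val fileName = config.getString("input.file")
    // the generator runs only on a cache miss; the resulting RDD is cached
    // and registered under fileName for later jobs in the same context
    val rdd = this.namedRdds.getOrElseCreate(fileName, sc.textFile(fileName))
    rdd.count()
  }
}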
@dhruvsinghal991

Hi Sujee,
I am running the same code, but it throws the following error:
Exception in thread "pool-1-thread-1" java.lang.NoSuchMethodError: spark.jobserver.NamedRdds.update(Ljava/lang/String;Lscala/Function0;)Lorg/apache/spark/rdd/RDD

Below is my code:

/**
 * Created by dhruvsinghal on 18/11/15.
 */
package com.dhruv

import org.apache.spark._
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkContext._
import spark.jobserver.{NamedRdds, NamedRddSupport, SparkJob, SparkJobInvalid, SparkJobValid, SparkJobValidation}
import scala.util.Try
object saveRDDJob extends SparkJob with NamedRddSupport {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[4]", "WordCountExample")
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  override def runJob(sc: SparkContext, config: Config): Any = {
    val dd = sc.parallelize(1 to 100)
    dd.cache() // cache() already persists at MEMORY_ONLY; a second persist() is redundant
    this.namedRdds.update("savedrdd", dd)
    dd.collect()
  }

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }
}

object retrieveRDDJob extends SparkJob with NamedRddSupport {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[4]", "WordCountExample")
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  override def runJob(sc: SparkContext, config: Config): Any = {
    // the name must match the one registered by saveRDDJob ("savedrdd", not
    // "sampleRDD"), and get needs the element type to return an RDD[Int]
    val dd = this.namedRdds.get[Int]("savedrdd").get
    dd.count()
  }

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }
}
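
One pitfall worth flagging in the pair of jobs above: named RDDs live inside a single long-running job-server context, so retrieveRDDJob only sees what saveRDDJob registered if both run in the same persistent context and agree on the name (the original snippet saved under "savedrdd" but fetched "sampleRDD"). A small sketch of one way to keep them in sync; the RddNames object is invented for illustration:

// Hypothetical shared constant so the two jobs cannot drift apart on the name
object RddNames {
  val SavedNumbers = "savedrdd"
}

// in saveRDDJob.runJob:     this.namedRdds.update(RddNames.SavedNumbers, dd)
// in retrieveRDDJob.runJob: this.namedRdds.get[Int](RddNames.SavedNumbers)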

SBT:

name := "MergingDatasets"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"

resolvers += "Job Server Bintray" at "https://dl.bintray.com/spark-jobserver/maven"

libraryDependencies += "spark.jobserver" %% "job-server-api" % "0.6.0" % "provided"
libraryDependencies += "spark.jobserver" %% "job-server-extras" % "0.6.0" % "provided"

val buildSettings = Defaults.defaultSettings ++ Seq(
  javaOptions += "-Xmx8G"
)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
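
On the NoSuchMethodError itself: the mangled signature update(Ljava/lang/String;Lscala/Function0;) shows the jar was compiled against the by-name update(name, rddGen: => RDD[T]) overload, which the compiler encodes as a scala.Function0 parameter. If the running server ships an older job-server-api whose NamedRdds lacks that method, the call fails at runtime exactly like this. The usual fix is to pin the api/extras dependencies to the version the deployed server was built from. A sketch, assuming that version is 0.6.0 (the version string here is an assumption; check your server's own build):

// keep these in lockstep with the running server; "provided" keeps them out of
// the assembly so the server's own copies are the ones loaded at runtime
libraryDependencies ++= Seq(
  "spark.jobserver" %% "job-server-api"    % "0.6.0" % "provided",
  "spark.jobserver" %% "job-server-extras" % "0.6.0" % "provided"
)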
