@atty303
Created March 4, 2016 02:14
SparkBoot
package com.adtdp.tenma.shared.spark.app

import scala.util.Try

/**
 * Trait representing a Spark application.
 */
trait SparkApp {
  /**
   * Entry point of the application.
   *
   * Called by SparkBoot after the SparkContext and related resources have been prepared.
   *
   * @return Success(exit code) or Failure
   */
  def run(): Try[Int]

  /**
   * Throws a SparkAppException with the given message.
   *
   * @param message the error message
   */
  def error(message: String): Nothing = throw SparkAppException(message)
}
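
// ---------------------------------------------------------------------------
// Usage sketch (not part of the original gist): a minimal SparkApp
// implementation. The class name, argument handling and word-count logic are
// purely illustrative; the contract shown is that the SparkContext arrives
// through the constructor (supplied by SparkBoot via mkApp) and that run()
// returns Success(exit code) or Failure.
// ---------------------------------------------------------------------------
package com.adtdp.tenma.shared.spark.app

import org.apache.spark.SparkContext

import scala.util.Try

class WordCountApp(sc: SparkContext, args: Array[String]) extends SparkApp {
  override def run(): Try[Int] = Try {
    if (args.length < 2) error("usage: WordCountApp <input> <output>")
    val counts = sc.textFile(args(0))
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
    counts.saveAsTextFile(args(1))
    0 // exit code handed back to the boot trait
  }
}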
package com.adtdp.tenma.shared.spark.boot

import com.adtdp.tenma.shared.spark.app.SparkApp
import org.apache.spark.SparkContext

trait SparkBatchBoot extends SparkBoot {
  def mkApp(sc: SparkContext, args: Array[String]): SparkApp

  def main(args: Array[String]): Unit = {
    val sc = mkSparkContext()
    val app = mkApp(sc, args)
    val status = app.run().recover {
      case cause =>
        _logger.error(s"Failed to run ${app.getClass.getSimpleName}", cause)
        255
    }.get
    sc.stop()
    sys.exit(status)
  }
}
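
// ---------------------------------------------------------------------------
// Usage sketch (not part of the original gist): wiring the hypothetical
// WordCountApp above into a runnable batch entry point. Only appName and
// mkApp need to be provided; main() is inherited from SparkBatchBoot and is
// what spark-submit invokes as the main class.
// ---------------------------------------------------------------------------
package com.adtdp.tenma.shared.spark.boot

import com.adtdp.tenma.shared.spark.app.{SparkApp, WordCountApp}
import org.apache.spark.SparkContext

object WordCountBoot extends SparkBatchBoot {
  override def appName: String = "word-count"

  override def mkApp(sc: SparkContext, args: Array[String]): SparkApp =
    new WordCountApp(sc, args)
}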
package com.adtdp.tenma.shared.spark.boot

import java.io.File

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
import org.slf4j.LoggerFactory

private[boot] trait SparkBoot {
  protected val _logger = LoggerFactory.getLogger(getClass.getName)

  def main(args: Array[String]): Unit

  def appName: String

  @transient
  protected lazy val defaultConfig: Config = ConfigFactory.load()

  @transient
  protected lazy val appConfig: Config = {
    val confName = System.getProperty("app.config.name", "application.conf")
    _logger.info(s"Loading an application configuration from [$confName]")
    val confFile = new File(SparkFiles.get(confName))
    if (!confFile.exists())
      _logger.warn(s"Configuration file is missing; did you forget to pass --files to spark-submit? [$confName]")
    val appConf = ConfigFactory.parseFileAnySyntax(confFile)
    ConfigFactory.load(appConf)
  }

  protected def mkSparkConf(): SparkConf = {
    import scala.collection.JavaConverters._
    val conf = new SparkConf().setAppName(appName)
    // Copy every entry under "spark-conf" from the default configuration,
    // unless the key has already been set (e.g. via spark-submit --conf).
    defaultConfig.getConfig("spark-conf").entrySet().asScala.foreach { e =>
      conf.setIfMissing(e.getKey, e.getValue.unwrapped().toString)
    }
    conf
  }

  protected def mkSparkContext(): SparkContext = {
    new SparkContext(mkSparkConf())
  }
}
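
// ---------------------------------------------------------------------------
// Usage sketch (not part of the original gist): SparkBoot copies every entry
// under the "spark-conf" section of the classpath configuration into the
// SparkConf unless it is already set (e.g. via spark-submit --conf). A
// concrete boot can layer further defaults by overriding mkSparkConf, the
// same hook SparkStreamingBoot uses below. Names and settings here are
// illustrative.
// ---------------------------------------------------------------------------
package com.adtdp.tenma.shared.spark.boot

import com.adtdp.tenma.shared.spark.app.{SparkApp, WordCountApp}
import org.apache.spark.{SparkConf, SparkContext}

object TunedWordCountBoot extends SparkBatchBoot {
  override def appName: String = "word-count-tuned"

  override protected def mkSparkConf(): SparkConf =
    super.mkSparkConf()
      .setIfMissing("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  override def mkApp(sc: SparkContext, args: Array[String]): SparkApp =
    new WordCountApp(sc, args)
}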
package com.adtdp.tenma.shared.spark.boot

import java.util.concurrent.TimeUnit

import com.adtdp.tenma.shared.spark.app.SparkApp
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.{Failure, Success}

/**
 * @author Koji Agawa
 */
trait SparkStreamingBoot extends SparkBoot {
  def mkApp(sc: SparkContext, ssc: StreamingContext, args: Array[String]): SparkApp

  val checkpointPath: String = "app.streaming.checkpoint-path"
  def checkpoint: Option[String] =
    if (appConfig.hasPath(checkpointPath)) Some(appConfig.getString(checkpointPath)) else None

  val batchDurationPath: String = "app.streaming.batch-duration"
  def batchDuration: Duration =
    Durations.milliseconds(appConfig.getDuration(batchDurationPath, TimeUnit.MILLISECONDS))

  protected def mkStreamingContext(sc: SparkContext): StreamingContext = {
    def create(): StreamingContext = {
      val ssc = new StreamingContext(sc, batchDuration)
      checkpoint.foreach { dir =>
        ssc.checkpoint(dir)
      }
      ssc
    }
    // Recover from the checkpoint directory when one is configured, otherwise build a fresh context.
    checkpoint.fold(create())(StreamingContext.getOrCreate(_, create))
  }

  protected override def mkSparkConf(): SparkConf = {
    super.mkSparkConf()
      // Unlike a batch job, streaming accumulates a large number of short-lived jobs,
      // so their metadata has to be cleaned up periodically.
      .set("spark.cleaner.ttl", (60 * 60 * 3).toString)
  }

  override def main(args: Array[String]): Unit = {
    val sc = mkSparkContext()
    val ssc = mkStreamingContext(sc)
    val app = mkApp(sc, ssc, args)
    app.run() match {
      case Success(_) =>
        // Start the streaming computation
        ssc.start()
        ssc.awaitTermination()
      case Failure(cause) =>
        _logger.error(s"Failed to prepare ${app.getClass.getSimpleName}", cause)
    }
    // Always exit with 0 so that `spark-submit --supervise` does not retry endlessly.
    sys.exit(0)
  }
}
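
// ---------------------------------------------------------------------------
// Usage sketch (not part of the original gist): a streaming entry point. It
// assumes the configuration shipped with --files defines
// app.streaming.batch-duration (and optionally app.streaming.checkpoint-path).
// run() only builds the DStream graph; SparkStreamingBoot starts the
// computation afterwards. Host, port and names are illustrative.
// ---------------------------------------------------------------------------
package com.adtdp.tenma.shared.spark.boot

import com.adtdp.tenma.shared.spark.app.SparkApp
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext

import scala.util.Try

object SocketCountBoot extends SparkStreamingBoot {
  override def appName: String = "socket-count"

  override def mkApp(sc: SparkContext, ssc: StreamingContext, args: Array[String]): SparkApp =
    new SparkApp {
      override def run(): Try[Int] = Try {
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.flatMap(_.split("\\s+")).count().print()
        0
      }
    }
}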