Created
March 4, 2016 02:14
-
-
Save atty303/c83f3c8cb8a930951be0 to your computer and use it in GitHub Desktop.
SparkBoot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.adtdp.tenma.shared.spark.app

import scala.util.Try

/**
 * Trait representing a Spark application.
 */
trait SparkApp {
  /**
   * Entry point of the application.
   *
   * Invoked by SparkBoot after the SparkContext (and related resources)
   * have been prepared.
   *
   * @return Success(exit code) or Failure
   */
  def run(): Try[Int]

  /**
   * Throws a SparkAppException with the given message.
   *
   * @param message detail message attached to the thrown exception
   */
  def error(message: String): Nothing = throw SparkAppException(message)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.adtdp.tenma.shared.spark.boot

import com.adtdp.tenma.shared.spark.app.SparkApp
import org.apache.spark.SparkContext

/**
 * Boot harness for batch-style Spark applications.
 *
 * Creates the SparkContext, builds the application, runs it once, and exits
 * the JVM with the application's exit code (255 when the run fails).
 */
trait SparkBatchBoot extends SparkBoot {

  /** Builds the application instance from the prepared SparkContext and CLI arguments. */
  def mkApp(sc: SparkContext, args: Array[String]): SparkApp

  def main(args: Array[String]): Unit = {
    val sc = mkSparkContext()
    val status =
      try {
        val app = mkApp(sc, args)
        app.run().recover {
          case cause =>
            _logger.error(s"Failed to run ${app.getClass.getSimpleName}", cause)
            255 // non-zero exit code so the scheduler can detect the failure
        }.get
      } finally {
        // Always release cluster resources, even if mkApp or the recover block throws.
        sc.stop()
      }
    sys.exit(status)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.adtdp.tenma.shared.spark.boot

import java.io.File

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
import org.slf4j.LoggerFactory

/**
 * Shared plumbing for Spark entry points: configuration loading and
 * SparkConf / SparkContext construction.
 */
private[boot] trait SparkBoot {
  // NOTE(review): slf4j loggers are not serializable; this val is only used on
  // the driver in the visible code, but confirm it is never captured in a closure.
  protected val _logger = LoggerFactory.getLogger(getClass.getName)

  /** JVM entry point; implemented by the concrete boot traits. */
  def main(args: Array[String]): Unit

  /** Application name passed to SparkConf.setAppName. */
  def appName: String

  /** Configuration loaded from the driver classpath (reference.conf / application.conf). */
  @transient
  protected lazy val defaultConfig: Config = ConfigFactory.load()

  /**
   * Application configuration distributed via `spark-submit --files`.
   *
   * The file name defaults to "application.conf" and may be overridden with
   * the `app.config.name` system property. When the file is absent a warning
   * is logged and only the classpath configuration is used.
   */
  @transient
  protected lazy val appConfig: Config = {
    val confName = System.getProperty("app.config.name", "application.conf")
    _logger.info(s"Loading an application configuration from [$confName]")
    val confFile = new File(SparkFiles.get(confName))
    if (!confFile.exists())
      _logger.warn(s"configuration file is missing; did you forget to pass --files to spark-submit? [$confName]")
    // parseFileAnySyntax tolerates .conf/.json/.properties extensions.
    val appConf = ConfigFactory.parseFileAnySyntax(confFile)
    ConfigFactory.load(appConf)
  }

  /**
   * Builds a SparkConf seeded with appName plus every entry under the
   * `spark-conf` section of the classpath configuration. Values already set
   * (e.g. by spark-submit) win, because setIfMissing never overwrites.
   */
  protected def mkSparkConf(): SparkConf = {
    import scala.collection.JavaConverters._
    val conf = new SparkConf().setAppName(appName)
    defaultConfig.getConfig("spark-conf").entrySet().asScala.foreach { e =>
      conf.setIfMissing(e.getKey, e.getValue.unwrapped().toString)
    }
    conf
  }

  /** Creates the SparkContext from mkSparkConf(). */
  protected def mkSparkContext(): SparkContext = {
    new SparkContext(mkSparkConf())
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.adtdp.tenma.shared.spark.boot

import java.util.concurrent.TimeUnit

import com.adtdp.tenma.shared.spark.app.SparkApp
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.{Failure, Success}

/**
 * Boot harness for Spark Streaming applications: prepares the
 * StreamingContext (optionally recovering from a checkpoint), lets the
 * application wire its streams in run(), then starts the computation.
 *
 * @author Koji Agawa
 */
trait SparkStreamingBoot extends SparkBoot {
  /** Builds the application from the prepared contexts and CLI arguments. */
  def mkApp(sc: SparkContext, ssc: StreamingContext, args: Array[String]): SparkApp

  // Configuration key for the (optional) checkpoint directory.
  val checkpointPath: String = "app.streaming.checkpoint-path"
  /** Checkpoint directory from configuration, or None when not configured. */
  def checkpoint: Option[String] = if (appConfig.hasPath(checkpointPath)) Some(appConfig.getString(checkpointPath)) else None

  // Configuration key for the micro-batch interval.
  val batchDurationPath: String = "app.streaming.batch-duration"
  /** Micro-batch interval read from configuration, at millisecond resolution. */
  def batchDuration: Duration = Durations.milliseconds(appConfig.getDuration(batchDurationPath, TimeUnit.MILLISECONDS))

  /**
   * Creates the StreamingContext. When a checkpoint directory is configured,
   * StreamingContext.getOrCreate recovers the context from that directory if
   * a checkpoint exists there, otherwise builds a fresh one via create().
   */
  protected def mkStreamingContext(sc: SparkContext): StreamingContext = {
    def create(): StreamingContext = {
      val ssc = new StreamingContext(sc, batchDuration)
      checkpoint.foreach { dir =>
        ssc.checkpoint(dir)
      }
      ssc
    }
    checkpoint.fold(create())(StreamingContext.getOrCreate(_, create))
  }

  protected override def mkSparkConf(): SparkConf = {
    super.mkSparkConf()
      // Unlike batch jobs, streaming accumulates many short-lived jobs, so old
      // metadata must be cleaned up periodically (TTL here: 3 hours).
      .set("spark.cleaner.ttl", (60 * 60 * 3).toString)
  }

  override def main(args: Array[String]): Unit = {
    val sc = mkSparkContext()
    val ssc = mkStreamingContext(sc)
    val app = mkApp(sc, ssc, args)
    // run() only wires up the streams; the computation starts below.
    app.run() match {
      case Success(_) =>
        // Start the streaming computation
        ssc.start()
        ssc.awaitTermination()
      case Failure(cause) =>
        _logger.error(s"Failed to prepare ${app.getClass.getSimpleName}", cause)
    }
    // Always sys.exit(0) so that `spark-submit --supervise` does not retry forever.
    sys.exit(0)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment