Skip to content

Instantly share code, notes, and snippets.

@omnisis
Created February 27, 2015 06:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save omnisis/6a7439633b8c4ccea478 to your computer and use it in GitHub Desktop.
Save omnisis/6a7439633b8c4ccea478 to your computer and use it in GitHub Desktop.
package tutorial.storm
import backtype.storm.utils.Utils
import backtype.storm.{LocalCluster, StormSubmitter, Config}
import backtype.storm.generated.StormTopology
import com.typesafe.scalalogging.slf4j.LazyLogging
import org.rogach.scallop._
class RunnerOptions(arguments: Seq[String]) extends ScallopConf(arguments) {
banner("StormRunner v0.1")
footer("\n(c) 2015 NextInstruction.com")
//val props=props[String]('E')
val localOnly = toggle(name = "local", short = 'l', descrNo = "Cluster Mode", descrYes = "Local Only Mode")
val maxTaskParallelism = opt[Int](name = "max-task-parallism", short = 'p', descr = "Max task parallelism", default = Some(4))
val numWorkers = opt[Int](name = "num-workers",descr="Maximum number of workers",short = 'w', default = Some(1))
val debug = toggle(name = "debug", default = Some(false))
val topoName = trailArg[String](name = "topo-name", required = true)
}
abstract class StormRunner extends LazyLogging with App {
private var topoConf: Config = _
private var runnerOpts: RunnerOptions = _
def createTopology: StormTopology
def configure(stormConf: Config): Unit = {
}
def submitTopology(): Unit = {
// run in LOCAL MODE
if (!runnerOpts.localOnly.isSupplied) {
logger.info("Submitting Topology to storm cluster...")
topoConf.setNumWorkers(runnerOpts.maxTaskParallelism())
StormSubmitter.submitTopology(runnerOpts.topoName(), topoConf, createTopology)
}
else {
// run in CLUSTER MODE
val cluster = new LocalCluster()
topoConf.setMaxTaskParallelism(5)
cluster.submitTopology(Config.TOPOLOGY_NAME, topoConf, createTopology)
Utils.sleep(10000)
cluster.killTopology(Config.TOPOLOGY_NAME)
cluster.shutdown()
}
}
runnerOpts = new RunnerOptions(args)
topoConf = new Config()
configure(topoConf)
if (runnerOpts.debug.isSupplied) {
topoConf.setDebug(false)
}
submitTopology()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment