Skip to content

Instantly share code, notes, and snippets.

@florianleibert
Created July 15, 2013 22:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save florianleibert/6004234 to your computer and use it in GitHub Desktop.
Save florianleibert/6004234 to your computer and use it in GitHub Desktop.
class ShellExecutor() extends Executor {
//args|command.
// e.g. args: -av (async job), verbose mode
val executorService = Executors.newFixedThreadPool(20)
val log = Logger.getLogger(getClass.getName)
def registered(executorDriver: ExecutorDriver, executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo,
slaveInfo: SlaveInfo) {
println("Registered....")
}
def reregistered(p1: ExecutorDriver, p2: SlaveInfo) {
println("Reregistered...")
//Ignore
}
def disconnected(p1: ExecutorDriver) {
//Ignore
println("DISCONNECTED!")
}
def launchTask(driver: ExecutorDriver, taskInfo: TaskInfo) {
try {
val status1: TaskStatus = TaskStatus.newBuilder.setTaskId(taskInfo.getTaskId)
.setState(TaskState.TASK_RUNNING).build()
driver.sendStatusUpdate(status1)
log.info("Running task: " + taskInfo.getTaskId)
val executorArgsPattern(flagString, command) = taskInfo.getData.toStringUtf8
val flags = {
if (flagString.size > 0) {
flagString.split("\\s+")
} else {
new Array[String](0)
}
}
log.info("Flags received: " + flagString)
log.info("Command received: " +command)
val parser = new ArgotParser("internalParser")
val retriesFlag = parser.option[Int](List("r"), "n", "total retries, 0 by default")
parser.parse(flags)
val retries = retriesFlag.value.getOrElse(0)
//TODO(FL): Implement retries.
log.info("Retries flag:" + retries)
log.info("Task id: " + taskInfo.getTaskId.getValue)
log.info("Command:" + command)
val callback : (Int, TaskID) => Unit = { (x,y) =>
val status2 = TaskStatus.newBuilder()
if (x == 0) {
status2.setTaskId(y).setState(TaskState.TASK_FINISHED)
} else {
status2.setTaskId(y).setState(TaskState.TASK_FAILED)
}
driver.sendStatusUpdate(status2.build())
}
val task = new RunCommand(List(command), taskInfo.getTaskId, callback)
executorService.submit(task)
} catch {
case t: Throwable => log.warning("Caught exception:" + t.getMessage)
driver.sendStatusUpdate(TaskStatus.newBuilder()
.setTaskId(taskInfo.getTaskId()).setState(TaskState.TASK_FAILED).build())
}
}
def killTask(p1: ExecutorDriver, p2: TaskID) {
//Not supported
println("KILL TASK")
}
def frameworkMessage(p1: ExecutorDriver, p2: Array[Byte]) {
//Not supported
println("FRAMEWORK MESSAGE!")
}
def shutdown(p1: ExecutorDriver) {
println("Shutdown!")
//No-op
System.exit(0)
}
def error(p1: ExecutorDriver, p2: String) {
println("Error!" + p2)
}
}
object ShellExecutor {
//TODO(FL): Fix this.
System.getProperties.setProperty("java.util.logging.SimpleFormatter.format", "[%1$tc] %4$s: %5$s%n")
val log = Logger.getLogger(getClass.getName)
def main(args: Array[String]) {
log.info("Starting shell-executor version: 0.2")
val conf = new Configuration(args)
val executor = new ShellExecutor()
val executorDriver = new MesosExecutorDriver(executor)
val status = executorDriver.run()
status match {
case Status.DRIVER_STOPPED => System.exit(0)
case _ => System.exit(1)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment