Created
December 28, 2015 18:08
-
-
Save boxdot/ec444839566bd1e0fdb9 to your computer and use it in GitHub Desktop.
Run a single task on mesos
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import logging | |
import uuid | |
import time | |
import sys | |
import mesos.interface | |
from mesos.interface import mesos_pb2 | |
import mesos.native | |
logging.basicConfig(level=logging.INFO) | |
def new_task(offer): | |
task = mesos_pb2.TaskInfo() | |
id = uuid.uuid4() | |
task.task_id.value = str(id) | |
task.slave_id.value = offer.slave_id.value | |
task.name = "task {}".format(str(id)) | |
cpus = task.resources.add() | |
cpus.name = "cpus" | |
cpus.type = mesos_pb2.Value.SCALAR | |
cpus.scalar.value = 1 | |
mem = task.resources.add() | |
mem.name = "mem" | |
mem.type = mesos_pb2.Value.SCALAR | |
mem.scalar.value = 1 | |
return task | |
class HelloWorldScheduler(mesos.interface.Scheduler): | |
def registered(self, driver, framework_id, master_info): | |
logging.info("Registered with framework id: {}".format(framework_id)) | |
def resourceOffers(self, driver, offers): | |
logging.info("Received resource offers: {}".format( | |
[o.id.value for o in offers])) | |
for offer in offers: | |
task = new_task(offer) | |
task.command.value = "echo Hello, World!" | |
time.sleep(2) | |
logging.info("Launching task {task} using offer {offer}".format( | |
task=task.task_id.value, offer=offer.id.value)) | |
tasks = [task] | |
driver.launchTasks(offer.id, tasks) | |
def statusUpdate(self, driver, update): | |
logging.info("Task {task} is in state {state}".format( | |
task=update.task_id.value, | |
state=mesos_pb2.TaskState.Name(update.state))) | |
if update.state == mesos_pb2.TASK_FINISHED: | |
driver.stop() | |
if (update.state == mesos_pb2.TASK_LOST or | |
update.state == mesos_pb2.TASK_KILLED or | |
update.state == mesos_pb2.TASK_FAILED): | |
logging.info( | |
"Aborting because task {task} is in unexpected state " | |
"{state} with message '{message}'".format( | |
task=update.task_id.value, | |
state=mesos_pb2.TaskState.Name(update.state), | |
message=update.message)) | |
driver.abort() | |
def frameworkMessage(self, driver, executorId, slaveId, message): | |
logging.info("Received message {}".format(message)) | |
def main(): | |
master_address = sys.argv[1] | |
framework = mesos_pb2.FrameworkInfo() | |
framework.user = "" | |
framework.name = "hello-world" | |
driver = mesos.native.MesosSchedulerDriver( | |
HelloWorldScheduler(), | |
framework, | |
master_address | |
) | |
status = 0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1 | |
driver.stop() | |
sys.exit(status) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment