Skip to content

Instantly share code, notes, and snippets.

@boxdot
Created December 28, 2015 18:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save boxdot/ec444839566bd1e0fdb9 to your computer and use it in GitHub Desktop.
Save boxdot/ec444839566bd1e0fdb9 to your computer and use it in GitHub Desktop.
Run a single task on Mesos
#!/usr/bin/env python
import logging
import uuid
import time
import sys
import mesos.interface
from mesos.interface import mesos_pb2
import mesos.native
# Configure the root logger at INFO so the logging.info() progress messages
# emitted by the scheduler callbacks below are actually shown.
logging.basicConfig(level=logging.INFO)
def _add_scalar_resource(task, name, value):
    """Append a scalar resource called *name* with amount *value* to *task*."""
    resource = task.resources.add()
    resource.name = name
    resource.type = mesos_pb2.Value.SCALAR
    resource.scalar.value = value


def new_task(offer, cpus=1, mem=1):
    """Build a ``TaskInfo`` targeted at the agent that made *offer*.

    The task gets a freshly generated UUID as its id, a name derived from
    that id, and scalar ``cpus`` and ``mem`` resource requests.  No command
    or executor is set; the caller fills that in before launching.

    Args:
        offer: The resource offer whose ``slave_id`` the task is bound to.
        cpus: Amount requested for the ``cpus`` resource (default 1).
        mem: Amount requested for the ``mem`` resource (default 1; Mesos
            documents this resource as being measured in megabytes).

    Returns:
        The populated ``mesos_pb2.TaskInfo`` message.
    """
    # Stringify once so the task id and the task name cannot drift apart.
    task_uuid = str(uuid.uuid4())

    task = mesos_pb2.TaskInfo()
    task.task_id.value = task_uuid
    task.slave_id.value = offer.slave_id.value
    task.name = "task {}".format(task_uuid)

    _add_scalar_resource(task, "cpus", cpus)
    _add_scalar_resource(task, "mem", mem)
    return task
class HelloWorldScheduler(mesos.interface.Scheduler):
    """Scheduler that runs one ``echo Hello, World!`` task and then exits.

    The first offer received is used to launch the task; every other offer
    is declined so its resources go back to the master.  The driver is
    stopped when the task finishes and aborted when it ends in any
    unsuccessful terminal state.
    """

    # Terminal states in which the task can no longer finish successfully.
    _FAILURE_STATES = (
        mesos_pb2.TASK_LOST,
        mesos_pb2.TASK_KILLED,
        mesos_pb2.TASK_FAILED,
        mesos_pb2.TASK_ERROR,
    )

    def __init__(self):
        super(HelloWorldScheduler, self).__init__()
        # Flipped once the single task has been handed to the driver.
        self._task_launched = False

    def registered(self, driver, framework_id, master_info):
        """Log the framework id assigned by the master."""
        # Log the plain id string rather than the whole protobuf message.
        logging.info(
            "Registered with framework id: {}".format(framework_id.value))

    def resourceOffers(self, driver, offers):
        """Launch the task on the first usable offer and decline the rest."""
        logging.info("Received resource offers: {}".format(
            [o.id.value for o in offers]))
        for offer in offers:
            if self._task_launched:
                # Only one task is ever run; return unused offers instead of
                # silently holding on to them.
                driver.declineOffer(offer.id)
                continue

            task = new_task(offer)
            task.command.value = "echo Hello, World!"
            logging.info("Launching task {task} using offer {offer}".format(
                task=task.task_id.value, offer=offer.id.value))
            driver.launchTasks(offer.id, [task])
            self._task_launched = True

    def statusUpdate(self, driver, update):
        """Stop the driver on success, abort it on a failed terminal state."""
        state_name = mesos_pb2.TaskState.Name(update.state)
        logging.info("Task {task} is in state {state}".format(
            task=update.task_id.value, state=state_name))

        if update.state == mesos_pb2.TASK_FINISHED:
            driver.stop()
        elif update.state in self._FAILURE_STATES:
            logging.error(
                "Aborting because task {task} is in unexpected state "
                "{state} with message '{message}'".format(
                    task=update.task_id.value,
                    state=state_name,
                    message=update.message))
            driver.abort()

    def frameworkMessage(self, driver, executorId, slaveId, message):
        """Log any message sent to the framework by an executor."""
        logging.info("Received message {}".format(message))
def main():
    """Run the hello-world framework against the master given on the CLI.

    Expects the Mesos master address as the first command-line argument.
    Exits with status 0 when the driver stopped normally, 1 when it ended
    any other way, and 2 (after printing a usage line to stderr) when the
    master address is missing.
    """
    if len(sys.argv) < 2:
        program = sys.argv[0] if sys.argv else "run_task.py"
        sys.stderr.write("usage: {} <master-address>\n".format(program))
        sys.exit(2)
    master_address = sys.argv[1]

    framework = mesos_pb2.FrameworkInfo()
    # NOTE(review): per the Mesos example frameworks, an empty user asks
    # Mesos to fill in the current user -- confirm for the deployed version.
    framework.user = ""
    framework.name = "hello-world"

    driver = mesos.native.MesosSchedulerDriver(
        HelloWorldScheduler(),
        framework,
        master_address
    )

    # run() blocks until the driver is stopped or aborted.
    status = 0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1
    # Explicit stop after run() returns, as the Mesos example frameworks do
    # to make sure the driver process terminates.
    driver.stop()
    sys.exit(status)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment