Skip to content

Instantly share code, notes, and snippets.

@jasonlai
Created June 16, 2017 10:22
Show Gist options
  • Save jasonlai/80e072a3cb0b48f3d1876c4c56b2e6f8 to your computer and use it in GitHub Desktop.
Save jasonlai/80e072a3cb0b48f3d1876c4c56b2e6f8 to your computer and use it in GitHub Desktop.
Adhoc Mesos Executor
python_library(
name='executor',
sources=rglobs('*.py'),
dependencies=[
'3rdparty/python:mesos.executor',
'3rdparty/python:mesos.interface',
'3rdparty/python:pex',
'3rdparty/python:protobuf',
],
provides=setup_py(
name='executor',
version='0.0.1',
),
)
python_binary(
name='main',
entry_point='executor:main',
always_write_cache=True,
dependencies=[':executor'],
)
from __future__ import print_function
import sys
from threading import Thread
from time import sleep
from mesos.interface import Executor
from mesos.interface.mesos_pb2 import (
DRIVER_STOPPED,
TASK_RUNNING,
TASK_FINISHED,
TaskID,
TaskStatus,
)
from mesos.executor import MesosExecutorDriver
class MyExecutor(Executor):
def _run_task(self, driver, task):
task_id = task.task_id
container = task.container
self.log('Running task %s' % task_id.value)
self.log('task.container: %s' % container)
self.log('task.container.mesos: %s' % bool(container.mesos))
self.log('task.container.mesos.image: %s' % bool(container.mesos.image))
while True:
self.log('Sending status update...')
status = TaskStatus(
task_id=TaskID(value=task.task_id.value),
state=TASK_RUNNING,
data='data with a \0 byte',
)
driver.sendStatusUpdate(status)
sleep(30)
status = TaskStatus(
task_id=TaskID(value=task.task_id.value),
state=TASK_FINISHED,
data='data with a \0 byte',
)
driver.sendStatusUpdate(status)
self.log('Sent status update')
def log(self, msg):
print(msg)
def registered(self, driver, executor_info, framework_info, slave_info):
self.log('registered() called with:')
self.log(' ExecutorInfo: %s' % executor_info)
self.log(' FrameworkInfo: %s' % framework_info)
self.log(' SlaveInfo: %s' % slave_info)
self._driver = driver
self._executor_info = executor_info
self._framework_info = framework_info
self._slave_info = slave_info
def reregistered(self, driver, slave_info):
self.log('reregistered() called with:')
self.log(' SlaveInfo: %s' % slave_info)
def disconnected(self, driver):
self.log('disconnected() called')
def launchTask(self, driver, task):
thread = Thread(target=self._run_task, args=(driver, task))
thread.start()
def frameworkMessage(self, driver, message):
driver.sendFrameworkMessage(message)
def main():
print('Starting executor....')
executor = MyExecutor()
driver = MesosExecutorDriver(executor)
if driver.run() != DRIVER_STOPPED:
return 1
return 0
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment