Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
mesos slave collectd plugin (mesos 0.21.x)
#! /usr/bin/python
# Copyright 2014 Ray Rodriguez
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collectd
import json
import urllib2
import socket
import collections
# Metric namespace label. NOTE(review): not referenced by any code visible in
# this file (dispatch_stat hard-codes the plugin name) -- confirm before removing.
PREFIX = "mesos"

# Connection defaults; configure_callback() overwrites these from the plugin
# configuration and derives MESOS_URL from host and port.
MESOS_HOST = "localhost"
MESOS_PORT = 5051
MESOS_URL = ""

# When true, log_verbose() forwards its messages to collectd.info().
VERBOSE_LOGGING = False

# One metric description: the collectd type to dispatch it as, and the key
# path used to find its value in the JSON snapshot.
Stat = collections.namedtuple('Stat', ('type', 'path'))

# Metrics currently being collected; filled in by configure_callback().
STATS_CUR = {}

_GAUGE = "gauge"
_COUNTER = "counter"

# DICT: Mesos 0.21.0
# (metric name, collectd type) pairs. The metric name doubles as the lookup
# path, so it is only written once here.
_METRIC_TYPES = (
    # Slave
    ('slave/cpus_percent', _GAUGE),
    ('slave/cpus_total', _GAUGE),
    ('slave/cpus_used', _GAUGE),
    ('slave/disk_percent', _GAUGE),
    ('slave/disk_total', _GAUGE),
    ('slave/disk_used', _GAUGE),
    ('slave/executors_registering', _GAUGE),
    ('slave/executors_running', _GAUGE),
    ('slave/executors_terminated', _COUNTER),
    ('slave/executors_terminating', _GAUGE),
    ('slave/frameworks_active', _GAUGE),
    ('slave/invalid_framework_messages', _GAUGE),
    ('slave/invalid_status_updates', _GAUGE),
    ('slave/mem_percent', _GAUGE),
    ('slave/mem_total', _GAUGE),
    ('slave/mem_used', _GAUGE),
    ('slave/recovery_errors', _COUNTER),
    ('slave/registered', _GAUGE),
    ('slave/tasks_failed', _COUNTER),
    ('slave/tasks_finished', _COUNTER),
    ('slave/tasks_killed', _COUNTER),
    ('slave/tasks_lost', _COUNTER),
    ('slave/tasks_running', _GAUGE),
    ('slave/tasks_staging', _GAUGE),
    ('slave/tasks_starting', _GAUGE),
    ('slave/uptime_secs', _COUNTER),
    ('slave/valid_framework_messages', _COUNTER),
    ('slave/valid_status_updates', _COUNTER),
    # Slave System Metrics
    ('system/cpus_total', _GAUGE),
    ('system/load_15min', _GAUGE),
    ('system/load_1min', _GAUGE),
    ('system/load_5min', _GAUGE),
)

# Full metric table: name -> Stat(type, path), with path equal to the name.
STATS_MESOS = dict(
    (metric, Stat(kind, metric)) for metric, kind in _METRIC_TYPES
)
# FUNCTION: Collect stats from JSON result
def lookup_stat(stat, json):
    """Return the integer value of metric ``stat`` from a parsed snapshot.

    ``stat`` must be a key of STATS_CUR; its ``path`` is resolved against
    ``json`` (the decoded response, not the ``json`` module) via dig_it_up().

    Returns None when the metric is absent or its value cannot be converted
    to an int, so that one malformed entry does not abort the whole read.
    """
    val = dig_it_up(json, STATS_CUR[stat].path)
    # dig_it_up returns False if no match found, so every bool is treated
    # as "missing" rather than as a value.
    if isinstance(val, bool):
        return None
    try:
        # NOTE(review): int() drops any fractional part (0.35 becomes 0).
        # Confirm whether ratio/load gauges are expected to be fractional;
        # keeping them would also require a change in dispatch_stat().
        return int(val)
    except (TypeError, ValueError, OverflowError):
        # JSON null, a non-numeric string, or NaN/Infinity.
        return None
def configure_callback(conf):
    """Apply the plugin configuration passed in by collectd.

    Walks ``conf.children`` and handles these keys (exact match):

    * ``Host``    -- stored in MESOS_HOST.
    * ``Port``    -- converted with int() and stored in MESOS_PORT.
    * ``Verbose`` -- stored in VERBOSE_LOGGING. String values are
      interpreted by content, so "false", "no", "off", "0" and the empty
      string disable verbose logging instead of counting as truthy.
    * ``Version`` -- accepted without a warning but not used.

    Any other key produces a collectd warning. Afterwards MESOS_URL is
    rebuilt from host and port, and STATS_CUR becomes a copy of STATS_MESOS.
    """
    global MESOS_HOST, MESOS_PORT, MESOS_URL, VERBOSE_LOGGING, STATS_CUR

    for node in conf.children:
        if node.key == 'Host':
            MESOS_HOST = node.values[0]
        elif node.key == 'Port':
            MESOS_PORT = int(node.values[0])
        elif node.key == 'Verbose':
            raw = node.values[0]
            if hasattr(raw, 'lower'):
                # bool("false") is True, so string values must be
                # interpreted by their text, not by truthiness.
                VERBOSE_LOGGING = raw.strip().lower() not in (
                    '', '0', 'false', 'no', 'off')
            else:
                VERBOSE_LOGGING = bool(raw)
        elif node.key == "Version":
            # Recognised so existing configs do not trigger the "unknown
            # key" warning; nothing in this file consumes the value.
            pass
        else:
            collectd.warning('mesos plugin: Unknown config key: %s.' % node.key)

    MESOS_URL = "http://%s:%s/metrics/snapshot" % (MESOS_HOST, MESOS_PORT)
    # Shallow copy, so changes to STATS_CUR never leak into the master table.
    STATS_CUR = dict(STATS_MESOS)
    log_verbose('Configured with host=%s, port=%s, url=%s' % (MESOS_HOST, MESOS_PORT, MESOS_URL))
def fetch_stats():
    """Download the metrics snapshot from MESOS_URL and dispatch it.

    Opens MESOS_URL with a 10 second timeout, decodes the body as JSON and
    hands the result to parse_stats(). Connection failures and unreadable
    or malformed bodies are reported through collectd.error() and make the
    function return None instead of raising into collectd.

    Returns whatever parse_stats() returns on success, None on failure.
    """
    try:
        response = urllib2.urlopen(MESOS_URL, timeout=10)
    except (urllib2.URLError, socket.error, ValueError) as e:
        # ValueError covers a malformed URL, e.g. MESOS_URL still being the
        # empty default because the configure callback never ran.
        collectd.error('mesos plugin: Error connecting to %s - %r' % (MESOS_URL, e))
        return None

    try:
        result = json.load(response)
    except (socket.error, ValueError) as e:
        # A timeout or reset while reading the body surfaces as
        # socket.error rather than URLError; invalid JSON raises ValueError.
        collectd.error('mesos plugin: Error reading stats from %s - %r' % (MESOS_URL, e))
        return None
    finally:
        # Always release the connection, whether or not decoding succeeded.
        response.close()

    return parse_stats(result)
def parse_stats(json):
    """Look up every metric in STATS_CUR and dispatch its value.

    ``json`` is the decoded snapshot response (it shadows the ``json``
    module inside this function). Each metric is resolved with
    lookup_stat() and passed, found or not, to dispatch_stat().
    """
    for metric_name in STATS_CUR:
        stat = STATS_CUR[metric_name]
        dispatch_stat(lookup_stat(metric_name, json), metric_name, stat)
def dispatch_stat(result, name, key):
    """Send one metric value to collectd, or warn if it is missing.

    ``result`` is the looked-up value (None when the metric was not found),
    ``name`` becomes the value's type_instance, and ``key`` is the Stat
    whose ``type`` selects the collectd type.
    """
    if result is not None:
        metric_type = key.type
        metric_value = int(result)
        log_verbose('Sending value[%s]: %s=%s' % (metric_type, name, metric_value))
        collectd.Values(
            plugin='mesos',
            type=metric_type,
            type_instance=name,
            values=[metric_value],
        ).dispatch()
    else:
        collectd.warning('mesos plugin: Value not found for %s' % name)
def read_callback():
    """Read hook registered with collectd: fetch and dispatch all metrics."""
    log_verbose('Read callback called')
    # fetch_stats() dispatches the values itself; its return value is unused.
    fetch_stats()
def dig_it_up(obj, path):
    """Follow ``path`` through nested containers and return what is there.

    ``path`` is either a dot-separated string ("a.b.c") or an iterable of
    keys/indexes; each element is applied to the current object with
    ``[]``. A string without dots is used as a single key.

    Returns the value found, or False when any step cannot be resolved
    (missing key, index out of range, non-subscriptable object, or a
    ``path`` that cannot be iterated). Because False is the "not found"
    marker, a stored value of False is indistinguishable from a miss.
    """
    try:
        # Anything string-like (str, unicode, subclasses) is split on dots.
        if hasattr(path, 'split'):
            path = path.split('.')
        for step in path:
            obj = obj[step]
    except (KeyError, IndexError, TypeError):
        # Only lookup failures mean "not found"; anything else, such as
        # KeyboardInterrupt, is allowed to propagate.
        return False
    return obj
def log_verbose(msg):
    """Forward ``msg`` to collectd.info() when VERBOSE_LOGGING is enabled."""
    if VERBOSE_LOGGING:
        collectd.info('mesos plugin [verbose]: %s' % msg)
# Hand the plugin's entry points to collectd: configure_callback is
# registered as the config callback and read_callback as the read callback.
collectd.register_config(configure_callback)
collectd.register_read(read_callback)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.