Last active
January 3, 2016 12:39
-
-
Save conorbranagan/8463857 to your computer and use it in GitHub Desktop.
Elasticsearch - supporting basic authentication
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import socket | |
import subprocess | |
import sys | |
import time | |
import urlparse | |
import urllib2 | |
from util import json, headers | |
from checks import AgentCheck | |
HEALTH_URL = "/_cluster/health?pretty=true" | |
STATS_URL = "/_cluster/nodes/stats?all=true" | |
NODES_URL = "/_cluster/nodes?network=true" | |
METRICS = { | |
"elasticsearch.docs.count": ("gauge", "indices.docs.count"), | |
"elasticsearch.docs.deleted": ("gauge", "indices.docs.deleted"), | |
"elasticsearch.store.size": ("gauge", "indices.store.size_in_bytes"), | |
"elasticsearch.indexing.index.total": ("gauge", "indices.indexing.index_total"), | |
"elasticsearch.indexing.index.time": ("gauge", "indices.indexing.index_time_in_millis", lambda v: float(v)/1000), | |
"elasticsearch.indexing.index.current": ("gauge", "indices.indexing.index_current"), | |
"elasticsearch.indexing.delete.total": ("gauge", "indices.indexing.delete_total"), | |
"elasticsearch.indexing.delete.time": ("gauge", "indices.indexing.delete_time_in_millis", lambda v: float(v)/1000), | |
"elasticsearch.indexing.delete.current": ("gauge", "indices.indexing.delete_current"), | |
"elasticsearch.get.total": ("gauge", "indices.get.total"), | |
"elasticsearch.get.time": ("gauge", "indices.get.time_in_millis", lambda v: float(v)/1000), | |
"elasticsearch.get.current": ("gauge", "indices.get.current"), | |
"elasticsearch.get.exists.total": ("gauge", "indices.get.exists_total"), | |
"elasticsearch.get.exists.time": ("gauge", "indices.get.exists_time_in_millis", lambda v: float(v)/1000), | |
"elasticsearch.get.missing.total": ("gauge", "indices.get.missing_total"), | |
"elasticsearch.get.missing.time": ("gauge", "indices.get.missing_time_in_millis", lambda v: float(v)/1000), | |
"elasticsearch.search.query.total": ("gauge", "indices.search.query_total"), | |
"elasticsearch.search.query.time": ("gauge", "indices.search.query_time_in_millis", lambda v: float(v)/1000), | |
"elasticsearch.search.query.current": ("gauge", "indices.search.query_current"), | |
"elasticsearch.search.fetch.total": ("gauge", "indices.search.fetch_total"), | |
"elasticsearch.search.fetch.time": ("gauge", "indices.search.fetch_time_in_millis", lambda v: float(v)/1000), | |
"elasticsearch.search.fetch.current": ("gauge", "indices.search.fetch_current"), | |
"elasticsearch.cache.field.evictions": ("gauge", "indices.cache.field_evictions"), | |
"elasticsearch.cache.field.size": ("gauge", "indices.cache.field_size_in_bytes"), | |
"elasticsearch.cache.filter.count": ("gauge", "indices.cache.filter_count"), | |
"elasticsearch.cache.filter.evictions": ("gauge", "indices.cache.filter_evictions"), | |
"elasticsearch.cache.filter.size": ("gauge", "indices.cache.filter_size_in_bytes"), | |
"elasticsearch.merges.current": ("gauge", "indices.merges.current"), | |
"elasticsearch.merges.current.docs": ("gauge", "indices.merges.current_docs"), | |
"elasticsearch.merges.current.size": ("gauge", "indices.merges.current_size_in_bytes"), | |
"elasticsearch.merges.total": ("gauge", "indices.merges.total"), | |
"elasticsearch.merges.total.time": ("gauge", "indices.merges.total_time_in_millis", lambda v: float(v)/1000), | |
"elasticsearch.merges.total.docs": ("gauge", "indices.merges.total_docs"), | |
"elasticsearch.merges.total.size": ("gauge", "indices.merges.total_size_in_bytes"), | |
"elasticsearch.refresh.total": ("gauge", "indices.refresh.total"), | |
"elasticsearch.refresh.total.time": ("gauge", "indices.refresh.total_time_in_millis", lambda v: float(v)/1000), | |
"elasticsearch.flush.total": ("gauge", "indices.flush.total"), | |
"elasticsearch.flush.total.time": ("gauge", "indices.flush.total_time_in_millis", lambda v: float(v)/1000), | |
"elasticsearch.process.open_fd": ("gauge", "process.open_file_descriptors"), | |
"elasticsearch.transport.rx_count": ("gauge", "transport.rx_count"), | |
"elasticsearch.transport.tx_count": ("gauge", "transport.tx_count"), | |
"elasticsearch.transport.rx_size": ("gauge", "transport.rx_size_in_bytes"), | |
"elasticsearch.transport.tx_size": ("gauge", "transport.tx_size_in_bytes"), | |
"elasticsearch.transport.server_open": ("gauge", "transport.server_open"), | |
"elasticsearch.thread_pool.bulk.active": ("gauge", "thread_pool.bulk.active"), | |
"elasticsearch.thread_pool.bulk.threads": ("gauge", "thread_pool.bulk.threads"), | |
"elasticsearch.thread_pool.bulk.queue": ("gauge", "thread_pool.bulk.queue"), | |
"elasticsearch.thread_pool.cache.active": ("gauge", "thread_pool.cache.active"), | |
"elasticsearch.thread_pool.cache.threads": ("gauge", "thread_pool.cache.threads"), | |
"elasticsearch.thread_pool.cache.queue": ("gauge", "thread_pool.cache.queue"), | |
"elasticsearch.thread_pool.flush.active": ("gauge", "thread_pool.flush.active"), | |
"elasticsearch.thread_pool.flush.threads": ("gauge", "thread_pool.flush.threads"), | |
"elasticsearch.thread_pool.flush.queue": ("gauge", "thread_pool.flush.queue"), | |
"elasticsearch.thread_pool.generic.active": ("gauge", "thread_pool.generic.active"), | |
"elasticsearch.thread_pool.generic.threads": ("gauge", "thread_pool.generic.threads"), | |
"elasticsearch.thread_pool.generic.queue": ("gauge", "thread_pool.generic.queue"), | |
"elasticsearch.thread_pool.get.active": ("gauge", "thread_pool.get.active"), | |
"elasticsearch.thread_pool.get.threads": ("gauge", "thread_pool.get.threads"), | |
"elasticsearch.thread_pool.get.queue": ("gauge", "thread_pool.get.queue"), | |
"elasticsearch.thread_pool.index.active": ("gauge", "thread_pool.index.active"), | |
"elasticsearch.thread_pool.index.threads": ("gauge", "thread_pool.index.threads"), | |
"elasticsearch.thread_pool.index.queue": ("gauge", "thread_pool.index.queue"), | |
"elasticsearch.thread_pool.management.active": ("gauge", "thread_pool.management.active"), | |
"elasticsearch.thread_pool.management.threads": ("gauge", "thread_pool.management.threads"), | |
"elasticsearch.thread_pool.management.queue": ("gauge", "thread_pool.management.queue"), | |
"elasticsearch.thread_pool.merge.active": ("gauge", "thread_pool.merge.active"), | |
"elasticsearch.thread_pool.merge.threads": ("gauge", "thread_pool.merge.threads"), | |
"elasticsearch.thread_pool.merge.queue": ("gauge", "thread_pool.merge.queue"), | |
"elasticsearch.thread_pool.percolate.active": ("gauge", "thread_pool.percolate.active"), | |
"elasticsearch.thread_pool.percolate.threads": ("gauge", "thread_pool.percolate.threads"), | |
"elasticsearch.thread_pool.percolate.queue": ("gauge", "thread_pool.percolate.queue"), | |
"elasticsearch.thread_pool.refresh.active": ("gauge", "thread_pool.refresh.active"), | |
"elasticsearch.thread_pool.refresh.threads": ("gauge", "thread_pool.refresh.threads"), | |
"elasticsearch.thread_pool.refresh.queue": ("gauge", "thread_pool.refresh.queue"), | |
"elasticsearch.thread_pool.search.active": ("gauge", "thread_pool.search.active"), | |
"elasticsearch.thread_pool.search.threads": ("gauge", "thread_pool.search.threads"), | |
"elasticsearch.thread_pool.search.queue": ("gauge", "thread_pool.search.queue"), | |
"elasticsearch.thread_pool.snapshot.active": ("gauge", "thread_pool.snapshot.active"), | |
"elasticsearch.thread_pool.snapshot.threads": ("gauge", "thread_pool.snapshot.threads"), | |
"elasticsearch.thread_pool.snapshot.queue": ("gauge", "thread_pool.snapshot.queue"), | |
"elasticsearch.http.current_open": ("gauge", "http.current_open"), | |
"elasticsearch.http.total_opened": ("gauge", "http.total_opened"), | |
"jvm.gc.collection_count": ("gauge", "jvm.gc.collection_count"), | |
"jvm.gc.collection_time": ("gauge", "jvm.gc.collection_time_in_millis", lambda v: float(v)/1000), | |
"jvm.gc.concurrent_mark_sweep.count": ("gauge", "jvm.gc.collectors.ConcurrentMarkSweep.collection_count"), | |
"jvm.gc.concurrent_mark_sweep.collection_time": ("gauge", "jvm.gc.collectors.ConcurrentMarkSweep.collection_time_in_millis", lambda v: float(v)/1000), | |
"jvm.gc.par_new.count": ("gauge", "jvm.gc.collectors.ParNew.collection_count"), | |
"jvm.gc.par_new.collection_time": ("gauge", "jvm.gc.collectors.ParNew.collection_time_in_millis", lambda v: float(v)/1000), | |
"jvm.gc.copy.count": ("gauge", "jvm.gc.collectors.Copy.collection_count"), | |
"jvm.gc.copy.collection_time": ("gauge", "jvm.gc.collectors.Copy.collection_time_in_millis", lambda v: float(v)/1000), | |
"jvm.mem.heap_committed": ("gauge", "jvm.mem.heap_committed_in_bytes"), | |
"jvm.mem.heap_used": ("gauge", "jvm.mem.heap_used_in_bytes"), | |
"jvm.mem.non_heap_committed": ("gauge", "jvm.mem.non_heap_committed_in_bytes"), | |
"jvm.mem.non_heap_used": ("gauge", "jvm.mem.non_heap_used_in_bytes"), | |
"jvm.threads.count": ("gauge", "jvm.threads.count"), | |
"jvm.threads.peak_count": ("gauge", "jvm.threads.peak_count"), | |
"elasticsearch.number_of_nodes": ("gauge", "number_of_nodes"), | |
"elasticsearch.number_of_data_nodes": ("gauge", "number_of_data_nodes"), | |
"elasticsearch.active_primary_shards": ("gauge", "active_primary_shards"), | |
"elasticsearch.active_shards": ("gauge", "active_shards"), | |
"elasticsearch.relocating_shards": ("gauge", "relocating_shards"), | |
"elasticsearch.initializing_shards": ("gauge", "initializing_shards"), | |
"elasticsearch.unassigned_shards": ("gauge", "unassigned_shards"), | |
} | |
class NodeNotFound(Exception):
    """Raised when a node's primary interface address cannot be looked up."""
class ElasticSearch(AgentCheck):
    """Agent check sampling Elasticsearch node statistics and cluster health.

    For every configured instance the check fetches the node stats and the
    cluster health over HTTP (optionally with basic authentication), submits
    the metrics described in ``METRICS`` and emits an event whenever the
    cluster status changes.
    """

    def __init__(self, name, init_config, agentConfig):
        AgentCheck.__init__(self, name, init_config, agentConfig)

        # Last cluster status seen per instance URL. It has to persist across
        # check runs so that status *changes* can be turned into events.
        self.cluster_status = {}

    def check(self, instance):
        """Run the check against one configured instance.

        instance: dict with a mandatory ``url`` and optional ``username`` /
                  ``password`` used for HTTP basic authentication.

        Raises ``Exception`` when no ``url`` is configured.
        """
        config_url = instance.get('url')
        if config_url is None:
            raise Exception("An url must be specified")

        # Basic authentication is only used when both halves are provided.
        username, password = instance.get('username'), instance.get('password')
        auth = (username, password) if username and password else None

        config_url = self._normalize_url(config_url)

        # Tag by URL so we can differentiate the metrics from multiple instances
        tags = ['url:%s' % config_url]

        # Load stats data.
        stats_data = self._get_data(urlparse.urljoin(config_url, STATS_URL), auth)
        self._process_stats_data(config_url, stats_data, auth, tags=tags)

        # Load the health data.
        health_data = self._get_data(urlparse.urljoin(config_url, HEALTH_URL), auth)
        self._process_health_data(config_url, health_data, tags=tags)

    @staticmethod
    def _normalize_url(config_url):
        """Reduce a configured URL to ``scheme://netloc``.

        URLs that carry a path are still accepted for backwards
        compatibility; the path is dropped. A URL without a scheme
        (e.g. ``localhost:9200``) is assumed to be plain HTTP, because
        ``urlparse`` would otherwise treat all of it as a path and leave no
        host to connect to.
        """
        if '://' not in config_url:
            config_url = 'http://%s' % config_url
        parsed = urlparse.urlparse(config_url)
        if parsed.path != "":
            config_url = "%s://%s" % (parsed.scheme, parsed.netloc)
        return config_url

    @staticmethod
    def _to_unicode(value):
        """Decode byte strings as UTF-8 and return text values untouched.

        Calling ``.decode`` on a value that is already unicode makes Python 2
        encode it to ASCII first, which fails on non-ASCII host names.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    @staticmethod
    def _basic_auth_header(auth):
        """Build the ``Authorization`` header value for a (username, password) tuple."""
        credentials = '%s:%s' % (auth[0], auth[1])
        if not isinstance(credentials, bytes):
            credentials = credentials.encode('utf-8')
        # b64encode never inserts line breaks. encodestring wraps its output
        # every 76 characters, which corrupts the header for long credentials.
        return str('Basic %s' % base64.b64encode(credentials).decode('ascii'))

    def _get_data(self, url, auth=None):
        """ Hit a given URL and return the parsed json
            `auth` is a tuple of (username, password) or None
        """
        req = urllib2.Request(url, None, headers(self.agentConfig))
        if auth:
            req.add_header('Authorization', self._basic_auth_header(auth))

        response = urllib2.urlopen(req)
        try:
            return json.loads(response.read())
        finally:
            # Always release the connection, even if reading or parsing fails.
            response.close()

    def _sample_metrics(self, data, tags=None):
        """Submit every metric of ``METRICS`` that can be found in `data`."""
        def process_metric(metric, xtype, path, xform=None):
            # closure over data and tags
            self._process_metric(data, metric, path, xform, tags=tags)

        self._map_metric(process_metric)

    def _local_hostnames(self):
        """Return the names this host may be known by, as unicode strings."""
        candidates = (self.hostname, socket.gethostname(), socket.getfqdn())
        return tuple(self._to_unicode(candidate) for candidate in candidates)

    def _process_stats_data(self, config_url, data, auth, tags=None):
        """Submit the stats of the node(s) running on the local host.

        config_url: normalized instance URL, used to query the node listing
        data:       parsed node stats payload; must contain a ``nodes`` dict
        auth:       (username, password) tuple or None
        tags:       tags attached to every submitted metric
        """
        # Resolved at most once per run: getfqdn() may hit the resolver, so it
        # must not be repeated for every node of the cluster.
        local_hostnames = None

        for node_name, node_data in data['nodes'].items():
            if 'hostname' in node_data:
                # For ES >= 0.19
                if local_hostnames is None:
                    local_hostnames = self._local_hostnames()
                if self._to_unicode(node_data['hostname']) in local_hostnames:
                    self._sample_metrics(node_data, tags)
            else:
                # ES < 0.19
                # Fetch interface address from ifconfig or ip addr and check
                # against the primary IP from ES
                try:
                    nodes_url = urlparse.urljoin(config_url, NODES_URL)
                    primary_addr = self._get_primary_addr(nodes_url, node_name, auth)
                except NodeNotFound:
                    # Skip any nodes that aren't found
                    continue
                if self._host_matches_node(primary_addr):
                    self._sample_metrics(node_data, tags)

    def _get_primary_addr(self, url, node_name, auth):
        """ Returns the primary interface address of a node as seen by ES.
            Used in ES < 0.19

            Raises ``NodeNotFound`` when the node is not listed or has no
            primary interface address.
        """
        data = self._get_data(url, auth)

        node = data['nodes'].get(node_name, {})
        primary_interface = node.get('network', {}).get('primary_interface', {})
        if 'address' not in primary_interface:
            raise NodeNotFound(node_name)
        return primary_interface['address']

    def _host_matches_node(self, primary_addrs):
        """ For < 0.19, check if the current host matches the IP given in the
            cluster nodes check `/_cluster/nodes`. Uses `ip addr` on Linux and
            `ifconfig` on Mac

            primary_addrs: the node's primary address (a single value).
            Returns True when that address belongs to a local interface.
        """
        command = ['ifconfig'] if sys.platform == 'darwin' else ['ip', 'addr']
        # A single child process whose output is filtered here; communicate()
        # also reaps it, so no zombie is left behind.
        proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        out, _ = proc.communicate()
        if isinstance(out, bytes):
            out = out.decode('utf-8', 'replace')

        # Capture the list of interface IPs
        ips = []
        for line in out.splitlines():
            fields = line.split()
            # Address lines look like "inet 10.0.0.1/24 ..." or "inet6 ::1 ..."
            if len(fields) > 1 and fields[0].startswith('inet'):
                ips.append(fields[1].split('/')[0])

        # Check the interface addresses against the primary address
        return primary_addrs in ips

    def _process_metric(self, data, metric, path, xform=None, tags=None):
        """data: dictionary containing all the stats
        metric: datadog metric
        path: corresponding path in data, flattened, e.g. thread_pool.bulk.queue
        xform: a lambda to apply to the numerical value
        tags: tags attached to the submitted metric
        """
        value = data

        # Traverse the nested dictionaries
        for key in path.split('.'):
            if not isinstance(value, dict):
                # Missing level, or a scalar where a sub-dictionary was expected.
                value = None
                break
            value = value.get(key)

        if value is None:
            self._metric_not_found(metric, path)
            return

        if xform:
            value = xform(value)
        if METRICS[metric][0] == "gauge":
            self.gauge(metric, value, tags=tags)
        else:
            self.rate(metric, value, tags=tags)

    def _process_health_data(self, config_url, data, tags=None):
        """Emit status events and submit the cluster health metrics.

        An event is sent on the first run when the cluster is yellow or red,
        and afterwards every time the status differs from the previous run.
        """
        status = data['status']
        previous = self.cluster_status.get(config_url)

        if previous is None:
            # First run for this instance: only report a degraded cluster.
            self.cluster_status[config_url] = status
            if status in ("yellow", "red"):
                self.event(self._create_event(status))
        elif status != previous:
            self.cluster_status[config_url] = status
            self.event(self._create_event(status))

        self._sample_metrics(data, tags)

    @classmethod
    def _map_metric(cls, func):
        """Apply a function to all known metrics.
        Used to create and sample metrics.

        `func` is called as ``func(metric, type, path[, xform])``.
        """
        for metric, desc in METRICS.items():
            func(metric, *desc)

    def _metric_not_found(self, metric, path):
        """Log, at debug level, that `path` is absent from the payload."""
        self.log.debug("Metric not found: %s -> %s", path, metric)

    def _create_event(self, status):
        """Build the event dictionary describing a cluster status."""
        hostname = self._to_unicode(self.hostname)
        if status == "red":
            alert_type = "error"
            msg_title = "%s is %s" % (hostname, status)
        elif status == "yellow":
            alert_type = "warning"
            msg_title = "%s is %s" % (hostname, status)
        else:
            # then it should be green
            alert_type = "success"
            msg_title = "%s recovered as %s" % (hostname, status)

        msg = "ElasticSearch: %s just reported as %s" % (hostname, status)

        return {
            'timestamp': int(time.time()),
            'event_type': 'elasticsearch',
            'host': hostname,
            'api_key': self.agentConfig['api_key'],
            'msg_text': msg,
            'msg_title': msg_title,
            "alert_type": alert_type,
            "source_type_name": "elasticsearch",
            "event_object": hostname,
        }

    @staticmethod
    def parse_agent_config(agentConfig):
        """Build the check configuration from a legacy agent config.

        Returns False when the agent config has no ``elasticsearch`` entry,
        otherwise a dict with a single instance pointing at that URL.
        """
        if not agentConfig.get('elasticsearch'):
            return False

        return {
            'instances': [{
                'url': agentConfig.get('elasticsearch'),
            }]
        }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
init_config: | |
instances: | |
# The URL where elasticsearch accepts HTTP requests. This will be used to | |
# fetch statistics from the nodes and information about the cluster health. | |
# | |
# If you're using basic authentication with a 3rd party library, for example
# elasticsearch-http-basic, you will need to specify a value for username
# and password for every instance that requires authentication.
# | |
#- url: http://localhost:9200
# username: username | |
# password: password |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment