Skip to content

Instantly share code, notes, and snippets.

@masci
Created May 10, 2017 12:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save masci/244c1893e0bc3cf64fa4a5dab90bd601 to your computer and use it in GitHub Desktop.
Save masci/244c1893e0bc3cf64fa4a5dab90bd601 to your computer and use it in GitHub Desktop.
# (C) Datadog, Inc. 2010-2016
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
'''
Redis checks
'''
# stdlib
from collections import defaultdict
import re
import time
# 3rd party
import redis
# project
from checks import AgentCheck
DEFAULT_MAX_SLOW_ENTRIES = 128
MAX_SLOW_ENTRIES_KEY = "slowlog-max-len"
REPL_KEY = 'master_link_status'
LINK_DOWN_KEY = 'master_link_down_since_seconds'
class Redis(AgentCheck):
db_key_pattern = re.compile(r'^db\d+')
slave_key_pattern = re.compile(r'^slave\d+')
subkeys = ['keys', 'expires']
SOURCE_TYPE_NAME = 'redis'
GAUGE_KEYS = {
# Append-only metrics
'aof_last_rewrite_time_sec': 'redis.aof.last_rewrite_time',
'aof_rewrite_in_progress': 'redis.aof.rewrite',
'aof_current_size': 'redis.aof.size',
'aof_buffer_length': 'redis.aof.buffer_length',
# Network
'connected_clients': 'redis.net.clients',
'connected_slaves': 'redis.net.slaves',
'rejected_connections': 'redis.net.rejected',
# clients
'blocked_clients': 'redis.clients.blocked',
'client_biggest_input_buf': 'redis.clients.biggest_input_buf',
'client_longest_output_list': 'redis.clients.longest_output_list',
# Keys
'evicted_keys': 'redis.keys.evicted',
'expired_keys': 'redis.keys.expired',
# stats
'latest_fork_usec': 'redis.perf.latest_fork_usec',
'bytes_received_per_sec': 'redis.bytes_received_per_sec',
'bytes_sent_per_sec': 'redis.bytes_sent_per_sec',
# Note: 'bytes_received_per_sec' and 'bytes_sent_per_sec' are only
# available on Azure Redis
# pubsub
'pubsub_channels': 'redis.pubsub.channels',
'pubsub_patterns': 'redis.pubsub.patterns',
# rdb
'rdb_bgsave_in_progress': 'redis.rdb.bgsave',
'rdb_changes_since_last_save': 'redis.rdb.changes_since_last',
'rdb_last_bgsave_time_sec': 'redis.rdb.last_bgsave_time',
# memory
'mem_fragmentation_ratio': 'redis.mem.fragmentation_ratio',
'used_memory': 'redis.mem.used',
'used_memory_lua': 'redis.mem.lua',
'used_memory_peak': 'redis.mem.peak',
'used_memory_rss': 'redis.mem.rss',
# replication
'master_last_io_seconds_ago': 'redis.replication.last_io_seconds_ago',
'master_sync_in_progress': 'redis.replication.sync',
'master_sync_left_bytes': 'redis.replication.sync_left_bytes',
'repl_backlog_histlen': 'redis.replication.backlog_histlen',
'master_repl_offset': 'redis.replication.master_repl_offset',
'slave_repl_offset': 'redis.replication.slave_repl_offset',
}
RATE_KEYS = {
# cpu
'used_cpu_sys': 'redis.cpu.sys',
'used_cpu_sys_children': 'redis.cpu.sys_children',
'used_cpu_user': 'redis.cpu.user',
'used_cpu_user_children': 'redis.cpu.user_children',
# stats
'keyspace_hits': 'redis.stats.keyspace_hits',
'keyspace_misses': 'redis.stats.keyspace_misses',
}
def __init__(self, name, init_config, agentConfig, instances=None):
AgentCheck.__init__(self, name, init_config, agentConfig, instances)
self.connections = {}
self.last_timestamp_seen = defaultdict(int)
def get_library_versions(self):
return {"redis": redis.__version__}
def _parse_dict_string(self, string, key, default):
"""Take from a more recent redis.py, parse_info"""
try:
for item in string.split(','):
k, v = item.rsplit('=', 1)
if k == key:
try:
return int(v)
except ValueError:
return v
return default
except Exception:
self.log.exception("Cannot parse dictionary string: %s" % string)
return default
def _generate_instance_key(self, instance):
if 'unix_socket_path' in instance:
return (instance.get('unix_socket_path'), instance.get('db'))
else:
return (instance.get('host'), instance.get('port'), instance.get('db'))
def _get_conn(self, instance):
key = self._generate_instance_key(instance)
if key not in self.connections:
try:
# Only send useful parameters to the redis client constructor
list_params = ['host', 'port', 'db', 'password', 'socket_timeout',
'connection_pool', 'charset', 'errors', 'unix_socket_path', 'ssl',
'ssl_certfile', 'ssl_keyfile', 'ssl_ca_certs', 'ssl_cert_reqs']
# Set a default timeout (in seconds) if no timeout is specified in the instance config
instance['socket_timeout'] = instance.get('socket_timeout', 5)
connection_params = dict((k, instance[k]) for k in list_params if k in instance)
self.connections[key] = redis.Redis(**connection_params)
except TypeError:
raise Exception("You need a redis library that supports authenticated connections. Try sudo easy_install redis.")
return self.connections[key]
def _get_tags(self, custom_tags, instance):
tags = set(custom_tags or [])
if 'unix_socket_path' in instance:
tags_to_add = [
"redis_host:%s" % instance.get("unix_socket_path"),
"redis_port:unix_socket",
]
else:
tags_to_add = [
"redis_host:%s" % instance.get('host'),
"redis_port:%s" % instance.get('port')
]
tags = sorted(tags.union(tags_to_add))
return tags
def _check_db(self, instance, custom_tags=None):
conn = self._get_conn(instance)
tags = self._get_tags(custom_tags, instance)
# Ping the database for info, and track the latency.
# Process the service check: the check passes if we can connect to Redis
start = time.time()
info = None
try:
info = conn.info()
status = AgentCheck.OK
self.service_check('redis.can_connect', status, tags=tags)
self._collect_metadata(info)
except ValueError:
status = AgentCheck.CRITICAL
self.service_check('redis.can_connect', status, tags=tags)
raise
except Exception:
status = AgentCheck.CRITICAL
self.service_check('redis.can_connect', status, tags=tags)
raise
latency_ms = round((time.time() - start) * 1000, 2)
self.gauge('redis.info.latency_ms', latency_ms, tags=tags)
# Save the database statistics.
for key in info.keys():
if self.db_key_pattern.match(key):
db_tags = list(tags) + ["redis_db:" + key]
# allows tracking percentage of expired keys as DD does not
# currently allow arithmetic on metric for monitoring
expires_keys = info[key]["expires"]
total_keys = info[key]["keys"]
persist_keys = total_keys - expires_keys
self.gauge("redis.persist", persist_keys, tags=db_tags)
self.gauge("redis.persist.percent", 100.0 * persist_keys / total_keys, tags=db_tags)
self.gauge("redis.expires.percent", 100.0 * expires_keys / total_keys, tags=db_tags)
for subkey in self.subkeys:
# Old redis module on ubuntu 10.04 (python-redis 0.6.1) does not
# returns a dict for those key but a string: keys=3,expires=0
# Try to parse it (see lighthouse #46)
val = -1
try:
val = info[key].get(subkey, -1)
except AttributeError:
val = self._parse_dict_string(info[key], subkey, -1)
metric = '.'.join(['redis', subkey])
self.gauge(metric, val, tags=db_tags)
# Save a subset of db-wide statistics
for info_name, value in info.iteritems():
if info_name in self.GAUGE_KEYS:
self.gauge(self.GAUGE_KEYS[info_name], info[info_name], tags=tags)
elif info_name in self.RATE_KEYS:
self.rate(self.RATE_KEYS[info_name], info[info_name], tags=tags)
# Save the number of commands.
self.rate('redis.net.commands', info['total_commands_processed'],
tags=tags)
# Check some key lengths if asked
key_list = instance.get('keys')
if key_list is not None:
if not isinstance(key_list, list) or len(key_list) == 0:
self.warning("keys in redis configuration is either not a list or empty")
else:
l_tags = list(tags)
for key in key_list:
key_type = conn.type(key)
key_tags = l_tags + ['key:' + key]
if key_type == 'list':
self.gauge('redis.key.length', conn.llen(key), tags=key_tags)
elif key_type == 'set':
self.gauge('redis.key.length', conn.scard(key), tags=key_tags)
elif key_type == 'zset':
self.gauge('redis.key.length', conn.zcard(key), tags=key_tags)
elif key_type == 'hash':
self.gauge('redis.key.length', conn.hlen(key), tags=key_tags)
else:
# If the type is unknown, it might be because the key doesn't exist,
# which can be because the list is empty. So always send 0 in that case.
if instance.get("warn_on_missing_keys", True):
self.warning("{0} key not found in redis".format(key))
self.gauge('redis.key.length', 0, tags=key_tags)
self._check_replication(info, tags)
if instance.get("command_stats", False):
self._check_command_stats(conn, tags)
def _check_replication(self, info, tags):
# Save the replication delay for each slave
for key in info:
if self.slave_key_pattern.match(key) and isinstance(info[key], dict):
slave_offset = info[key].get('offset')
master_offset = info.get('master_repl_offset')
if slave_offset and master_offset and master_offset - slave_offset >= 0:
delay = master_offset - slave_offset
# Add id, ip, and port tags for the slave
slave_tags = tags[:]
for slave_tag in ('ip', 'port'):
if slave_tag in info[key]:
slave_tags.append('slave_{0}:{1}'.format(slave_tag, info[key][slave_tag]))
slave_tags.append('slave_id:%s' % key.lstrip('slave'))
self.gauge('redis.replication.delay', delay, tags=slave_tags)
self.log.debug('Replication mode: %s', info.get('role'))
self.log.debug('master_link_status: %s', info.get('master_link_status'))
if REPL_KEY in info:
if info[REPL_KEY] == 'up':
status = AgentCheck.OK
down_seconds = 0
else:
status = AgentCheck.CRITICAL
down_seconds = info[LINK_DOWN_KEY]
self.service_check('redis.replication.master_link_status', status, tags=tags)
self.gauge('redis.replication.master_link_down_since_seconds', down_seconds, tags=tags)
def _check_slowlog(self, instance, custom_tags):
"""Retrieve length and entries from Redis' SLOWLOG
This will parse through all entries of the SLOWLOG and select ones
within the time range between the last seen entries and now
"""
conn = self._get_conn(instance)
tags = self._get_tags(custom_tags, instance)
if not instance.get(MAX_SLOW_ENTRIES_KEY):
try:
max_slow_entries = int(conn.config_get(MAX_SLOW_ENTRIES_KEY)[MAX_SLOW_ENTRIES_KEY])
if max_slow_entries > DEFAULT_MAX_SLOW_ENTRIES:
self.warning("Redis {0} is higher than {1}. Defaulting to {1}."
"If you need a higher value, please set {0} in your check config"
.format(MAX_SLOW_ENTRIES_KEY, DEFAULT_MAX_SLOW_ENTRIES))
max_slow_entries = DEFAULT_MAX_SLOW_ENTRIES
# No config on AWS Elasticache
except redis.ResponseError:
max_slow_entries = DEFAULT_MAX_SLOW_ENTRIES
else:
max_slow_entries = int(instance.get(MAX_SLOW_ENTRIES_KEY))
# Generate a unique id for this instance to be persisted across runs
ts_key = self._generate_instance_key(instance)
# Get all slowlog entries
slowlogs = conn.slowlog_get(max_slow_entries)
# Find slowlog entries between last timestamp and now using start_time
slowlogs = [s for s in slowlogs if s['start_time'] >
self.last_timestamp_seen[ts_key]]
max_ts = 0
# Slowlog entry looks like:
# {'command': 'LPOP somekey',
# 'duration': 11238,
# 'id': 496L,
# 'start_time': 1422529869}
for slowlog in slowlogs:
if slowlog['start_time'] > max_ts:
max_ts = slowlog['start_time']
slowlog_tags = list(tags)
command = slowlog['command'].split()
# When the "Garantia Data" custom Redis is used, redis-py returns
# an empty `command` field
# FIXME when https://github.com/andymccurdy/redis-py/pull/622 is released in redis-py
if command:
slowlog_tags.append('command:{0}'.format(command[0]))
value = slowlog['duration']
self.histogram('redis.slowlog.micros', value, tags=slowlog_tags)
self.last_timestamp_seen[ts_key] = max_ts
def _check_command_stats(self, conn, tags):
"""Get command-specific statistics from redis' INFO COMMANDSTATS command
"""
try:
command_stats = conn.info("commandstats")
except Exception:
self.warning("Could not retrieve command stats from Redis."
"INFO COMMANDSTATS only works with Redis >= 2.6.")
return
for key, stats in command_stats.iteritems():
command = key.split('_', 1)[1]
command_tags = tags + ['command:%s' % command]
self.gauge('redis.command.calls', stats['calls'], tags=command_tags)
self.gauge('redis.command.usec_per_call', stats['usec_per_call'], tags=command_tags)
def check(self, instance):
if ("host" not in instance or "port" not in instance) and "unix_socket_path" not in instance:
raise Exception("You must specify a host/port couple or a unix_socket_path")
custom_tags = instance.get('tags', [])
self._check_db(instance, custom_tags)
self._check_slowlog(instance, custom_tags)
def _collect_metadata(self, info):
if info and 'redis_version' in info:
self.service_metadata('version', info['redis_version'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment