Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Cassandra Replication Monitoring Blog Post
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import sys
from collections import defaultdict
from collections import namedtuple
import requests
# Connection settings for one jolokia agent: where it listens (host, port),
# how long to wait for it (timeout) and the URL prefix it is mounted under
# (base_path).
RequestArgs = namedtuple(
    'RequestArgs',
    ['host', 'port', 'timeout', 'base_path'],
)
# Convenience methods for talking to jolokia
def _make_request(args, path):
    '''Issue an HTTP GET against the jolokia agent and return the JSON body.

    :param args: object exposing ``host``, ``port``, ``timeout`` and
        ``base_path`` attributes (e.g. a ``RequestArgs`` tuple)
    :param path: path appended verbatim after ``base_path``
    :returns: the JSON-decoded response body
    :raises RuntimeError: if building the URL, performing the request or
        decoding the response raises any ``Exception``; the message names
        the target host and port and includes the original error text
    '''
    try:
        uri = 'http://{host}:{port}{base_path}{path}'.format(
            host=args.host, port=args.port, base_path=args.base_path, path=path
        )
        body = requests.get(uri, timeout=args.timeout).json()
    # ``except X as e`` is valid on Python 2.6+ and Python 3; the older
    # ``except X, e`` spelling is a SyntaxError on Python 3.
    except Exception as e:
        msg = ('Had exception while connecting to {0}:{1} -- '
               '{2}'.format(args.host, args.port, e))
        raise RuntimeError(msg)
    return body
def _get_mbean(args, domain, bean, op_type='read'):
    '''Request a single mbean from jolokia.

    The request path is ``/<op_type>/<domain>:<bean>``; the call is delegated
    to ``_make_request`` and its result is returned unchanged.

    :param args: connection settings passed straight to ``_make_request``
    :param domain: mbean domain placed before the colon
    :param bean: mbean specification placed after the colon
    :param op_type: first path segment, ``'read'`` unless overridden
    '''
    return _make_request(
        args,
        '/{0}/{1}:{2}'.format(op_type, domain, bean),
    )
def cassandra_bean_getter(args):
    '''Build a getter bound to one jolokia endpoint.

    The returned function looks up mbeans in the ``org.apache.cassandra.db``
    domain through ``_get_mbean`` and returns a single key of the response.

    :param args: connection settings forwarded to ``_get_mbean`` on every call
    :returns: ``get_cassandra_bean(bean, op_type='read',
        bean_attribute='value')``, which returns
        ``response[bean_attribute]``
    '''
    domain = 'org.apache.cassandra.db'

    def get_cassandra_bean(bean, op_type='read', bean_attribute='value'):
        response = _get_mbean(args, domain, bean, op_type)
        return response[bean_attribute]

    return get_cassandra_bean
# Methods that actually interpret cassandra replication state
# The basic call order is
# run_check -> check_keyspace_replication -> check_keyspace -> check_range
def check_range(responsible_nodes, consistency_level,
                dc_to_live_nodes, node_to_dc):
    '''Decide whether one token range can serve a consistency level.

    :param responsible_nodes: iterable of the nodes holding a replica of the
        range (duplicates are ignored)
    :param consistency_level: one of ``'one'``, ``'quorum'``, ``'all'``,
        ``'local_one'`` or ``'local_quorum'``
    :param dc_to_live_nodes: mapping of datacenter -> iterable of live nodes
    :param node_to_dc: mapping of node -> datacenter; it must contain every
        responsible node when a ``local_*`` level is checked
    :returns: ``True`` if enough replicas are alive, ``False`` otherwise
    :raises ValueError: if ``consistency_level`` is not one of the levels
        listed above
    '''
    nodes = set(responsible_nodes)
    # ``items()`` works on both Python 2 and 3; ``iteritems()`` is 2-only.
    alive_in_each_dc = dict(
        (dc, len(nodes & set(dc_nodes)))
        for dc, dc_nodes in dc_to_live_nodes.items()
    )
    total_alive = sum(alive_in_each_dc.values())

    if consistency_level == 'one':
        return total_alive >= 1
    elif consistency_level == 'quorum':
        # "More than half", written with integers so the result does not
        # depend on true vs. floor division.
        return 2 * total_alive > len(nodes)
    elif consistency_level == 'all':
        return total_alive >= len(nodes)

    # Remaining levels are per-datacenter: count how many replicas each
    # datacenter is expected to hold for this range.
    expected_in_each_dc = defaultdict(int)
    for node in nodes:
        expected_in_each_dc[node_to_dc[node]] += 1

    if consistency_level == 'local_one':
        return all(
            alive_in_each_dc.get(dc, 0) >= 1 for dc in expected_in_each_dc
        )
    elif consistency_level == 'local_quorum':
        return all(
            2 * alive_in_each_dc.get(dc, 0) > expected
            for dc, expected in expected_in_each_dc.items()
        )

    raise ValueError('Do not understand {0} consistency level'.format(
        consistency_level
    ))
def check_keyspace(get_cassandra_bean, keyspace,
                   available_nodes, consistency_level):
    '''Count the token ranges of a keyspace that fail a consistency level.

    :param get_cassandra_bean: callable used to query the StorageService and
        EndpointSnitchInfo mbeans (invoked with ``op_type='exec'``)
    :param keyspace: name of the keyspace to inspect
    :param available_nodes: iterable of the nodes currently considered live
    :param consistency_level: level forwarded to ``check_range``
    :returns: the number of ranges for which ``check_range`` returns a falsy
        value
    '''
    # Mapping of token range -> nodes responsible for that range.
    token_ranges = get_cassandra_bean(
        'type=StorageService/getRangeToEndpointMap/{0}'.format(keyspace),
        op_type='exec'
    )

    all_responsible_nodes = set()
    for nodes in token_ranges.values():
        all_responsible_nodes.update(nodes)

    # Build the live set once so each membership test below is O(1).
    live_nodes = set(available_nodes)
    dc_to_live_nodes = defaultdict(list)
    node_to_dc = {}
    for node in all_responsible_nodes:
        # One datacenter lookup per distinct node, not per range.
        dc = get_cassandra_bean(
            'type=EndpointSnitchInfo/getDatacenter/{0}'.format(node),
            op_type='exec'
        )
        node_to_dc[node] = dc
        if node in live_nodes:
            dc_to_live_nodes[dc].append(node)

    # Only the replica lists matter here; ``values()`` also avoids the
    # Python 2-only ``iteritems()``.
    bad_ranges = 0
    for nodes in token_ranges.values():
        range_is_healthy = check_range(
            nodes, consistency_level, dc_to_live_nodes, node_to_dc
        )
        if not range_is_healthy:
            bad_ranges += 1
    return bad_ranges
def check_keyspace_replication(get_cassandra_bean, consistency_level):
    '''Run ``check_keyspace`` over every non-system keyspace.

    :param get_cassandra_bean: callable used to read the ``Keyspaces`` and
        ``LiveNodes`` attributes of the StorageService mbean
    :param consistency_level: level forwarded to ``check_keyspace``
    :returns: dict of keyspace -> number of ranges failing the level
    '''
    all_keyspaces = get_cassandra_bean('type=StorageService/Keyspaces')
    live_nodes = get_cassandra_bean('type=StorageService/LiveNodes')

    # Keyspaces in this set are excluded from the check.
    system_keyspaces = set(
        ['system', 'system_traces', 'system_auth', 'system_distributed']
    )
    user_keyspaces = set(all_keyspaces) - system_keyspaces

    failing_ranges = {}
    for keyspace in user_keyspaces:
        failing_ranges[keyspace] = check_keyspace(
            get_cassandra_bean, keyspace, live_nodes, consistency_level
        )
    return failing_ranges
def run_check(args):
    '''Check the cluster and print a one-line OK/CRITICAL status.

    :param args: parsed command line; reads ``host``, ``jolokia_port``,
        ``jolokia_timeout``, ``jolokia_agent_path`` and ``consistency_check``
    :returns: ``0`` when no keyspace has failing ranges, ``2`` otherwise
    '''
    getter = cassandra_bean_getter(RequestArgs(
        host=args.host,
        port=args.jolokia_port,
        timeout=args.jolokia_timeout,
        base_path=args.jolokia_agent_path,
    ))
    failing = check_keyspace_replication(getter, args.consistency_check)

    # Keep only the keyspaces that have at least one failing range.
    under_replicated = dict(
        (keyspace, count) for keyspace, count in failing.items() if count > 0
    )
    if not under_replicated:
        print('OK: cluster looks fine')
        return 0

    partitions = '[Underreplicated partitions: {0}]'.format(under_replicated)
    print(
        'CRITICAL: cluster cannot complete operations at consistency '
        'level {0}. {1}'.format(args.consistency_check, partitions)
    )
    return 2
# Parsing arguments and other Python command line boilerplate.
def parse_args():
    '''Parse the command line of the check.

    :returns: the ``argparse`` namespace with ``host``, ``jolokia_port``
        (int), ``jolokia_agent_path``, ``jolokia_timeout`` (float),
        ``consistency_check`` and ``func`` (set to ``run_check``)
    '''
    parser = argparse.ArgumentParser(
        description=(
            'check_cassandra_cluster verifies the availability of all'
            ' non-system keyspaces against the desired consistency level.'
        ),
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        '--host', dest='host', default='localhost',
        help='Target cassandra host to bootstrap off of.'
    )
    # type=int keeps the value an int whether it comes from the default or
    # from the command line.
    parser.add_argument(
        '--jolokia-port', dest='jolokia_port', default=8999, type=int,
        help='Cassandra jolokia port'
    )
    parser.add_argument(
        '--jolokia-agent-path', dest='jolokia_agent_path', default='/jolokia',
        help='Cassandra jolokia agent path'
    )
    parser.add_argument(
        '--jolokia-timeout', dest='jolokia_timeout', default=12.0, type=float,
        help='Cassandra jolokia timeout in seconds'
    )
    # A positional argument only honours ``default`` when it is optional;
    # nargs='?' makes the declared default apply when the level is omitted.
    # Invocations that pass a level explicitly behave as before.
    parser.add_argument(
        'consistency_check',
        nargs='?',
        choices=['one', 'quorum', 'all', 'local_one', 'local_quorum'],
        default='local_quorum',
        help='Alert if we lose the ability to do the provided '
             'consistency level operations'
    )
    parser.set_defaults(func=run_check)
    return parser.parse_args()
def main():
    '''Entry point: parse arguments, run the check and exit with its status.

    The process exits with the value returned by ``args.func(args)``. Any
    uncaught ``Exception`` is reported on one ``UNKNOWN:`` line and the
    process exits with status 3.
    '''
    args = parse_args()
    try:
        # SystemExit is not an ``Exception`` subclass, so this exit is not
        # intercepted by the handler below.
        sys.exit(args.func(args))
    # ``except X as e`` is valid on Python 2.6+ and Python 3; the older
    # ``except X, e`` spelling is a SyntaxError on Python 3.
    except Exception as e:
        print('UNKNOWN: Uncaught exception {0}'.format(e))
        sys.exit(3)


if __name__ == '__main__':
    main()
replication_level = input(...)
alive_nodes = getLiveNodes(...)
keyspaces = getKeyspaces(...) - system_keyspaces()
underreplicated_keyspaces = {}
for keyspace in keyspaces:
    keyspace_ranges = getRangeToEndpointMap(keyspace)
    for _, range_nodes in keyspace_ranges:
        if |range_nodes ∩ alive_nodes| < replication_level:
            underreplicated_keyspaces[keyspace] = True
if underreplicated_keyspaces:
    alert_cluster_operator(underreplicated_keyspaces)
cqlsh:blog_1> DESCRIBE CLUSTER
Cluster: test
Partitioner: Murmur3Partitioner
Range ownership:
3732272507813644288 [NODE_A]
2788686370127196454 [NODE_B]
-7530935857429381116 [NODE_B]
111150679707998215 [NODE_B]
3524081553196673032 [NODE_A]
2388757566836860943 [NODE_B]
6174693015045179395 [NODE_A]
81565236350140436 [NODE_B]
9067682831513077639 [NODE_C]
3254554184914284573 [NODE_B]
8220009980887493637 [NODE_C]
...
cqlsh:blog_1> use blog_3 ;
cqlsh:blog_3> DESCRIBE CLUSTER
Cluster: test
Partitioner: Murmur3Partitioner
Range ownership:
3732272507813644288 [NODE_A, NODE_B, NODE_C]
2788686370127196454 [NODE_B, NODE_C, NODE_A]
-7530935857429381116 [NODE_B, NODE_C, NODE_A]
111150679707998215 [NODE_B, NODE_C, NODE_A]
3524081553196673032 [NODE_A, NODE_B, NODE_C]
2388757566836860943 [NODE_B, NODE_C, NODE_A]
6174693015045179395 [NODE_A, NODE_B, NODE_C]
...
% For displaying ring topologies
% I apologize for the quality of this code... I just hacked it together
% Author : Joseph Lynch
% Based on the simple cycle from
% Author : Jerome Tremblay
\documentclass{article}
\usepackage{tikz}
\usetikzlibrary{decorations.text}
%%%<
\usepackage{verbatim}
\usepackage{verbatim}
\usepackage[active,tightpage]{preview}
\PreviewEnvironment{tikzpicture}
\setlength\PreviewBorder{5pt}%
%%%>
\begin{comment}
:Title: A Cassandra Replication Ring
:Author: Joseph Lynch
\end{comment}
\begin{document}
\begin{tikzpicture}
\def \radius {3cm}
\def \innerr {2.6cm}
\def \outerr {3.4cm}
\def \textr {3.7cm}
\def \innertextr {2.3cm}
\def \nw {.6em}
\def \margin {8} % margin in angles, depends on the radius
\def \n {12}
\def \c {3}
% Yelp colors
% \definecolor{yred}{RGB}{196,18,0}
%\definecolor{yblue}{RGB}{59,101,167}
%\definecolor{yorange}{RGB}{221,81,20}
%\definecolor{ygreen}{RGB}{60,181,46}
%\definecolor{ywhite}{RGB}{247,247,247}
%\definecolor{yblack}{RGB}{51,51,51}
% Economist colors
\definecolor{yring}{RGB}{130, 192, 233}
\definecolor{yred}{RGB}{144, 53, 59}
\definecolor{yyellow}{RGB}{236, 161, 32}
\definecolor{ynode1}{RGB}{62, 100, 125}
\definecolor{ynode4}{RGB}{236, 161, 32}
\definecolor{ynode3}{RGB}{144, 53, 59}
\definecolor{ynode2}{RGB}{198, 211, 223}
% Ring
\path [draw=none,fill=yring, fill opacity = 1.0,even odd rule] (0,0) circle (\outerr) (0,0) circle (\innerr);
% Legend
\node[draw, circle,draw=none, color=yyellow, fill opacity=1.0,inner sep=0em, label=right:{Dead Node}] at (4.5,-2) {\LARGE X};
\node[draw, circle,fill=ynode2, fill opacity=1.0,inner sep=\nw, label=right:{Node D}] at (4.5,-1) {};
\node[draw, circle,fill=ynode3, fill opacity=1.0,inner sep=\nw, label=right:{Node C}] at (4.5,0) {};
\node[draw, circle,fill=ynode4, fill opacity=1.0,inner sep=\nw, label=right:{Node B}] at (4.5,1) {};
\node[draw, circle,fill=ynode1, fill opacity=1.0,inner sep=\nw, label=right:{Node A}] at (4.5,2) {};
% Token ranges
\foreach \s in {1,...,\n}
{
\node[draw=none] at ({360-(360/\n * (\s+8))}:\textr) {\s};
}
\foreach \m in {0,...,\c}
{
\def \s {(\c+1)*\m}
\node[draw, circle,fill=ynode1, fill opacity=1.0,inner sep=\nw] at ({360/\n * (\s - 1)}:\radius) {};
\draw[<-, >=latex] ({360/\n * (\s - 1)+\margin}:\radius)
arc ({360/\n * (\s - 1)+\margin}:{360/\n * (\s)-\margin}:\radius); \node[draw=none,color=yyellow] at ({360/\n * (\s - 1)}:\radius) {\LARGE X};
\node[draw=none,color=yred] at ({360/\n * (\s - 1)}:\innertextr) {\textbf{1}};
\def \s {(\c+1)*\m+1}
\node[draw, circle,fill=ynode2, fill opacity=1.0,inner sep=\nw] at ({360/\n * (\s - 1)}:\radius) {};
\draw[<-, >=latex] ({360/\n * (\s - 1)+\margin}:\radius)
arc ({360/\n * (\s - 1)+\margin}:{360/\n * (\s)-\margin}:\radius);
\node[draw=none,color=yyellow] at ({360/\n * (\s - 1)}:\innertextr) {\textbf{2}};
\def \s {(\c+1)*\m+2}
\node[draw, circle,fill=ynode3, fill opacity=1.0,inner sep=\nw] at ({360/\n * (\s - 1)}:\radius) {};
\draw[<-, >=latex] ({360/\n * (\s - 1)+\margin}:\radius)
arc ({360/\n * (\s - 1)+\margin}:{360/\n * (\s)-\margin}:\radius);
% Optional Xs to indicate failures
\node[draw=none,color=yyellow] at ({360/\n * (\s - 1)}:\radius) {\LARGE X};
\node[draw=none,color=yred] at ({360/\n * (\s - 1)}:\innertextr) {\textbf{1}};
\def \s {(\c+1)*\m+3}
\node[draw, circle,fill=ynode4, fill opacity=1.0,inner sep=\nw] at ({360/\n * (\s - 1)}:\radius) {};
\draw[<-, >=latex] ({360/\n * (\s - 1)+\margin}:\radius)
arc ({360/\n * (\s - 1)+\margin}:{360/\n * (\s)-\margin}:\radius);
\node[draw=none,color=yyellow] at ({360/\n * (\s - 1)}:\innertextr) {\textbf{2}};
}
\def \s {4}
% Token replication arc
\draw[<-, >=latex, postaction={decorate,decoration={text along path,text align=center,text={token replication}, reverse path, raise={-2.5ex}}}] (30:4.4cm)
arc (30:90:4.4cm) node [pos=0.5, above, black, sloped] {};
\draw[-, >=latex, postaction={decorate,decoration={text along path,text align=center,text={available replicas}, reverse path, raise={-2.5ex}}}] (270:2.1cm)
arc (270:630:2.1cm) node [pos=0.5, above, black, sloped] {};
\end{tikzpicture}
\end{document}
# Dependencies of the monitoring script
requests
argparse
# All nodes are running normally
# Check availability of keyspaces
(venv)$ ./check_cassandra_cluster --host NODE_A all
OK: cluster looks fine
(venv)$ ./check_cassandra_cluster --host NODE_A local_quorum
OK: cluster looks fine
# Stop NODE_A
# ...
# Check availability of keyspaces
(venv)$ ./check_cassandra_cluster --host NODE_B all
CRITICAL: cluster cannot complete operations at consistency level all.
[Underreplicated partitions: {u'blog_3': 768, u'blog_1': 256}]
(venv)$ ./check_cassandra_cluster --host NODE_B local_quorum
CRITICAL: cluster cannot complete operations at consistency level local_quorum.
[Underreplicated partitions: {u'blog_1': 256}]
(venv)$ ./check_cassandra_cluster --host NODE_B local_one
CRITICAL: cluster cannot complete operations at consistency level local_one.
[Underreplicated partitions: {u'blog_1': 256}]
# Stop NODE_B as well
# ...
# Check availability of keyspaces
(venv)$ ./check_cassandra_cluster --host NODE_C all
CRITICAL: cluster cannot complete operations at consistency level all.
[Underreplicated partitions: {u'blog_3': 768, u'blog_1': 512}]
(venv)$ ./check_cassandra_cluster --host NODE_C local_quorum
CRITICAL: cluster cannot complete operations at consistency level local_quorum.
[Underreplicated partitions: {u'blog_3': 768, u'blog_1': 512}]
(venv)$ ./check_cassandra_cluster --host NODE_C local_one
CRITICAL: cluster cannot complete operations at consistency level local_one.
[Underreplicated partitions: {u'blog_1': 512}]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.