Skip to content

Instantly share code, notes, and snippets.

@tonyseek
Created December 8, 2016 07:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tonyseek/67cf7dd7d1f1604c01c9075bfb159503 to your computer and use it in GitHub Desktop.
Save tonyseek/67cf7dd7d1f1604c01c9075bfb159503 to your computer and use it in GitHub Desktop.
"""
ZooKeeper Monitor
=================
This is a simple script that monitors a ZooKeeper node with the "mntr"
four-letter command. Selected values from each response are forwarded to a
statsd server as gauges.
"""
import argparse
import logging
import re
import socket
import time
__version__ = '0.1.2'
logger = logging.getLogger(__name__)
class ZooKeeperBeat(object):
    """Poll a ZooKeeper server with the ``mntr`` four-letter command.

    Calling an instance returns a generator that never finishes: on every
    iteration it yields a dict of the metrics parsed from the latest ``mntr``
    response (an empty dict when the request failed with a socket error) and
    then sleeps for ``interval`` seconds.

    :param host: ZooKeeper host name or address.
    :param port: ZooKeeper client port; coerced with ``int()``.
    :param interval: Seconds to sleep between two polls; coerced with
        ``int()``.
    :param timeout: Seconds to wait for connect/send/receive before the
        request is abandoned. ``None`` blocks indefinitely.
    """

    # Matches "zk_<name><whitespace><value>" lines; the "zk_" prefix is not
    # part of the captured key.
    _mntr_re = re.compile(r'^zk_([a-z_]+)\s+(.+)$', re.MULTILINE)

    def __init__(self, host, port, interval, timeout=10.0):
        self.host = host
        self.port = int(port)
        self.interval = int(interval)
        self.timeout = timeout

    def __call__(self):
        """Yield one dict of parsed metrics per poll, forever."""
        while True:
            parsed = {}
            try:
                response = self.send('mntr', 10240)
            except socket.error as e:
                # Best effort: log the failure and yield an empty snapshot
                # so the caller keeps its cadence.
                logger.error('receive mntr failed: %r', e)
            else:
                parsed.update(self.parse_mntr(response))
            yield parsed
            time.sleep(self.interval)

    def send(self, command, chunk_size):
        """Send ``command`` over a fresh connection and return the reply.

        :param command: Command to send, as text or bytes.
        :param chunk_size: Maximum number of bytes requested per ``recv``.
        :returns: The complete response as text.
        :raises socket.error: If connecting, sending or receiving fails.
        """
        if not isinstance(command, bytes):
            command = command.encode('ascii')

        sock = socket.create_connection((self.host, self.port), self.timeout)
        try:
            # sendall() retries partial writes; send() may write only a part.
            sock.sendall(command)
            chunks = []
            while True:
                try:
                    chunk = sock.recv(chunk_size)
                except socket.timeout:
                    # Keep whatever already arrived if the peer never closes.
                    if not chunks:
                        raise
                    break
                if not chunk:
                    # Empty read: the peer closed the connection.
                    break
                chunks.append(chunk)
        finally:
            # A new socket is opened for every poll, so always release it.
            sock.close()

        response = b''.join(chunks)
        if not isinstance(response, str):
            # Python 3: hand text to the str-based regex in parse_mntr().
            response = response.decode('utf-8', 'replace')
        return response

    def parse_mntr(self, response):
        """Yield ``(key, value)`` pairs parsed from a ``mntr`` response.

        Only lines starting with ``zk_`` are considered and that prefix is
        removed from the key. Values consisting solely of digits are
        converted to ``int``; everything else stays a string.

        :param response: Response text (bytes are decoded first).
        """
        if isinstance(response, bytes) and not isinstance(response, str):
            response = response.decode('utf-8', 'replace')
        for match in self._mntr_re.finditer(response):
            key, value = match.groups()
            # Drop a trailing "\r" or padding captured by ".+".
            value = value.strip()
            if value.isdigit():
                value = int(value)
            yield key, value
class StatsdClient(object):
    """Minimal UDP client that pushes metrics to a statsd server.

    :param host: statsd host name or address.
    :param port: statsd UDP port; coerced with ``int()``.
    :param prefix: Prefix put in front of every metric name. Trailing dots
        are removed and exactly one ``.`` separator is appended.
    """

    def __init__(self, host, port, prefix):
        self.host = host
        self.port = int(port)
        self.prefix = prefix.rstrip('.') + '.'
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def send(self, data):
        """Send all metrics in ``data`` as one newline-separated datagram.

        :param data: Mapping of metric name to a sequence of fields that are
            joined with ``|`` after the colon, e.g. ``{'x': (1, 'g')}``
            becomes ``<prefix>x:1|g``.

        Socket errors are logged and swallowed; nothing is sent for an empty
        mapping.
        """
        if not data:
            return

        payload = '\n'.join(
            '{0}{1}:{2}'.format(self.prefix, name, '|'.join(map(str, fields)))
            for name, fields in data.items())
        if not isinstance(payload, bytes):
            # sendto() needs bytes on Python 3.
            payload = payload.encode('utf-8')

        try:
            self.sock.sendto(payload, (self.host, self.port))
        except socket.error as e:
            logger.error('send to statsd failed: %r', e)
def parse_args(argv=None):
    """Parse the monitor's command-line options.

    :param argv: List of argument strings to parse. ``None`` (the default)
        lets argparse read ``sys.argv[1:]``.
    :returns: ``argparse.Namespace`` with the attributes ``zk_host``,
        ``zk_port``, ``interval``, ``prefix``, ``statsd_host``,
        ``statsd_port``, ``debug`` and ``verbose``.
    """
    parser = argparse.ArgumentParser(
        description='Forward ZooKeeper "mntr" metrics to statsd.')
    parser.add_argument(
        '--zk-host', default='127.0.0.1',
        help='ZooKeeper host (default: %(default)s)')
    parser.add_argument(
        '--zk-port', default=2181, type=int,
        help='ZooKeeper client port (default: %(default)s)')
    parser.add_argument(
        '-i', '--interval', default=1, type=int,
        help='seconds between two polls (default: %(default)s)')
    parser.add_argument(
        '-p', '--prefix', required=True,
        help='prefix put in front of every statsd metric name')
    parser.add_argument(
        '--statsd-host', default='127.0.0.1',
        help='statsd host (default: %(default)s)')
    parser.add_argument(
        '--statsd-port', default=8125, type=int,
        help='statsd UDP port (default: %(default)s)')
    parser.add_argument(
        '-d', '--debug', action='store_true', default=False,
        help='log at DEBUG level')
    parser.add_argument(
        '-v', '--verbose', action='store_true', default=False,
        help='log at INFO level')
    parser.add_argument('--version', action='version', version=__version__)
    return parser.parse_args(argv)
def main():
    """Poll ZooKeeper forever and forward selected gauges to statsd.

    Reads the command-line options, configures logging, then turns every
    snapshot produced by ``ZooKeeperBeat`` into statsd gauges. A key that is
    missing from a snapshot is reported as ``0``.
    """
    options = parse_args()

    # --debug takes precedence over --verbose; the default is WARNING.
    if options.debug:
        level = logging.DEBUG
    elif options.verbose:
        level = logging.INFO
    else:
        level = logging.WARNING
    logging.basicConfig(
        format='%(asctime)s [%(name)s:%(levelname)s:%(process)d] %(message)s',
        level=level,
    )

    # (statsd metric name, mntr key) pairs sent for every snapshot.
    common_gauges = (
        ('avg_latency', 'avg_latency'),
        ('max_latency', 'max_latency'),
        ('min_latency', 'min_latency'),
        ('num_alive_connections', 'num_alive_connections'),
        ('outstanding_requests', 'outstanding_requests'),
        ('packets_received', 'packets_received'),
        ('packets_sent', 'packets_sent'),
        ('znode_count', 'znode_count'),
        ('watch_count', 'watch_count'),
        ('ephemerals_count', 'ephemerals_count'),
        ('open_fd_count', 'open_file_descriptor_count'),
    )
    # Gauges added only when the snapshot's server_state is "leader".
    leader_gauges = ('followers', 'synced_followers', 'pending_syncs')

    poller = ZooKeeperBeat(options.zk_host, options.zk_port, options.interval)
    reporter = StatsdClient(
        options.statsd_host, options.statsd_port, options.prefix)

    for snapshot in poller():
        metrics = {}
        for metric_name, mntr_key in common_gauges:
            metrics[metric_name] = (snapshot.get(mntr_key, 0), 'g')
        if snapshot.get('server_state') == 'leader':
            for metric_name in leader_gauges:
                metrics[metric_name] = (snapshot.get(metric_name, 0), 'g')
        reporter.send(metrics)
        logger.debug(repr(metrics))
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # main() loops until interrupted; swallow Ctrl-C so the script ends
        # without printing a traceback.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment