Created
March 31, 2023 19:15
-
-
Save premkumr/9a58119cfbeb16e8141d29b56c6ba947 to your computer and use it in GitHub Desktop.
Fetch yb metrics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import copy | |
import json | |
import os | |
import re | |
import shelve | |
import sys | |
import tempfile | |
import time | |
from pathlib import Path | |
from typing import Optional | |
from collections import OrderedDict | |
# Third-party packages are imported defensively so that a missing package
# produces an actionable hint instead of a raw traceback.  Only ImportError is
# treated as "not installed"; any other failure while importing (for example a
# broken installation) is left to propagate so it is not misreported.
try:
    import requests
except ImportError:
    print("Please install requests via `pip install requests`")
    sys.exit(1)

try:
    from tabulate import tabulate
except ImportError:
    print("Please install tabulate via `pip install tabulate`")
    sys.exit(1)

# Module-level endpoint defaults.  They are not referenced by the code visible
# in this file (MetricsTracker keeps its own host list); kept for any external
# user of the module.
hostname = "127.0.0.1"
port = 9000
class MetricsTracker:
    """Poll ``/metrics`` HTTP endpoints and report per-tablet metric deltas.

    Each poll produces a *snapshot*: a dict mapping tablet id to
    ``{'namespace_name', 'table_name', 'hostname', 'metrics'}`` for every
    non-system tablet that reports ``is_raft_leader``.  The most recent
    snapshots are stored as ``(hash, timestamp, snapshot)`` tuples, newest
    first, in a ``shelve`` database (``metrics.db`` in the system temp
    directory) so that a later run can diff against the previous one.

    Attributes set by callers after construction:
        hosts: list of ``host`` or ``host:port`` strings to poll.
        keypattern: regex (applied with ``re.match``) selecting metric names.
        full_tabletid: when False, tablet ids are shortened to 12 characters.
    """

    # Port appended to a host that does not specify one.
    DEFAULT_PORT = 9000
    # Seconds to wait for a host before treating it as unreachable.
    REQUEST_TIMEOUT = 10
    # Only the current and the previous snapshot are ever consulted.
    MAX_SNAPSHOTS = 2

    def __init__(self):
        """Open (or create) the snapshot database; exit the process on failure."""
        dbfile = os.path.join(tempfile.gettempdir(), 'metrics.db')
        self._db = None
        try:
            # writeback=True keeps the stored list cached in memory so that
            # in-place mutations of self.db are written back on sync()/close().
            self._db = shelve.open(dbfile, writeback=True)
        except Exception:
            # The concrete exception type depends on the dbm backend in use.
            print('Unable to open db.. Please check if there is another metrics process running!!!')
            sys.exit(1)
        self.keypattern = '.*'
        self.full_tabletid = False
        self.hosts = ['127.0.0.1:9000']
        if 'items' not in self._db:
            self._db['items'] = []
        self.db = self._db['items']
        # Drop anything older than the snapshots that are still useful.
        del self.db[self.MAX_SNAPSHOTS:]
        self.print_count = 0
        self.failedhosts = set()

    def __del__(self):
        self.close()

    def close(self):
        """Flush and close the snapshot database; safe to call repeatedly."""
        # getattr: __del__ may run on an instance whose __init__ never ran.
        store = getattr(self, '_db', None)
        if store is not None:
            self._db = None
            store.close()

    def clean(self):
        """Discard every stored snapshot, including the persisted ones."""
        # Empty the list in place: rebinding self.db to a new list would leave
        # the list held by the shelf untouched, so nothing would be cleared.
        del self.db[:]
        store = getattr(self, '_db', None)
        if store is not None:
            store.sync()

    def get_metrics(self):
        """Fetch a fresh snapshot from all hosts.

        Returns:
            The new snapshot dict, or ``None`` when it hashes identically to
            the most recent stored snapshot (i.e. nothing changed).
        """
        clean_data = {}
        timestamp = time.time()
        for host in self.hosts:
            hostname = host if ':' in host else '{}:{}'.format(host, self.DEFAULT_PORT)
            entries = self._fetch_host(hostname)
            if entries is None:
                continue
            clean_data.update(self._leader_tablets(entries, hostname))

        # NOTE: hash() of a str is salted per interpreter process (unless
        # PYTHONHASHSEED is fixed), so a hash persisted by an earlier run will
        # normally not compare equal here; the check is effective within a run.
        hash_code = hash(str(clean_data))
        if self.db and self.db[0][0] == hash_code:
            # no change from last time
            return None
        self.db.insert(0, (hash_code, timestamp, clean_data))
        # Keep the history bounded; a long-running monitor would otherwise
        # accumulate one snapshot per change forever.
        del self.db[self.MAX_SNAPSHOTS:]
        return clean_data

    def _fetch_host(self, hostname):
        """Return the decoded JSON from ``hostname``'s /metrics, or None on failure."""
        url = 'http://{}/metrics'.format(hostname)
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            data = json.loads(response.text)
        except (requests.exceptions.RequestException, ValueError):
            # Covers refused connections, timeouts and non-JSON bodies alike;
            # report a host only once until it recovers.
            if hostname not in self.failedhosts:
                print('unable to connnect to : [{}]'.format(hostname))
                self.failedhosts.add(hostname)
            return None
        if hostname in self.failedhosts:
            print('back online : [{}]'.format(hostname))
            self.failedhosts.remove(hostname)
        return data

    @staticmethod
    def _leader_tablets(entries, hostname):
        """Extract leader, non-system tablet entries keyed by tablet id."""
        leaders = {}
        for tablet in entries:
            if not isinstance(tablet, dict) or tablet.get('type') != 'tablet':
                continue
            attributes = tablet.get('attributes', {})
            if attributes.get('namespace_name') == 'system':
                continue
            positive = {}
            for m in tablet.get('metrics', []):
                # Entries without a numeric 'value' field are skipped instead
                # of raising KeyError.
                value = m.get('value')
                if isinstance(value, (int, float)) and value > 0:
                    positive[m['name']] = value
            # get only the leaders
            if positive.get('is_raft_leader', 0) == 0:
                continue
            leaders[tablet['id']] = {
                'namespace_name': attributes.get('namespace_name', ''),
                'table_name': attributes.get('table_name', ''),
                'hostname': hostname,
                'metrics': positive,
            }
        return leaders

    def get_diff(self, current, last):
        """Return the per-tablet metric increase from ``last`` to ``current``.

        Args:
            current: the newest snapshot.
            last: the previous snapshot, or ``None`` when there is none.

        Returns:
            ``current`` itself when ``last`` is ``None``.  Otherwise a new dict
            holding, for each leader tablet: the full (copied) entry if the
            tablet is absent from ``last``; else the entry with ``metrics``
            reduced to the strictly positive differences (metrics missing
            from ``last`` keep their current value).  Tablets with no
            remaining metric, and non-leader tablets, are omitted.
        """
        if last is None:
            print('last is none')
            return current

        diff = {}
        for tid, tdata in current.items():
            metrics = tdata.setdefault('metrics', {})
            if metrics.get('is_raft_leader', 0) != 1:
                continue
            if tid not in last:
                diff[tid] = copy.deepcopy(tdata)
                continue
            previous = last[tid].get('metrics', {})
            delta = {}
            for name, value in metrics.items():
                if name not in previous:
                    delta[name] = value
                    continue
                change = value - previous[name]
                # remove 0 & -ve values
                if change > 0:
                    delta[name] = change
            if delta:
                entry = copy.deepcopy(tdata)
                entry['metrics'] = delta
                diff[tid] = entry
        return diff

    def display_data(self, data):
        """Print ``data`` under a numbered header that lists unreachable hosts."""
        self.print_count += 1
        error = ''
        if self.failedhosts:
            error = ' : [unable to fetch from {}]'.format(sorted(self.failedhosts))
        print('>>> {}{}'.format(self.print_count, error))
        print(data)
        print("\n")

    def print_metrics(self, metrics, vertical=True, top=None):
        """Render the metrics whose (shortened) name matches ``self.keypattern``.

        Args:
            metrics: snapshot-shaped dict (see class docstring).
            vertical: one row per (metric, tablet) when True; otherwise one
                row per tablet with one column per metric.
            top: if truthy, keep only the first ``top`` rows after sorting.
        """
        pattern = re.compile(self.keypattern)
        if vertical:
            self._print_vertical(metrics, pattern, top)
        else:
            self._print_horizontal(metrics, pattern, top)

    def _print_vertical(self, metrics, pattern, top):
        """Print one row per matching (metric, tablet) pair."""
        table = []
        keyset = set()
        for tid, tablet in metrics.items():
            tablet_id = self._tablet_label(tid)
            for name, value in tablet.setdefault('metrics', {}).items():
                key = self._short_key(name)
                if pattern.match(key):
                    keyset.add(key)
                    table.append([key, value, tablet_id, tablet['table_name'], tablet['hostname']])
        if len(keyset) == 1:
            # A single metric is shown: rank the rows by its value.
            table.sort(key=lambda row: row[1], reverse=True)
        if top:
            del table[top:]
        if table:
            headers = ('metric', 'value', 'tablet-id', 'table', 'host')
            align = ('right', 'center', 'left', 'left', 'left')
            self.display_data(tabulate(table, headers, tablefmt="presto", colalign=align))

    def _print_horizontal(self, metrics, pattern, top):
        """Print one row per tablet with a column for every matching metric."""
        table = []
        keyset = set()
        for tid, tablet in metrics.items():
            row = {
                'tablet-id': self._tablet_label(tid),
                'table': tablet['table_name'],
                'host': tablet['hostname'],
            }
            for name, value in tablet.get('metrics', {}).items():
                key = self._short_key(name)
                if pattern.match(key):
                    row[key] = value
                    keyset.add(key)
            table.append(row)
        table.sort(key=lambda row: row.get('table', ''))
        if len(keyset) == 1:
            # A single metric is shown: rank the rows by its value.
            (only_key,) = keyset
            table.sort(key=lambda row: row.get(only_key, 0), reverse=True)
        if top:
            del table[top:]
        if table and keyset:
            columns = ['tablet-id', 'table', 'host'] + sorted(keyset)
            # headers='firstrow' takes the column titles from the first dict.
            table.insert(0, OrderedDict((column, column) for column in columns))
            self.display_data(tabulate(table, tablefmt="presto", headers='firstrow'))

    def _tablet_label(self, tid):
        """Return ``tid`` in full, or shortened to 12 characters plus '...'."""
        return tid if self.full_tabletid else tid[:12] + '...'

    @staticmethod
    def _short_key(name):
        """Strip the 'rocksdb_number_' / 'rocksdb_' substrings from a metric name."""
        return name.replace('rocksdb_number_', '').replace('rocksdb_', '')

    def monitor(self, interval=10, vertical=True, top=None):
        """Poll every ``interval`` seconds and print changes until Ctrl-C."""
        try:
            while True:
                current = self.get_metrics()
                if current is not None:
                    # db[0] is the snapshot just stored; db[1] the one before.
                    last = self.db[1][2] if len(self.db) > 1 else None
                    diff = self.get_diff(current, last)
                    if diff:
                        self.print_metrics(diff, vertical, top)
                time.sleep(interval)
        except KeyboardInterrupt:
            pass

    def tablets(self):
        """Print a table of the leader tablets currently reported by the hosts."""
        metrics = self.get_metrics()
        if metrics is None:
            # Nothing changed since the stored snapshot, so list that one.
            metrics = self.db[0][2] if self.db else {}
        tablet_info = []
        for tid, tablet in metrics.items():
            tablet.setdefault('metrics', {})
            tablet_info.append([tablet['table_name'], tid, tablet['metrics'].get('is_raft_leader', 0)])
        print(tabulate(tablet_info, tablefmt="presto", headers=['table', 'tablet', 'leader']))
def cli(argv: Optional[list] = None):
    """Parse command-line options and run the selected mode.

    Args:
        argv: full argument vector including the program name at index 0;
            defaults to a copy of ``sys.argv``.

    The key-selection flags take precedence over ``-k`` in this order:
    ``--rwkeys``, ``--read``, ``--write``, ``--txn``.
    """
    argv = list(argv) if argv else sys.argv[:]
    prog_name = Path(argv[0]).name
    parser = argparse.ArgumentParser(prog=prog_name, description='Metrics Monitor')
    parser.add_argument('-i', '--interval', dest='interval', type=int, default=2, help='time to wait')
    parser.add_argument('--top', dest='top', type=int, default=None, help='top N tablet ids')
    parser.add_argument('-v', '--vertical', dest='vertical', default=False, action='store_true')
    parser.add_argument('--no-vertical', dest='vertical', default=False, action='store_false')
    parser.add_argument('--full-tabletid', dest='full_tabletid', default=False, action='store_true', help='print full tablet id')
    parser.add_argument('-k', '--keys', dest='keys', default='(rows_inserted|db_seek|db_next)$', help='Key pattern(regex)')
    parser.add_argument('--rwkeys', default=False, action='store_true', help='only rocks r/w keys')
    parser.add_argument('--read', default=False, action='store_true', help='only rocks read key')
    parser.add_argument('--write', default=False, action='store_true', help='only rocks write key')
    parser.add_argument('--txn', default=False, action='store_true', help='only txn keys')
    # default=None rather than a list: with action='append' argparse appends
    # to the default, which would keep 127.0.0.1 in the list even when the
    # user names other hosts.  The fallback is applied after parsing instead.
    parser.add_argument('--host', dest='hosts', action='append', default=None, help='tserver hosts (host:port[9000])')
    # const makes a bare `-m` mean 'monitor' instead of yielding None, which
    # would match no mode and silently do nothing.
    parser.add_argument('-m', '--mode', dest='mode', choices=['monitor', 'tablets', 'clean'], default='monitor', const='monitor', nargs='?', help='Execution mode')
    # Parse the vector we were given (minus the program name), not sys.argv.
    args = parser.parse_args(argv[1:])

    if args.rwkeys:
        args.keys = '(rows_inserted|db_seek)$'
    elif args.read:
        args.keys = '(db_seek)$'
    elif args.write:
        args.keys = '(rows_inserted)$'
    elif args.txn:
        args.keys = '(transaction)'

    m = MetricsTracker()
    m.hosts = args.hosts or ['127.0.0.1']
    m.keypattern = args.keys
    m.full_tabletid = args.full_tabletid

    if args.mode == 'monitor':
        m.monitor(args.interval, args.vertical, args.top)
    elif args.mode == 'tablets':
        m.tablets()
    elif args.mode == 'clean':
        m.clean()


if __name__ == "__main__":
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment