Skip to content

Instantly share code, notes, and snippets.

@brgmn
Created February 1, 2023 07:48
Show Gist options
  • Save brgmn/9c852b52575238dd97ebf9be64ea8100 to your computer and use it in GitHub Desktop.
Save brgmn/9c852b52575238dd97ebf9be64ea8100 to your computer and use it in GitHub Desktop.
Proxytop Python 3 version
#!/usr/bin/env python
import curses
import argparse
import npyscreen
import MySQLdb
import sys
import time
### Option parser
def get_params():
parser = argparse.ArgumentParser()
parser.add_argument('-H','--host', help='ProxySQL hostname / IP address',
default='127.0.0.1',type=str)
parser.add_argument('-P','--port', help='ProxySQL Admin port',
default=6032,type=int)
parser.add_argument('-u','--user', help='ProxySQL Admin username',
default='admin',type=str)
parser.add_argument('-p','--password', help='ProxySQL Admin password',
default='admin',type=str)
parser.add_argument('-f','--dfile', help='Use the MySQL defaults file specified (expects a [proxysqladmin] group)',
default=None, type=str)
return parser.parse_args()
### ProxySQL Admin Access Class
class ProxysqlStats(object):
def __init__(self, cs):
self.cs = cs
self.cursor, self.conn, self.error = self.get_pa_cursor(host=cs.host, port=cs.port,
user=cs.user, password=cs.password,
dfile=cs.dfile)
def get_pa_cursor(self, host='127.0.0.1', port=6032, user='admin', password='admin', dfile=None, timeout=60):
"Connects to ProxySQL's Admin module and returns a DictCursor"
conn = None
cursor = None
error = None
try:
if dfile is None:
conn = MySQLdb.connect(host=host,
port=port,
user=user,
passwd=password,
connect_timeout=timeout)
else:
conn = MySQLdb.connect(host=host,
port=port,
read_default_file=dfile,
read_default_group='proxysqladmin',
connect_timeout=timeout)
conn.autocommit(True)
cursor = conn.cursor()
except Exception as e:
error = e
return cursor, conn, error
def get_col_sizes(self, title, data):
"Calculates column sizes for display area"
col_sizes = [0] * (len(title))
for row in data:
col_iter = 0
for item in row:
if len(item) > col_sizes[col_iter]:
col_sizes[col_iter] = len(item)
col_iter = col_iter + 1
return col_sizes
def get_cpheader(self):
"Generates the connection pool header"
try:
if self.error is None:
self.cursor.execute('SELECT * FROM stats.stats_mysql_global ' \
'WHERE variable_name IN ' \
'("ProxySQL_Uptime","Questions","MySQL_Thread_Workers","Slow_queries")')
data = self.cursor.fetchall()
title = self.cursor.description
fmt_data = '-'
for rvalue in data:
fmt_data = ' '.join([fmt_data,'{}: {}'.format(rvalue[0], rvalue[1])])
return fmt_data
else:
raise Exception('Hmm... connection seems to have been lost')
except Exception as e:
self.cursor, self.conn, self.error = self.get_pa_cursor(host=self.cs.host, port=self.cs.port,
user=self.cs.user, password=self.cs.password,
dfile=self.cs.dfile)
time.sleep(1)
return '- N/A'
def get_cpstats(self, sort = 'ConnOK DESC'):
"Generates the connection pool stats"
try:
if self.error is None:
self.cursor.execute('SELECT * FROM stats.stats_mysql_connection_pool ORDER BY {}'.format(sort))
data = self.cursor.fetchall()
title = self.cursor.description
col_sizes = self.get_col_sizes(title, data)
list_data = ['']
for row in data:
str_row = '{:<{width}}'.format('{}:{}'.format(row[1],row[2]),width=col_sizes[1]+col_sizes[2]+1)
str_row = '{} hg-{:<{width}}'.format(str_row, row[0],width=col_sizes[0])
str_row = '{} Conns(U/F/O/E) {:>{width}} / '.format(str_row, row[4],width=col_sizes[4])
str_row = '{}{:>{width}} / '.format(str_row, row[5],width=col_sizes[5])
str_row = '{}{:>{width}} / '.format(str_row, row[6],width=col_sizes[6])
str_row = '{}{:>{width}}'.format(str_row, row[7],width=col_sizes[7])
str_row = '{} {:>{width}}'.format(str_row, row[3],width=col_sizes[3])
str_row = '{} LT(us) {:>{width}}'.format(str_row, row[11],width=col_sizes[11])
list_data.append(str_row)
return list_data
else:
raise Exception('Hmm... connection seems to have been lost')
except Exception as e:
self.cursor, self.conn, self.error = self.get_pa_cursor(host=self.cs.host, port=self.cs.port,
user=self.cs.user, password=self.cs.password,
dfile=self.cs.dfile)
time.sleep(1)
return ['','{}: {}'.format('CRITICAL ERROR', self.error)]
def get_globstats(self):
"Generates the global stats"
try:
if self.error is None:
self.cursor.execute('SELECT * FROM stats.stats_mysql_global WHERE variable_name like ' \
'"%_Connections_%" or variable_name like "Active_Transactions"')
data = self.cursor.fetchall()
title = self.cursor.description
col_sizes = self.get_col_sizes(title, data)
list_data = ['','***** Connection Information: *****']
for row in data:
str_row = '{:<{width}}'.format(row[0],width=col_sizes[0])
str_row = '{}: {:<{width}}'.format(str_row, row[1],width=col_sizes[1])
list_data.append(str_row)
self.cursor.execute('SELECT * FROM stats.stats_mysql_global WHERE variable_name like "Stmt_%"')
data = self.cursor.fetchall()
title = self.cursor.description
col_sizes = self.get_col_sizes(title, data)
list_data.append('')
list_data.append('***** Statement Information: *****')
for row in data:
str_row = '{:<{width}}'.format(row[0],width=col_sizes[0])
str_row = '{}: {:<{width}}'.format(str_row, row[1],width=col_sizes[1])
list_data.append(str_row)
self.cursor.execute('SELECT * FROM stats.stats_mysql_global WHERE variable_name like "Com__%"')
data = self.cursor.fetchall()
title = self.cursor.description
col_sizes = self.get_col_sizes(title, data)
list_data.append('')
list_data.append('***** Command Information: *****')
for row in data:
str_row = '{:<{width}}'.format(row[0],width=col_sizes[0])
str_row = '{}: {:<{width}}'.format(str_row, row[1],width=col_sizes[1])
list_data.append(str_row)
self.cursor.execute('SELECT * FROM stats.stats_mysql_global WHERE variable_name like "Query_Cache_%"')
data = self.cursor.fetchall()
title = self.cursor.description
col_sizes = self.get_col_sizes(title, data)
list_data.append('')
list_data.append('***** Query Cache Information: *****')
for row in data:
str_row = '{:<{width}}'.format(row[0],width=col_sizes[0])
str_row = '{}: {:<{width}}'.format(str_row, row[1],width=col_sizes[1])
list_data.append(str_row)
return list_data
else:
raise Exception('Hmm... connection seems to have been lost')
except Exception as e:
self.cursor, self.conn, self.error = self.get_pa_cursor(host=self.cs.host, port=self.cs.port,
user=self.cs.user, password=self.cs.password,
dfile=self.cs.dfile)
time.sleep(1)
return ['','{}: {}'.format('CRITICAL ERROR', self.error)]
def get_commands(self, sort='Command ASC'):
"Generates the command counter stats"
try:
if self.error is None:
self.cursor.execute('SELECT * FROM stats_mysql_commands_counters WHERE Total_Time_us > 0;')
data = self.cursor.fetchall()
list_data = []
title = self.cursor.description
col_sizes = self.get_col_sizes(title, data)
if len(data) > 0:
for row in data:
str_row = '{:<{width}}'.format(row[0],width=col_sizes[0])
str_row = '{} Time_us: {:<{width}}'.format(str_row, row[1],width=col_sizes[1])
str_row = '{} Total_count: {:<{width}}'.format(str_row, row[2],width=col_sizes[2])
list_data.append(str_row)
else:
list_data.append("No commands have been executed yet...")
return list_data
else:
raise Exception('Hmm... connection seems to have been lost')
except Exception as e:
self.cursor, self.conn, self.error = self.get_pa_cursor(host=self.cs.host, port=self.cs.port,
user=self.cs.user, password=self.cs.password,
dfile=self.cs.dfile)
time.sleep(1)
return ['','{}: {}'.format('CRITICAL ERROR', self.error)]
def get_proclist(self):
"Generates the process list information"
try:
if self.error is None:
self.cursor.execute('SELECT * FROM stats.stats_mysql_processlist WHERE command != "Sleep"')
data = self.cursor.fetchall()
list_data = ['']
title = self.cursor.description
col_sizes = self.get_col_sizes(title, data)
if len(data) > 0:
for row in data:
str_row = 'th-{:<{width}}'.format(row[0],width=col_sizes[0])
str_row = '{} hg-{:<{width}}'.format(str_row, row[6],width=col_sizes[6])
str_row = '{} {:<{width}}'.format(str_row, row[9],width=col_sizes[9])
str_row = '{} {:<{width}}'.format(str_row, row[11],width=col_sizes[11])
str_row = '{} {:<{width}}'.format(str_row, row[13],width=col_sizes[13])
list_data.append(str_row)
else:
self.cursor.execute('SELECT count(*) FROM stats.stats_mysql_processlist WHERE command == "Sleep"')
data = self.cursor.fetchone()
list_data.append("No processes running currently... {} sleeping".format(data[0]))
return list_data
else:
raise Exception('Hmm... connection seems to have been lost')
except Exception as e:
self.cursor, self.conn, self.error = self.get_pa_cursor(host=self.cs.host, port=self.cs.port,
user=self.cs.user, password=self.cs.password,
dfile=self.cs.dfile)
time.sleep(1)
return ['','{}: {}'.format('CRITICAL ERROR', self.error)]
def get_querystats(self, sort='rule_id ASC'):
"Generates the query rules data"
try:
if self.error is None:
self.cursor.execute('SELECT * FROM stats.stats_mysql_query_rules ORDER BY {}'.format(sort))
data = self.cursor.fetchall()
list_data = []
title = self.cursor.description
col_sizes = self.get_col_sizes(title, data)
for row in data:
str_row = 'rule_id: {:<{width}}'.format(row[0],width=col_sizes[0])
str_row = '{} hits: {:<{width}}'.format(str_row, row[1],width=col_sizes[1])
list_data.append(str_row)
return list_data
else:
raise Exception('Hmm... connection seems to have been lost')
except Exception as e:
self.cursor, self.conn, self.error = self.get_pa_cursor(host=self.cs.host, port=self.cs.port,
user=self.cs.user, password=self.cs.password,
dfile=self.cs.dfile)
time.sleep(1)
return ['','{}: {}'.format('CRITICAL ERROR', self.error)]
def get_rule_data(self, rule_id):
"Generates detailed query rule information"
try:
if self.error is None:
self.cursor.execute('SELECT * FROM runtime_mysql_query_rules WHERE rule_id = {}'.format(rule_id))
rule_data = self.cursor.fetchone()
fmt_rule_data = ''
for cnt, rvalue in enumerate(rule_data):
if rvalue is not None:
fmt_rule_data = '\n'.join([fmt_rule_data,'{:>21}: {}'.format(self.cursor.description[cnt][0], rvalue)])
return fmt_rule_data
else:
raise Exception('Hmm... connection seems to have been lost')
except Exception as e:
self.cursor, self.conn, self.error = self.get_pa_cursor(host=self.cs.host, port=self.cs.port,
user=self.cs.user, password=self.cs.password,
dfile=self.cs.dfile)
time.sleep(1)
return '- N/A'
def get_command_data(self, command):
"Generates detailed command counter information"
try:
if self.error is None:
self.cursor.execute('SELECT * FROM stats.stats_mysql_commands_counters WHERE Command = "{}"'.format(command))
command_data = self.cursor.fetchone()
fmt_command_data = ''
previous_interval = ''
for cnt, rvalue in enumerate(command_data):
if rvalue is not None:
fmt_command_data = '\n'.join([fmt_command_data,'{:>13}: {}' \
.format(self.cursor.description[cnt][0].replace('cnt_','{} - '.format(previous_interval)), rvalue)])
if previous_interval == 'Total_Time_us':
previous_interval = '0'
else:
previous_interval = self.cursor.description[cnt][0].replace('cnt_','')
return fmt_command_data
else:
raise Exception('Hmm... connection seems to have been lost')
except Exception as e:
self.cursor, self.conn, self.error = self.get_pa_cursor(host=self.cs.host, port=self.cs.port,
user=self.cs.user, password=self.cs.password,
dfile=self.cs.dfile)
time.sleep(1)
return '- N/A'
### Form sub-classes
class ConnPoolForm(npyscreen.FormBaseNew):
def set_up_handlers(self):
super(ConnPoolForm, self).set_up_handlers()
self.handlers.update({"s": self.resort,
"+": self.interval_inc,
"-": self.interval_dec})
def interval_inc(self, input):
self.keypress_timeout = self.keypress_timeout + 10
def interval_dec(self, input):
if self.keypress_timeout > 10:
self.keypress_timeout = self.keypress_timeout - 10
def resort(self, input):
if self.sort == 'ConnOK DESC':
self.sort = 'ConnUsed DESC'
elif self.sort == 'ConnUsed DESC':
self.sort = 'ConnFree DESC'
elif self.sort == 'ConnFree DESC':
self.sort = 'ConnErr DESC'
elif self.sort == 'ConnErr DESC':
self.sort = 'Latency_us DESC'
else:
self.sort = 'ConnOK DESC'
self.cpt.value = self.parentApp.pstats.get_cpheader()
self.cpw.values = self.parentApp.pstats.get_cpstats(self.sort)
npyscreen.notify_confirm("Now ordered by - {}".format(self.sort), title="Sort order updated:",
form_color='STANDOUT', wrap=True, wide=False, editw=0)
self.display()
def while_waiting(self):
self.cpt.value = self.parentApp.pstats.get_cpheader()
self.cpw.values = self.parentApp.pstats.get_cpstats(self.sort)
self.display()
def create(self):
self.sort = 'ConnOK DESC'
self.cpt = self.add(npyscreen.TitleFixedText, name='Connection Pool Statistics:',
value=self.parentApp.pstats.get_cpheader(), editable=False,
use_two_lines=False,max_width=0, max_height=1)
self.cpw = self.add(npyscreen.Pager, name='Connection Pool Statistics:',
values=self.parentApp.pstats.get_cpstats(self.sort),
max_width=0, max_height=-3)
self.connButton = self.add(ConnsButton, name='ConnPool', rely=-3, max_height=1)
self.queryRulesButton = self.add(QueryRulesButton, name='QueryRules', rely=-3, relx=12)
self.globButton = self.add(GlobButton, name='GloStat', rely=-3, relx=24)
self.procButton = self.add(ProcButton, name='ProcList', rely=-3, relx=33)
self.comButton = self.add(ComButton, name="ComCount", rely=-3, relx=43)
self.exitButton = self.add(ExitButton, name="Exit", rely=-3, relx=53)
class GlobForm(npyscreen.FormBaseNew):
def set_up_handlers(self):
super(GlobForm, self).set_up_handlers()
self.handlers.update({"+": self.interval_inc,
"-": self.interval_dec})
def interval_inc(self, input):
self.keypress_timeout = self.keypress_timeout + 10
def interval_dec(self, input):
if self.keypress_timeout > 10:
self.keypress_timeout = self.keypress_timeout - 10
def while_waiting(self):
self.cpw.values = self.parentApp.pstats.get_globstats()
self.cpw.display()
def create(self):
self.cpw = self.add(npyscreen.TitlePager, name='Global Statistics:', begin_entry_at=0,
use_two_lines=True, values=self.parentApp.pstats.get_globstats(),
max_width=0, max_height=-1)
self.connButton = self.add(ConnsButton, name='ConnPool', rely=-3, max_height=1)
self.queryRulesButton = self.add(QueryRulesButton, name='QueryRules', rely=-3, relx=12)
self.globButton = self.add(GlobButton, name='GloStat', rely=-3, relx=24)
self.procButton = self.add(ProcButton, name='ProcList', rely=-3, relx=33)
self.comButton = self.add(ComButton, name="ComCount", rely=-3, relx=43)
self.exitButton = self.add(ExitButton, name="Exit", rely=-3, relx=53)
class ProcForm(npyscreen.FormBaseNew):
def set_up_handlers(self):
super(ProcForm, self).set_up_handlers()
self.handlers.update({"+": self.interval_inc,
"-": self.interval_dec})
def interval_inc(self, input):
self.keypress_timeout = self.keypress_timeout + 10
def interval_dec(self, input):
if self.keypress_timeout > 50:
self.keypress_timeout = self.keypress_timeout - 10
def while_waiting(self):
self.keypress_timeout = 50
self.cpw.values = self.parentApp.pstats.get_proclist()
self.cpw.display()
def create(self):
self.cpw = self.add(npyscreen.TitlePager, name='Active Processlist (Minimum interval 5s):', begin_entry_at=0,
use_two_lines=True, values=self.parentApp.pstats.get_proclist(),
max_width=0, max_height=-2)
self.connButton = self.add(ConnsButton, name='ConnPool', rely=-3, max_height=1)
self.queryRulesButton = self.add(QueryRulesButton, name='QueryRules', rely=-3, relx=12)
self.globButton = self.add(GlobButton, name='GloStat', rely=-3, relx=24)
self.procButton = self.add(ProcButton, name='ProcList', rely=-3, relx=33)
self.comButton = self.add(ComButton, name="ComCount", rely=-3, relx=43)
self.exitButton = self.add(ExitButton, name="Exit", rely=-3, relx=53)
class QueryRulesForm(npyscreen.FormBaseNew):
def set_up_handlers(self):
super(QueryRulesForm, self).set_up_handlers()
self.handlers.update({"s": self.resort,
"+": self.interval_inc,
"-": self.interval_dec})
def interval_inc(self, input):
self.keypress_timeout = self.keypress_timeout + 10
def interval_dec(self, input):
if self.keypress_timeout > 10:
self.keypress_timeout = self.keypress_timeout - 10
def resort(self, input):
if self.sort == 'rule_id ASC':
self.sort = 'hits DESC'
else:
self.sort = 'rule_id ASC'
self.cpw.values = self.parentApp.pstats.get_querystats(self.sort)
npyscreen.notify_confirm("Now ordered by - {}".format(self.sort), title="Sort order updated:",
form_color='STANDOUT', wrap=True, wide=False, editw=0)
self.cpw.display()
def while_waiting(self):
self.cpw.values = self.parentApp.pstats.get_querystats(self.sort)
self.cpw.display()
def create(self):
self.sort = 'rule_id ASC'
self.cpw = self.add(ShowQueryInfo, name='Query Rules Stats:', begin_entry_at=0,
use_two_lines=True, values=self.parentApp.pstats.get_querystats(self.sort),
max_width=0, max_height=-1)
self.connButton = self.add(ConnsButton, name='ConnPool', rely=-3, max_height=1)
self.queryRulesButton = self.add(QueryRulesButton, name='QueryRules', rely=-3, relx=12)
self.globButton = self.add(GlobButton, name='GloStat', rely=-3, relx=24)
self.procButton = self.add(ProcButton, name='ProcList', rely=-3, relx=33)
self.comButton = self.add(ComButton, name="ComCount", rely=-3, relx=43)
self.exitButton = self.add(ExitButton, name="Exit", rely=-3, relx=53)
class ComForm(npyscreen.FormBaseNew):
def set_up_handlers(self):
super(ComForm, self).set_up_handlers()
self.handlers.update({"+": self.interval_inc,
"-": self.interval_dec})
def interval_inc(self, input):
self.keypress_timeout = self.keypress_timeout + 10
def interval_dec(self, input):
if self.keypress_timeout > 10:
self.keypress_timeout = self.keypress_timeout - 10
def while_waiting(self):
self.cpw.values = self.parentApp.pstats.get_commands()
self.cpw.display()
def create(self):
self.cpw = self.add(ShowCommandInfo, name='Command Counters:', begin_entry_at=0,
use_two_lines=True, values=self.parentApp.pstats.get_commands(),
max_width=0, max_height=-2)
self.connButton = self.add(ConnsButton, name='ConnPool', rely=-3, max_height=1)
self.queryRulesButton = self.add(QueryRulesButton, name='QueryRules', rely=-3, relx=12)
self.globButton = self.add(GlobButton, name='GloStat', rely=-3, relx=24)
self.procButton = self.add(ProcButton, name='ProcList', rely=-3, relx=33)
self.comButton = self.add(ComButton, name="ComCount", rely=-3, relx=43)
self.exitButton = self.add(ExitButton, name="Exit", rely=-3, relx=53)
### Popup window sub-classes
class ShowCommandInfo(npyscreen.MultiLineAction):
def __init__(self, *args, **keywords):
super(ShowCommandInfo, self).__init__(*args, **keywords)
def actionHighlighted(self, act_on_this, keypress):
npyscreen.notify_confirm(self.parent.parentApp.pstats.get_command_data(act_on_this.split()[0]),
title="Execution Time Distribution:")
class ShowQueryInfo(npyscreen.MultiLineAction):
def __init__(self, *args, **keywords):
super(ShowQueryInfo, self).__init__(*args, **keywords)
def actionHighlighted(self, act_on_this, keypress):
npyscreen.notify_confirm(self.parent.parentApp.pstats.get_rule_data(act_on_this.split(':')[1].split()[0]),
title="Query Rule Info:")
### Button sub-classes
class ComButton(npyscreen.ButtonPress):
def whenPressed(self):
self.parent.parentApp.switchForm("COMC")
class QueryRulesButton(npyscreen.ButtonPress):
def whenPressed(self):
self.parent.parentApp.switchForm("QSTAT")
class ProcButton(npyscreen.ButtonPress):
def whenPressed(self):
self.parent.parentApp.switchForm("PROC")
class GlobButton(npyscreen.ButtonPress):
def whenPressed(self):
self.parent.parentApp.switchForm("GLOB")
class ConnsButton(npyscreen.ButtonPress):
def whenPressed(self):
self.parent.parentApp.switchForm("MAIN")
class ExitButton(npyscreen.ButtonPress):
def whenPressed(self):
sys.exit(0)
### Main application
class MainApp(npyscreen.NPSAppManaged):
keypress_timeout_default = 10
def onStart(self):
DB_CS = get_params()
title_bar = 'ProxyTop - Realtime ProxySQL Statistics'
self.pstats = ProxysqlStats(DB_CS)
self.addForm("MAIN", ConnPoolForm, name=title_bar)
self.addForm("QSTAT", QueryRulesForm, name=title_bar)
self.addForm("GLOB", GlobForm, name=title_bar)
self.addForm("PROC", ProcForm, name=title_bar)
self.addForm("COMC", ComForm, name=title_bar)
if __name__ == '__main__':
App = MainApp()
App.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment