El-cheapo mssql slow query monitor
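A gist by @graffic, last active August 19, 2019.

A small polling script: every few seconds it asks sys.dm_exec_requests for requests that have been running longer than a threshold, pretty-prints them to stdout and, once each slow query finishes, saves its execution plan (.sqlplan) and metadata to a log directory.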

#!/usr/bin/env python
"""El-cheapo MSSQL slow query monitor."""
from datetime import datetime
from functools import wraps
from hashlib import sha1
from time import sleep
import os

import click
import pymssql
from sty import fg, rs

# Currently running requests slower than a threshold (in ms), joined with
# their SQL text, XML query plan and connection/session details.
# Reading these DMVs typically requires the VIEW SERVER STATE permission.
STATS_QUERY_PLAN = """
SELECT
    sqltext.TEXT,
    query_plan,
    granted_query_memory,
    req.transaction_isolation_level,
    req.total_elapsed_time,
    req.cpu_time,
    req.reads,
    req.writes,
    wait_time,
    start_time,
    c.client_net_address,
    s.host_name,
    s.program_name,
    s.client_interface_name,
    s.login_name
FROM sys.dm_exec_requests req
LEFT JOIN sys.dm_exec_connections c ON req.session_id = c.session_id
LEFT JOIN sys.dm_exec_sessions s ON s.session_id = c.session_id
CROSS APPLY sys.dm_exec_sql_text(sql_handle) AS sqltext
CROSS APPLY sys.dm_exec_text_query_plan(plan_handle, 0, -1)
WHERE req.total_elapsed_time > %d
"""

# Columns skipped when printing for humans.
HUMAN_SKIP_COLUMNS = {'query_plan'}


def prime(coroutine):
    """Decorator: advance a coroutine to its first `yield` so it can receive values."""
    @wraps(coroutine)
    def primer(*args, **kwargs):
        res = coroutine(*args, **kwargs)
        res.send(None)
        return res
    return primer
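

# Priming matters: calling send() on a just-started generator raises
# "TypeError: can't send non-None value to a just-started generator",
# so @prime runs each coroutine up to its first `yield` before use.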


def new_connection_from_env():
    """Open a pymssql connection using the MSSQL_* environment variables."""
    host = os.getenv('MSSQL_HOST')
    database = os.getenv('MSSQL_DATABASE')
    print(f'Connecting to {host}/{database}')
    return pymssql.connect(host=host,
                           user=os.getenv('MSSQL_USERNAME'),
                           password=os.getenv('MSSQL_PASSWORD'),
                           database=database)


def get_stats(duration):
    """Poll the server forever, yielding one batch of slow-request rows per iteration."""
    print(f'Logging queries slower than {duration}ms')
    with new_connection_from_env() as conn:
        while True:
            with conn.cursor(as_dict=True) as cursor:
                cursor.execute(STATS_QUERY_PLAN, (duration,))
                yield cursor.fetchall()


@prime
def stdout_printer():
    """Coroutine that pretty-prints each batch of slow queries to stdout."""
    while True:
        stats = yield
        print(f'\n{fg.green}{datetime.now()}{rs.fg}')
        for counter, row in enumerate(stats):
            print(f'{fg.red}Entry {counter}{rs.fg}')
            for key, value in skip_columns(row.items()):
                print(f'{fg.li_blue}{key}:{rs.fg} {value}')


def skip_columns(items):
    """Skip columns that aren't human readable (e.g. the XML query plan)."""
    for key, value in items:
        if key in HUMAN_SKIP_COLUMNS:
            continue
        yield key, value


def write_plan_file(row, base_filename):
    """Save the XML execution plan; .sqlplan files open directly in SSMS."""
    with open(f'{base_filename}.sqlplan', 'w') as output:
        output.write(row['query_plan'])


def write_metadata_file(row, base_filename):
    """Save the remaining columns as plain `key: value` lines."""
    with open(f'{base_filename}.txt', 'w') as output:
        for key, value in skip_columns(row.items()):
            output.write(f'{key}: {value}\n')


def stat_key(stat):
    """Identify a running query by hashing its text and start time."""
    return sha1(f'{stat["TEXT"]}-{stat["start_time"]}'.encode('utf-8')).hexdigest()


def update_running_queries(previous, new_stats):
    """Diff two polls: return the queries that finished since last time, plus those still running."""
    current = {stat_key(s): s for s in new_stats}
    finished = [previous[k] for k in previous if k not in current]
    return finished, current
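

# A query counts as finished when its (text, start_time) key disappears from
# the current poll. The row logged for it is the one kept from the previous
# poll, i.e. the last snapshot taken while the query was still running.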


@prime
def log_sqlplan(dirname):
    """Coroutine that writes plan and metadata files for queries once they finish."""
    print(f'Slow queries output to: {dirname}')
    if not os.path.exists(dirname):
        os.makedirs(dirname)
    running = {}
    while True:
        stats = yield
        stats_to_log, running = update_running_queries(running, stats)
        iso_date = datetime.now().strftime('%Y%m%dT%H%M%S')
        for row in stats_to_log:
            base_filename = os.path.join(dirname, f'{iso_date}_{row["total_elapsed_time"]}')
            write_plan_file(row, base_filename)
            write_metadata_file(row, base_filename)


@click.command()
@click.option('--duration', type=int, default=5000,
              help='Log queries that last longer than x ms')
@click.option('--frequency', type=int, default=3,
              help='Poll stats every x seconds')
@click.argument('log_dir')
def main(duration, log_dir, frequency):
    """LOG_DIR: where to put the logged queries."""
    log = log_sqlplan(log_dir)
    out = stdout_printer()
    for stats in get_stats(duration):
        out.send(stats)
        log.send(stats)
        sleep(frequency)


if __name__ == '__main__':
    main()
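
Assuming the script is saved as monitor.py (the filename and log directory below are just examples) and the MSSQL_HOST, MSSQL_DATABASE, MSSQL_USERNAME and MSSQL_PASSWORD environment variables are exported, a typical run looks like:

    python monitor.py --duration 5000 --frequency 3 ./slow-queries

Each slow query that finishes leaves behind a <timestamp>_<elapsed>.sqlplan file (openable in SQL Server Management Studio) and a matching .txt with the captured request metadata.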