Skip to content

Instantly share code, notes, and snippets.

@rslinckx
Created October 14, 2010 21:27
Show Gist options
  • Save rslinckx/627092 to your computer and use it in GitHub Desktop.
Save rslinckx/627092 to your computer and use it in GitHub Desktop.
SQLAlchemy Query debug helper for Flask and Flask-SQLAlchemy
import logging
log = logging.getLogger(__name__)
from werkzeug import Response, Template
import re
import cgi
import time
import uuid
from flask import request, Response, abort
from flaskext.sqlalchemy import get_debug_queries
try:
import sqlparse
except ImportError:
log.debug('Not using "sqlparse" for sql formatting')
sqlparse = None
try:
import pygments
from pygments import highlight as pygments_highlight
from pygments.lexers import SqlLexer
from pygments.formatters import HtmlFormatter
except ImportError:
log.debug('Not using "pygments" for sql highlighting')
pygments = False
# Matches the three bind-marker spellings this helper substitutes:
# qmark (``?``), format (``%s``) and pyformat (``%(name)s``).
BIND_MARKER_RE = re.compile(r'\?|%s|%\([^)]+\)s')


def counter():
    """Yield the integers 0, 1, 2, ... without end."""
    n = 0
    while True:
        yield n
        n += 1


def mangle_sql(sql, params):
    """Return *sql* with its bind markers replaced by values from *params*.

    The result is meant for display only: values are converted to text and
    pasted in as-is, without any SQL quoting.

    For each marker matched by ``BIND_MARKER_RE`` the value is looked up, in
    this order:

    1. by name, for ``%(name)s`` markers (``params[name]``);
    2. by position of the marker in the statement (``params[index]``);
    3. under the key ``'param_<index + 1>'``.

    A marker for which none of these lookups succeeds is left untouched
    instead of raising.

    :param sql: the SQL statement containing bind markers.
    :param params: a sequence or mapping holding the bound values.
    :returns: the statement with every resolvable marker substituted.
    """
    try:
        text_type = unicode  # Python 2
    except NameError:
        text_type = str  # Python 3
    positions = counter()
    lookup_errors = (IndexError, KeyError, TypeError)

    def replace_marker(match):
        marker = match.group(0)
        # Always consume a position, even for named markers, so positional
        # lookups stay aligned with the order of markers in the statement.
        ix = next(positions)
        candidates = []
        if marker.startswith('%('):
            # '%(name)s' -> 'name'
            candidates.append(marker[2:-2])
        candidates.append(ix)
        candidates.append('param_%d' % (ix + 1))
        for key in candidates:
            try:
                return text_type(params[key])
            except lookup_errors:
                continue
        return marker

    return BIND_MARKER_RE.sub(replace_marker, sql)
def highlight(sql, params, mangle=True):
    """Return *sql* rendered as an HTML fragment.

    :param sql: the SQL statement to render.
    :param params: bound parameters, only used when *mangle* is true.
    :param mangle: when true, substitute *params* into the statement with
        ``mangle_sql`` before formatting.
    :returns: HTML produced by pygments when it is available, otherwise the
        HTML-escaped statement wrapped in ``<div><pre>...</pre></div>``.
        When ``sqlparse`` is available the statement is re-indented first.
    """
    text = mangle_sql(sql, params) if mangle else sql
    if sqlparse:
        text = sqlparse.format(text, reindent=True)
    if pygments:
        return pygments_highlight(text, SqlLexer(), HtmlFormatter(noclasses=True))

    # cgi.escape was removed in Python 3.8; html.escape replaces it. Passing
    # quote=False keeps the output identical to cgi.escape's default.
    try:
        from html import escape
    except ImportError:
        from cgi import escape
    return '<div><pre>%s</pre></div>' % escape(text, quote=False)
# HTML page for a single query. It is rendered with the names ``query``,
# ``explain``, ``execute`` and ``is_pg``. ``explain`` and ``execute`` are None
# for statements that are not re-run (anything but SELECT), so every access
# to them is guarded by an ``if`` block.
QUERY_TPL = Template('''\
<!DOCTYPE html>
<html>
<head>
<style type="text/css">
h2 {}
#params, #sql, #execute, #explain table {
width: 100%;
border: 1px solid grey;
margin: 0;
padding: 0;
}
pre {
margin: 0;
padding: 5px;
}
table {
border-collapse: collapse
}
td, th {
padding: 0 0 0 5px;
text-align: left;
margin: 0;
}
th {
border: 1px solid grey;
}
</style>
<title>Query Stats: ${query.uid}</title>
</head>
<body>
<h1>Query Stats: ${query.uid}</h1>
<h2>Params</h2>
<div id="params">
<pre>${repr(query.parameters)}</pre>
</div>
<h2>SQL</h2>
<div id="sql">
${query.highlighted_statement}
</div>
<h2>Execution</h2>
<div id="execute">
<% if execute %>
${execute[0]} ms, ${execute[1]} results
<% endif %>
<% if not execute %>
Not executed (only SELECT statements are re-run).
<% endif %>
</div>
<h2>Explain</h2>
<div id="explain">
<% if explain %>
<table>
<tr>
<% for k in explain[0] %>
<th>${k}</th>
<% endfor %>
</tr>
<% for row in explain[1] %>
<tr>
<% for v in row %>
<td><pre>${v}</pre></td>
<% endfor %>
</tr>
<% endfor %>
</table>
<% endif %>
<% if not explain %>
Not available (only SELECT statements are explained).
<% endif %>
<% if is_pg and explain %>
<form method="post" target="_blank" action="http://explain.depesz.com/new">
<textarea name="explain" style="display: none;">
<% for row in explain[1] %>${row[0]}
<% endfor %>
</textarea>
<input type="submit" value="depesz pretty-print" name="submit">
</form>
<% endif %>
</div>
</body>
</html>
''')
from flask import url_for
class QueryWrapper(object):
    """Wrap one recorded query with a unique id, a detail URL and re-run helpers.

    Any attribute the wrapper does not define itself is looked up on the
    wrapped query object ``q``; that is where ``statement`` and
    ``parameters`` used below come from.
    """

    def __init__(self, q, engine):
        """Store *q* and *engine* and compute the wrapper's uid and URL.

        :param q: the recorded query object to wrap.
        :param engine: the engine used to re-run and explain the statement.
        """
        self.uid = uuid.uuid4().hex
        self.q = q
        self.engine = engine
        # URL of the detail page for this query (see the '__sadebug_query'
        # endpoint).
        self.href = url_for('.__sadebug_query', uid=self.uid, _external=True)
        # Filled in by execute() from the result's ``query`` attribute, when
        # the result object has one.
        self.real_statement = None

    def __getattr__(self, name):
        """Delegate unknown attributes to the wrapped query object."""
        # __getattr__ only runs when normal lookup failed. If 'q' itself is
        # missing, fail cleanly rather than recursing through self.q forever.
        if name == 'q':
            raise AttributeError(name)
        return getattr(self.q, name)

    def _is_select(self):
        """Tell whether the statement starts with SELECT.

        Case and leading whitespace are ignored.
        """
        return self.statement.lstrip().upper().startswith('SELECT')

    def explain(self):
        """Ask the database for the query plan of a SELECT statement.

        SQLite is sent ``EXPLAIN QUERY PLAN``; every other dialect is sent
        ``EXPLAIN ANALYZE`` (NOTE: on PostgreSQL that form runs the
        statement).

        :returns: ``(column_names, rows)``, or ``None`` when the statement
            is not a SELECT.
        """
        if not self._is_select():
            return None
        if self.engine.dialect.name == 'sqlite':
            prefix = 'EXPLAIN QUERY PLAN'
        else:
            prefix = 'EXPLAIN ANALYZE'
        result = self.engine.execute('%s %s' % (prefix, self.statement), self.parameters)
        try:
            return result.keys(), result.fetchall()
        finally:
            # Release the result even when fetching fails.
            result.close()

    def execute(self):
        """Re-run a SELECT statement and time it.

        The elapsed time covers executing the statement and fetching all
        rows. Afterwards ``real_statement`` is set from the result's
        ``query`` attribute, or to ``None`` if it has none.

        :returns: ``(elapsed_milliseconds, row_count)``, or ``None`` when
            the statement is not a SELECT.
        """
        if not self._is_select():
            return None
        started = time.time()
        result = self.engine.execute(self.statement, self.parameters)
        try:
            rows = result.fetchall()
            elapsed = time.time() - started
        finally:
            # Release the result even when fetching fails.
            result.close()
        self.real_statement = getattr(result, 'query', None)
        return elapsed * 1000, len(rows)

    @property
    def highlighted_statement(self):
        """HTML rendering of the statement.

        Uses ``real_statement`` as-is when execute() recorded one; otherwise
        the parameters are substituted into ``statement`` for display.
        """
        return highlight(
            self.real_statement or self.statement,
            self.parameters,
            mangle=self.real_statement is None,
        )

    def render(self):
        """Render the HTML detail page for this query."""
        return QUERY_TPL.render(
            query=self,
            explain=self.explain(),
            execute=self.execute(),
            is_pg=self.engine.dialect.name.startswith('postgres'),
        )

    def __repr__(self):
        statement = self.statement
        # Only abbreviate when head and tail do not overlap; otherwise a
        # short statement would be printed twice around the '[...]' marker.
        if len(statement) > 60:
            statement = statement[:30] + '[...]' + statement[-30:]
        return 'Query(%s, %r)' % (self.href, statement)
class SaDebug(object):
    """Flask extension that exposes recorded SQL queries for debugging.

    Recorded queries are wrapped in ``QueryWrapper`` objects, remembered by
    uid, and served on ``/__db/<uid>``.
    """

    def __init__(self, max_queries=1000):
        """Create the extension.

        :param max_queries: maximum number of wrapped queries kept for the
            detail page; the oldest are dropped first. ``None`` disables the
            limit.
        """
        # Local import: the module's import block is left untouched.
        from collections import OrderedDict
        self.max_queries = max_queries
        # uid -> QueryWrapper, in insertion order so the oldest entry can be
        # evicted first.
        self.queries = OrderedDict()

    def init_app(self, app, db):
        """Attach the extension to *app*; hooks are installed only in debug mode.

        :param app: the Flask application.
        :param db: the database object whose ``engine`` is used to re-run
            queries.
        """
        self.app = app
        self.db = db
        if not self.app.debug:
            return
        if app.config.get('SADEBUG_REPORT_QUERIES', False):
            self.app.after_request(self.report_queries)
        self.app.context_processor(self.inject_debug_queries)
        self.app.add_url_rule('/__db/<uid>', endpoint='__sadebug_query', view_func=self.render_query)

    def inject_debug_queries(self):
        """Template context processor exposing ``get_debug_queries``."""
        return dict(get_debug_queries=self.get_debug_queries)

    def report_queries(self, response):
        """Log a summary of the recorded queries and return *response* unchanged."""
        total = 0
        buf = ['SaDebug report:']
        for i, q in enumerate(self.get_debug_queries()):
            flat = q.statement.replace('\n', '')
            # Abbreviate only when head and tail do not overlap; otherwise a
            # short statement would appear twice in the report line.
            if len(flat) > 120:
                flat = '%s[...]%s' % (flat[:50], flat[-70:])
            buf.append(' %02d || %s || %r || %dms' % (i, flat, q.parameters, q.duration * 1000))
            buf.append(' -> %s' % q.href)
            total += q.duration
        buf.append(' ----> Total: %dms' % (total * 1000))
        log.info('\n'.join(buf))
        return response

    def _remember(self, wrapper):
        """Store *wrapper* under its uid, evicting the oldest entries over the cap."""
        self.queries[wrapper.uid] = wrapper
        if self.max_queries is None:
            return
        while len(self.queries) > self.max_queries:
            self.queries.popitem(last=False)

    def get_debug_queries(self):
        """Yield a ``QueryWrapper`` for each recorded query, ordered by start time.

        Every wrapper is also remembered so ``render_query`` can find it by
        uid later.
        """
        # The bare name below is the module-level get_debug_queries import,
        # not this method.
        for recorded in sorted(get_debug_queries(), key=lambda x: x.start_time):
            wrapper = QueryWrapper(recorded, self.db.engine)
            self._remember(wrapper)
            yield wrapper

    def render_query(self, uid):
        """View function: render the detail page for *uid*, or abort with 404."""
        query = self.queries.get(uid)
        if query is None:
            abort(404)
        return Response(query.render(), mimetype='text/html')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment