Skip to content

Instantly share code, notes, and snippets.

@danizen
Last active November 9, 2018 22:35
Show Gist options
  • Save danizen/d410c7ed2dc09be1b0e18dcd9626f4e1 to your computer and use it in GitHub Desktop.
Save danizen/d410c7ed2dc09be1b0e18dcd9626f4e1 to your computer and use it in GitHub Desktop.
A WSGI middleware to check for leaks
import tracemalloc
import pathlib
import random
import csv
import os
from datetime import datetime
import re
import linecache
import logging
import threading
import portalocker
from django.conf import settings
logger = logging.getLogger(__name__)
class LeakCheckingWSGI:
    """WSGI middleware that samples requests and logs their memory growth.

    For a random sample of requests (those whose path does not start with
    ``/public``) the wrapped application is run between two ``tracemalloc``
    snapshots. When the filtered snapshots show a net increase, the largest
    per-line differences are appended to ``leakcheck.txt`` in
    ``settings.LOG_DIR``.

    Configuration is read from the environment when the class is defined:

    * ``LEAKCHECK_SAMPLE_RATE`` -- probability that a request is traced
      (default ``0.1``).
    * ``LEAKCHECK_TOP`` -- number of entries reported individually
      (default ``10``).
    * ``LEAKCHECK_FILTERS`` -- path of a CSV file with additional tracemalloc
      filters (default ``<BASE_DIR>/leakcheck-filters.csv``).
    """

    sample_rate = float(os.environ.get('LEAKCHECK_SAMPLE_RATE', 0.1))
    top = int(os.environ.get('LEAKCHECK_TOP', 10))
    filterpath = os.environ.get('LEAKCHECK_FILTERS', None)

    # Optional header row of the filter CSV; it is skipped when present.
    _filter_header = ['inclusive', 'pattern', 'lineno']

    def __init__(self, application):
        """Wrap *application*, a WSGI callable."""
        self.application = application

        # BASE_DIR may be a ``str`` or a ``pathlib.Path``; normalise to str.
        base_dir = str(settings.BASE_DIR)
        configured = type(self).filterpath
        if configured:
            self.filterpath = pathlib.Path(configured)
        else:
            self.filterpath = pathlib.Path(base_dir, 'leakcheck-filters.csv')

        self.filename_expr = re.compile(r'lib[/\\]', re.IGNORECASE)
        # Prefix with a trailing separator, so that a sibling directory such
        # as ``<BASE_DIR>2`` is not mistaken for a path inside BASE_DIR.
        self.base_prefix = os.path.join(base_dir, '')
        # Kept for backward compatibility with code reading this attribute.
        self.basedir_len_plus1 = len(base_dir) + 1
        # Held for the duration of a traced request (see ``__call__``).
        self.lock = threading.Lock()
        self.log = pathlib.Path(settings.LOG_DIR, 'leakcheck.txt')

    @property
    def filters(self):
        """Return the tracemalloc filters, loading them on first access."""
        cached = getattr(self, '_filters', None)
        if cached is None:
            cached = self._filters = self._load_filters()
        return cached

    def _load_filters(self):
        """Build the built-in filters plus those listed in the filter CSV.

        Each CSV row is ``inclusive,pattern,lineno``; ``lineno`` may be empty
        or omitted. Blank rows and the header row are ignored. Rows that
        cannot be parsed are logged and skipped.
        """
        filters = [
            tracemalloc.Filter(False, '<frozen importlib._bootstrap>'),
            tracemalloc.Filter(False, '<frozen importlib._bootstrap_external>'),
            tracemalloc.Filter(False, __file__),
        ]
        try:
            handle = self.filterpath.open('r', encoding='utf-8', newline='')
        except FileNotFoundError:
            # The filter file is optional.
            return filters

        with handle:
            for row in csv.reader(handle, dialect='unix'):
                if not row or row == self._filter_header:
                    continue
                if len(row) < 2:
                    logger.warning('Skipping malformed filter row: %r', row)
                    continue
                lineno_text = row[2].strip() if len(row) > 2 else ''
                try:
                    # Frame line numbers are ints; a string here would make
                    # the filter silently never match.
                    lineno = int(lineno_text) if lineno_text else None
                except ValueError:
                    logger.warning('Skipping filter row with bad lineno: %r', row)
                    continue
                filters.append(tracemalloc.Filter(
                    row[0].strip().lower() == 'true',
                    row[1],
                    lineno,
                ))
        return filters

    def get_filename(self, frame):
        """Return a shortened form of ``frame.filename`` for the report.

        Everything up to and including the first ``lib/`` (or ``lib\\``) is
        dropped; otherwise a leading BASE_DIR is dropped; otherwise the
        filename is returned unchanged.
        """
        filename = frame.filename
        match = self.filename_expr.search(filename)
        if match:
            return filename[match.end():]
        if filename.startswith(self.base_prefix):
            return filename[len(self.base_prefix):]
        return filename

    def _report_lines(self, timestamp, method, path, stats):
        """Format one report entry as a list of newline-terminated lines.

        *stats* is a sequence of objects exposing ``traceback``, ``size`` and
        ``size_diff`` (as ``tracemalloc.StatisticDiff`` does). The first
        ``self.top`` entries are listed individually; the rest are summed.
        """
        lines = ['{} {} {}\n'.format(timestamp, method, path)]
        for index, stat in enumerate(stats[:self.top], 1):
            frame = stat.traceback[0]
            lines.append(' {}: {}:{}: {:.2f} KB new, {:.2f} KB total\n'.format(
                index, self.get_filename(frame), frame.lineno,
                stat.size_diff / 1024, stat.size / 1024
            ))
            source = linecache.getline(frame.filename, frame.lineno).strip()
            if source:
                # Indent by the width of the " N: " prefix so that the source
                # line starts under the filename, whatever the index width.
                indent = ' ' * (len(str(index)) + 3)
                lines.append(indent + source + '\n')

        other = stats[self.top:]
        if other:
            size_diff = sum(stat.size_diff for stat in other)
            size = sum(stat.size for stat in other)
            # Sizes are in bytes; convert to match the "KB" label.
            lines.append(' {} other: {:.2f} KB new, {:.2f} KB total\n'.format(
                len(other), size_diff / 1024, size / 1024
            ))
        lines.append('\n')
        return lines

    def report(self, method, path, stats):
        """Append a report for one traced request to the leak-check log."""
        from datetime import timezone  # local: file only imports ``datetime``

        timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        logger.info('%s %s', method, path)
        lines = self._report_lines(timestamp, method, path, stats)
        with self.log.open('a', encoding='utf-8') as f:
            # Exclusive file lock, presumably to keep entries from several
            # worker processes from interleaving; it is not explicitly
            # released here and is expected to go away when the file closes.
            portalocker.lock(f, portalocker.LOCK_EX)
            f.writelines(lines)

    def __call__(self, environ, start_response):
        """Serve one request, tracing its memory use if it is sampled.

        Returns the fully materialised response body (see
        ``iterated_response``).
        """
        # PEP 3333 allows PATH_INFO to be omitted when it is empty.
        path = environ.get('PATH_INFO', '')
        method = environ['REQUEST_METHOD']

        sampled = (
            not path.startswith('/public')
            and random.random() < self.sample_rate
        )
        # tracemalloc state is process-wide, so only one request at a time
        # may be traced; if another thread is tracing, serve this one plainly
        # rather than block or stop the other thread's trace.
        if not sampled or not self.lock.acquire(blocking=False):
            return self.iterated_response(environ, start_response)

        try:
            # Load filters before tracing so their allocation is not counted.
            filters = self.filters
            started_here = not tracemalloc.is_tracing()
            if started_here:
                tracemalloc.start()
            try:
                before = tracemalloc.take_snapshot().filter_traces(filters)
                response = self.iterated_response(environ, start_response)
                after = tracemalloc.take_snapshot().filter_traces(filters)
            finally:
                # Stop tracing even if the application raised, but leave it
                # running if it was already enabled before this request.
                if started_here:
                    tracemalloc.stop()

            memdiff = after.compare_to(before, 'lineno')
            if sum(stat.size_diff for stat in memdiff) > 0:
                self.report(method, path, memdiff)
        finally:
            self.lock.release()
        return response

    def iterated_response(self, environ, start_response):
        """Run the wrapped application and return its body as a tuple.

        Consuming the iterable here forces a lazy response to be produced
        before the caller takes its second snapshot. Because the server only
        ever sees the returned tuple, this method calls the iterable's
        ``close()`` itself, as PEP 3333 requires of middleware.
        """
        result = self.application(environ, start_response)
        try:
            return tuple(result)
        finally:
            close = getattr(result, 'close', None)
            if close is not None:
                close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment