Skip to content

Instantly share code, notes, and snippets.

@yumike
Last active August 23, 2018 14:29
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yumike/d2c833e3289d08cae514af73a6a2cb37 to your computer and use it in GitHub Desktop.
Save yumike/d2c833e3289d08cae514af73a6a2cb37 to your computer and use it in GitHub Desktop.
cian-statprof
# coding: utf-8
from cian_statprof.profiler import Profiler
from cian_statprof.sampler import Sampler
# Single module-level profiler instance used by all middleware hooks below.
# NOTE(review): `Logger` is not imported in the visible source -- confirm
# where it is defined, otherwise this line raises NameError at import time.
profiler = Profiler(Sampler(), Logger(), {'is_enabled': True})
class StatisticalProfilerMiddleware(object):
    """Request middleware that drives the module-level ``profiler``.

    The hook names follow the classic ``process_*`` middleware protocol
    (presumably Django's -- confirm against the project's framework).
    """

    def process_request(self, request):
        """Open a profiling session for the incoming request."""
        profiler.start_session(request=request)

    def process_response(self, request, response):
        """Close the profiling session and pass the response through."""
        profiler.finish_session(request=request, response=response)
        return response

    def process_exception(self, request, exception):
        """Stop sampling when the view raised; nothing is recorded here."""
        if not profiler.is_active:
            return
        profiler.stop_sampling()
# coding: utf-8
import time
from datetime import datetime
class Profiler(object):
    """Run one sampling session per request and hand the result to a logger.

    A session is opened by ``start_session`` and closed either by
    ``finish_session`` (which records a profile) or by ``stop_sampling``
    (which discards it). Only one session can be active at a time.
    """

    def __init__(self, sampler, logger, settings):
        """Store the collaborators; sampling does not start here.

        :param sampler: object providing ``start()``, ``stop()`` and
            ``output_stats()``.
        :param logger: object providing ``record(profile)``.
        :param settings: mapping; only the ``'is_enabled'`` key is read.
        """
        self.is_active = False
        self.start_time = None
        self.profile = None
        self.sampler = sampler
        self.logger = logger
        self.settings = settings

    def start_session(self, request):
        """Begin sampling for ``request`` unless disabled or already active.

        :param request: object exposing ``method``, ``get_full_path()`` and
            ``build_absolute_uri()``.
        """
        if not self.settings.get('is_enabled') or self.is_active:
            return

        self.profile = {
            'datetime': datetime.utcnow(),
            'method': request.method,
            'path': request.get_full_path(),
            'full_url': request.build_absolute_uri(),
        }
        self.start_time = time.time()
        self.is_active = True
        self.sampler.start()

    def stop_sampling(self):
        """Stop the sampler and mark the session as closed."""
        self.sampler.stop()
        self.is_active = False

    def finish_session(self, request, response=None):
        """Close the active session and record its profile.

        Nothing is recorded when no session is active or when the request
        has no resolved URL name, but the sampler is always stopped so the
        next request can open a fresh session.

        :param request: the request passed to ``start_session``; its
            ``resolver_match.url_name`` becomes the profile's ``resource``.
        :param response: accepted for callers that pass it; currently unused.
        """
        if not self.is_active:
            return

        resolver_match = getattr(request, 'resolver_match', None)
        url_name = getattr(resolver_match, 'url_name', None)
        if not url_name:
            # Without this the profiler would stay active forever and every
            # later start_session() would be a no-op.
            self.stop_sampling()
            return

        try:
            # Stats must be read before stopping: stopping resets the sampler.
            samples = self.sampler.output_stats()
        finally:
            self.stop_sampling()

        self.profile['resource'] = url_name
        self.profile['samples'] = samples
        # Wall-clock duration of the session in whole milliseconds.
        self.profile['duration'] = int(1000 * (time.time() - self.start_time))
        self.logger.record(self.profile)
# coding: utf-8
import atexit
import collections
import signal
import time
class Sampler(object):
    """Statistical stack sampler driven by ``SIGALRM`` / ``ITIMER_REAL``.

    While running, the call stack of the interrupted code is captured every
    ``interval`` seconds and counted per distinct stack. Uses
    ``signal.setitimer``, so it is process-wide; see the ``signal`` module
    documentation for platform and threading restrictions.
    """

    def __init__(self, interval=0.005):
        """
        :param interval: seconds between two samples.
        """
        self.interval = interval
        self._started = None
        self._running = False
        self._atexit_registered = False
        self._stack_counts = collections.defaultdict(int)

    def start(self):
        """Install the signal handler and arm the sampling timer."""
        self._started = time.time()
        self._running = True
        signal.signal(signal.SIGALRM, self._sample)
        signal.setitimer(signal.ITIMER_REAL, self.interval)
        # Register the exit hook once; start() is called per request and
        # would otherwise accumulate one handler per call.
        if not self._atexit_registered:
            atexit.register(self.stop)
            self._atexit_registered = True

    def _sample(self, signum, frame):
        """Signal handler: count the current stack and re-arm the timer."""
        if not self._running:
            # A signal that was already pending when stop() ran must not
            # record a sample or restart the timer.
            return
        stack = []
        while frame is not None:
            stack.append(self._format_frame(frame))
            frame = frame.f_back
        # Outermost call first, frames separated by ';'.
        stack = ';'.join(reversed(stack))
        self._stack_counts[stack] = self._stack_counts.get(stack, 0) + 1
        signal.setitimer(signal.ITIMER_REAL, self.interval)

    def _format_frame(self, frame):
        """Render a frame as ``function(module:file::first_line)``."""
        code = frame.f_code
        # Keep only the file's base name (text after the last '/').
        f_name = code.co_filename.rsplit('/', 1)[-1]
        return '{}({}:{}::{})'.format(
            code.co_name,
            frame.f_globals.get('__name__'),
            f_name,
            code.co_firstlineno)

    def output_stats(self):
        """Return collected stacks as text, most frequent first.

        Each line is ``<stack> <count>``. Returns an empty string when the
        sampler has never been started or reset.
        """
        if self._started is None:
            return ''
        ordered_stacks = sorted(self._stack_counts.items(),
                                key=lambda kv: kv[1], reverse=True)
        lines = ['{} {}'.format(stack, count)
                 for stack, count in ordered_stacks]
        return '\n'.join(lines) + '\n'

    def reset(self):
        """Drop all collected samples and restart the start timestamp."""
        self._started = time.time()
        self._stack_counts = collections.defaultdict(int)

    def stop(self):
        """Disable the timer and discard collected samples."""
        # Disarm before resetting so no sample lands in the fresh counters.
        self._running = False
        signal.setitimer(signal.ITIMER_REAL, 0)
        self.reset()

    def __del__(self):
        self.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment