@m0n5t3r
Created November 3, 2010 17:15
stats gathering with gunicorn; gstats.py should be run with one sync worker, gunicorn.conf.py should be in your config

Stats gathering thing for gunicorn

... or, really, anything that supports custom pre_request and post_request hooks

This is a simple app able to gather stats from gunicorn (or similar) and return them as a JSON object; munin plug-ins to graph requests/second and average request time are also included. Request times are calculated over the last 10 minutes by default, but it's configurable (see below).

Requirements

  • you should be able to speak Python (for the hooks)
  • gunicorn
  • pyzmq (PPA for Ubuntu)
  • [optional] python-setproctitle for nice process names (PPA for Ubuntu)

Usage

./gstats-collectd -h
Usage: gstats-collectd [options]

Options:
  -h, --help            show this help message and exit
  -s ADDR, --stats-address=ADDR
                        set collector address to ADDR [tcp://127.0.0.2:2345]
  -c ADDR, --comm-address=ADDR
                        set communication address to ADDR
                        [tcp://127.0.0.1:2345]
  -l LENGTH, --buffer-length=LENGTH
                        compute average load times over the last LENGTH
                        seconds [600]

for the format of ZeroMQ addresses, please refer to zmq_connect(3) (online at
http://api.zeromq.org/zmq_connect.html)

Use the included gunicorn.conf.py as an example for the [pre/post]_request hooks. The supplied munin plugins take an environment variable called status_addr, which defaults to tcp://127.0.0.1:2345 (the default communications address for gstats-collectd).

gstats.py contains a web interface that fetches the stats from gstats-collectd and returns them as a JSON object; to use it:

  • start gstats-collectd
  • start gstats (edit to change the collector comms address if you need): gunicorn -b 127.0.0.1:8002 -w 1 gstats:app

The results should be available at http://localhost:8002/_status
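
Judging from assemble_stats in gstats-collectd, the response is a JSON object keyed by prefix; each entry carries started and finished counters plus a processing_time object with avg and std (in milliseconds when the example hooks are used). Below is a minimal Python 3 sketch of consuming that response. The summarize helper and the sample payload are made up for illustration and are not part of the gist:

```python
import json


def summarize(payload):
    """Turn the JSON text returned by /_status into printable lines."""
    stats = json.loads(payload)
    lines = []
    for prefix in sorted(stats):
        data = stats[prefix]
        timing = data["processing_time"]
        # started minus finished is only a rough "in flight" figure
        in_flight = data["started"] - data["finished"]
        lines.append(
            "%s: %d started, %d finished, %d in flight, avg %.1f ms (std %.1f)"
            % (prefix, data["started"], data["finished"], in_flight,
               timing["avg"], timing["std"])
        )
    return lines


# Hypothetical sample shaped like the output of assemble_stats().
SAMPLE = (
    '{"my_app": {"started": 12, "finished": 10, '
    '"processing_time": {"avg": 250.0, "std": 40.5}}}'
)

if __name__ == "__main__":
    # Against a live instance the payload could instead come from
    # urllib.request.urlopen("http://localhost:8002/_status").read().decode()
    for line in summarize(SAMPLE):
        print(line)
```

The difference between started and finished is only approximate: as discussed in the comments further down, a request can be counted as started without ever being reported as finished.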

Some things to be aware of:

  • Only sync workers are tested for gstats
  • Due to a bug in pyzmq, python will crash if you try to have more than one zmq.Context active in the same interpreter. This means that you cannot use the supplied gunicorn.conf.py to run gstats, only use it to run your actual apps
#!/usr/bin/env python
import sys
from random import random
from time import sleep
from threading import Thread, active_count

# pull pre_request and post_request into this namespace
execfile('gunicorn.conf.py')


class Worker(Thread):
    def __init__(self, nreq):
        super(Worker, self).__init__()
        self.nreq = nreq

    def run(self):
        """
        generate <nreq> requests taking a random amount of time between 0 and 0.5 seconds
        """
        for i in xrange(self.nreq):
            req = '%s_%s' % (self.ident, i)
            pre_request(None, req)
            sleep(random() / 2)
            post_request(None, req)


if __name__ == '__main__':
    # simulate workload: <sys.argv[1]> workers serving <sys.argv[2]> requests each
    workers = []
    nw = int(sys.argv[1])
    nr = int(sys.argv[2])

    for n in range(nw):
        t = Worker(nr)
        t.start()
        workers.append(t)
        print '%s started' % t.name

    while active_count() > 1:
        for t in workers:
            if t.is_alive():
                t.join(0.1)
                if not t.is_alive():
                    print '%s finished' % t.name
#!/usr/bin/env python
#
# Copyright (c) 2010 Sabin Iacob <iacobs@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import zmq
import signal

from math import sqrt
from collections import defaultdict, deque
from time import time
from optparse import OptionParser


class StopThread(Exception):
    pass


class TimeRingBuffer(object):
    """
    Timed ring buffer: holds objects passed in the last <interval> seconds
    """
    def __init__(self, interval):
        """
        :param interval: ring buffer's dimension (seconds)
        :type interval: int
        """
        self.__size = interval
        self.__things = deque()
        self.__count = 0L

    @property
    def _current_timestamp(self):
        return int(time())

    @property
    def values(self):
        return [t[1] for t in self.__things]

    @property
    def count(self):
        # total number of items ever appended, not just those still buffered
        return self.__count

    def __len__(self):
        return len(self.__things)

    def append(self, val):
        ts = self._current_timestamp
        oldest = ts - self.__size

        # drop entries that fell out of the time window
        while self.__things and self.__things[0][0] < oldest:
            self.__things.popleft()

        self.__things.append((ts, val))
        self.__count += 1


class StatsCollector(object):
    def __init__(self, zmq_context, collector_address='tcp://127.0.0.2:2345', comm_address='tcp://127.0.0.1:2345', buffer_length=600):
        self.ctx = zmq_context
        self.collector_address = collector_address
        self.comm_address = comm_address
        self.buffer_length = buffer_length
        self.reset_stats()

    def reset_stats(self):
        self.stats = defaultdict(lambda: {
            'started': TimeRingBuffer(self.buffer_length),
            'finished': TimeRingBuffer(self.buffer_length)
        })

    def collect_stats(self, prefix='default', req_time=0):
        stats = self.stats[prefix]
        # an empty/zero request time marks the start of a request
        if not req_time:
            stats['started'].append(0)
        else:
            stats['finished'].append(req_time)

    def assemble_stats(self):
        ret = {}
        for prefix, data in self.stats.items():
            finished = data['finished'].values
            finished_cnt = len(finished)
            # avoid division by zero when nothing has finished yet
            if finished_cnt < 1:
                finished_cnt = 1
            time_total = sum(finished)
            time_avg = time_total / float(finished_cnt)

            ret[prefix] = {
                'started': data['started'].count,
                'finished': data['finished'].count,
                'processing_time': {
                    'avg': time_avg,
                    'std': sqrt(sum(((t - time_avg) ** 2 for t in finished)) / finished_cnt)
                }
            }
        return ret

    def die(self, *args):
        raise StopThread()

    def run(self):
        collector = self.ctx.socket(zmq.REP)
        comm = self.ctx.socket(zmq.PAIR)
        sig = self.ctx.socket(zmq.PAIR)

        collector.bind(self.collector_address)
        comm.bind(self.comm_address)
        sig.bind('inproc://signals')

        def on_collector():
            prefix, req_time = collector.recv_multipart()
            prefix = prefix or 'default'
            req_time = req_time and float(req_time) or 0
            self.collect_stats(prefix, req_time)
            collector.send('OK')

        def on_comm():
            cmd = comm.recv()
            if cmd not in commands:
                comm.send('ERROR')
                return
            ret = commands[cmd]()
            comm.send_json(ret)

        def on_sig():
            signum = int(sig.recv())
            # ignore signal numbers we have no handler for
            if signum not in signals:
                return
            signals[signum]()

        commands = {
            'GET': self.assemble_stats,
        }

        signals = {
            signal.SIGQUIT: self.die,
            signal.SIGTERM: self.die,
            signal.SIGUSR1: self.reset_stats,
        }

        read_handlers = {
            collector: on_collector,
            comm: on_comm,
            sig: on_sig,
        }

        try:
            while True:
                r, w, x = zmq.select([collector, comm, sig], [collector, comm], [])
                for s in r:
                    read_handlers[s]()
        except StopThread:
            pass


def stop_collector(signum, frame):
    # forward the signal number to the collector loop over an inproc socket
    sig = get_context().socket(zmq.PAIR)
    sig.connect('inproc://signals')
    sig.send(str(signum))


def context_factory():
    # keep a single zmq.Context per interpreter
    context_store = []

    def inner():
        if not context_store:
            context_store.append(zmq.Context())
        return context_store[0]
    return inner

get_context = context_factory()

if __name__ == '__main__':
    # TODO find something that actually works here without waiting for zmq.select
    signal.signal(signal.SIGQUIT, stop_collector)
    signal.signal(signal.SIGTERM, stop_collector)

    epilog = 'for the format of ZeroMQ addresses, please refer to zmq_connect(3) (online at http://api.zeromq.org/zmq_connect.html)'
    parser = OptionParser(epilog=epilog)
    parser.add_option('-s', '--stats-address', dest='collector', default='tcp://127.0.0.2:2345', help='set collector address to ADDR [%default]', metavar='ADDR')
    parser.add_option('-c', '--comm-address', dest='comm', default='tcp://127.0.0.1:2345', help='set communication address to ADDR [%default]', metavar='ADDR')
    parser.add_option('-l', '--buffer-length', dest='buflen', type='int', default=600, help='compute average load times over the last LENGTH seconds [%default]', metavar='LENGTH')

    o, a = parser.parse_args()

    try:
        import setproctitle
        setproctitle.setproctitle('gstats [stats collector collector=%s, comms=%s, buflen=%s]' % (o.collector, o.comm, o.buflen))
    except ImportError:
        pass

    stats_collector = StatsCollector(get_context(), collector_address=o.collector, comm_address=o.comm, buffer_length=o.buflen)
    stats_collector.run()
# Copyright (c) 2010 Sabin Iacob <iacobs@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import zmq


class Application(object):
    def __init__(self, zmq_context, collectd_address):
        self.ctx = zmq_context
        self.collectd_address = collectd_address

    def dispatch(self, env):
        """ very simple URL dispatch, a la Cake: /zelink maps to handle_zelink """
        path = filter(None, env['PATH_INFO'].split('/'))
        # an empty path (a request for '/') has no handler either
        handler = path and getattr(self, 'handle_%s' % path[0], None)
        if not handler:
            return '404 Not Found', ['%(PATH_INFO)s not found' % env]
        return handler(env)

    def handle__status(self, env):
        comm = self.ctx.socket(zmq.PAIR)
        comm.connect(self.collectd_address)
        comm.send('GET')
        ret = comm.recv()
        comm.close()
        return '200 OK', [ret]

    def __call__(self, env, start_response):
        # only serve stats to local clients
        if env['REMOTE_ADDR'] != '127.0.0.1':
            start_response('403 Forbidden', [])
            return ['You are not allowed to see this!']
        status, ret = self.dispatch(env)
        start_response(status, [])
        return ret


def context_factory():
    context_store = []

    def inner():
        if not context_store:
            context_store.append(zmq.Context())
        return context_store[0]
    return inner

get_context = context_factory()

app = Application(get_context(), 'tcp://127.0.0.1:2345')
#!/usr/bin/python
import sys
import os
import re
import zmq

SLUG_UNFRIENDLY = re.compile(r'[^0-9a-z_]')

ctx = zmq.Context()


def getServerStatus(munin_prefix):
    status_addr = os.environ.get('status_addr', 'tcp://127.0.0.1:2345')
    collector = ctx.socket(zmq.PAIR)
    collector.connect(status_addr)
    collector.send('GET')
    ss = collector.recv_json()
    # match the stats prefix against the slug taken from the plugin's file name
    for prefix, data in ss.items():
        if re.sub(SLUG_UNFRIENDLY, '_', prefix.lower()) == munin_prefix:
            return prefix, data['processing_time']
    return None, None


def doData(munin_prefix):
    prefix, ss = getServerStatus(munin_prefix)
    print 'avg_low.value %s' % (float(ss['avg']) - float(ss['std']))
    print 'avg_std.value %s' % ss['std']
    print 'avg.value %s' % ss['avg']


def doConfig(munin_prefix):
    prefix, ss = getServerStatus(munin_prefix)
    print 'graph_title Gunicorn request time for %s' % prefix
    print 'graph_args --base 1000 -l 0'
    print 'graph_vlabel ms'
    print 'graph_category gunicorn'

    print 'avg_low.label low limit'
    print 'avg_low.min 0'
    print 'avg_low.type GAUGE'
    print 'avg_low.draw AREA'
    print 'avg_low.colour 00000000'

    print 'avg_std.label standard deviation'
    print 'avg_std.min 0'
    print 'avg_std.type GAUGE'
    print 'avg_std.draw STACK'

    print 'avg.label average'
    print 'avg.min 0'
    print 'avg.type GAUGE'
    print 'avg.draw LINE2'


if __name__ == '__main__':
    munin_prefix = sys.argv[0].split('_', 1)[1]
    if not munin_prefix:
        print 'please link as %s<prefix>' % os.path.basename(sys.argv[0])
        exit(1)
    if len(sys.argv) > 1 and sys.argv[1] == 'config':
        doConfig(munin_prefix)
    else:
        doData(munin_prefix)
#!/usr/bin/python
import sys
import os
import re
import zmq

SLUG_UNFRIENDLY = re.compile(r'[^0-9a-z_]')

ctx = zmq.Context()


def getServerStatus(munin_prefix):
    status_addr = os.environ.get('status_addr', 'tcp://127.0.0.1:2345')
    collector = ctx.socket(zmq.PAIR)
    collector.connect(status_addr)
    collector.send('GET')
    ss = collector.recv_json()
    # match the stats prefix against the slug taken from the plugin's file name
    for prefix, data in ss.items():
        if re.sub(SLUG_UNFRIENDLY, '_', prefix.lower()) == munin_prefix:
            return prefix, data
    return None, None


def doData(munin_prefix):
    prefix, ss = getServerStatus(munin_prefix)
    print 'started.value %s' % ss['started']
    print 'finished.value %s' % ss['finished']


def doConfig(munin_prefix):
    prefix, ss = getServerStatus(munin_prefix)
    print 'graph_title Gunicorn requests for %s' % prefix
    print 'graph_args --base 1000 -l 0'
    print 'graph_vlabel requests / ${graph_period}'
    print 'graph_category gunicorn'
    for k in ['started', 'finished']:
        print '%s.label %s' % (k, k)
        print '%s.min 0' % k
        print '%s.type DERIVE' % k


if __name__ == '__main__':
    munin_prefix = sys.argv[0].split('_', 1)[1]
    if not munin_prefix:
        print 'please link as %s<prefix>' % os.path.basename(sys.argv[0])
        exit(1)
    if len(sys.argv) > 1 and sys.argv[1] == 'config':
        doConfig(munin_prefix)
    else:
        doData(munin_prefix)
# Copyright (c) 2010 Sabin Iacob <iacobs@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import zmq
from datetime import datetime

# start times of in-flight requests, keyed by hash(req)
requests = {}


def context_factory():
    context_store = []

    def inner():
        if not context_store:
            context_store.append(zmq.Context())
        return context_store[0]
    return inner

get_context = context_factory()

collector_addr = 'tcp://127.0.0.2:2345'


def pre_request(worker, req):
    _collector = get_context().socket(zmq.REQ)
    _collector.connect(collector_addr)
    # an empty request time tells the collector that a request has started
    _collector.send_multipart(['my_app', ''])
    _collector.recv()
    requests[hash(req)] = datetime.now()


def post_request(worker, req):
    req_end = datetime.now()
    req = hash(req)
    if req in requests:
        req_time = req_end - requests[req]
        # request duration in milliseconds
        req_time = req_time.seconds * 1000 + req_time.microseconds / 1000
        del requests[req]
        _collector = get_context().socket(zmq.REQ)
        _collector.connect(collector_addr)
        _collector.send_multipart(['my_app', str(req_time)])
        _collector.recv()

ashokrkm commented Nov 9, 2010

is there a reason you use collector.bind('tcp://127.0.0.2:2345') instead of 127.0.0.1?

i am getting this error http://pastie.org/1285271


ashokrkm commented Nov 9, 2010

also results are available at /_status not /_stats


m0n5t3r commented Nov 9, 2010

  • first question: no particular reason, just needed a loopback address; I made it a parameter for StatsCollector now, should be easy to change (as I said, I assume you speak Python); as for the error, it's because OSX is stupid, a quick Google search says you can solve it with
    sudo ifconfig lo0 alias 127.0.0.* up
  • second: typo, thanks for catching it :)


ashokrkm commented Nov 9, 2010

great, thanks for the updates.

do you think it would be possible /useful to keep track of per worker statistics also?

the reason is i have 15 workers and suddenly one of them is using up 1.5GB RAM and others are just 100MB?


m0n5t3r commented Nov 9, 2010

should be easy: send worker PID to gstats, for example, and maintain a dict of workers in gstats;

be aware that there is a possibility of a memory leak if your workers die often, unless you also set up a mechanism to signal that a worker has exited (there is a hook in gunicorn for that)
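
A minimal Python 3 sketch of that idea, assuming the hooks send the worker PID along with each message and that something calls forget when a worker exits (gunicorn's worker_exit hook would be one candidate). The WorkerStats class and its method names are hypothetical and not part of gstats:

```python
from collections import defaultdict


class WorkerStats:
    """Per-worker request counters keyed by PID (illustrative only)."""

    def __init__(self):
        self._workers = defaultdict(lambda: {"started": 0, "finished": 0})

    def record(self, pid, finished=False):
        """Count one started or finished request for the given worker."""
        key = "finished" if finished else "started"
        self._workers[pid][key] += 1

    def forget(self, pid):
        """Drop a worker that has exited so dead PIDs do not pile up."""
        self._workers.pop(pid, None)

    def snapshot(self):
        """Return a plain dict copy that is safe to serialize as JSON."""
        return {pid: dict(counts) for pid, counts in self._workers.items()}


if __name__ == "__main__":
    stats = WorkerStats()
    stats.record(101)
    stats.record(101, finished=True)
    stats.record(202)
    stats.forget(101)  # worker 101 exited
    print(stats.snapshot())
```

Without the forget step the dict would keep one entry per PID ever seen, which is the leak described above.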

also, due to benoitc/gunicorn#119, you will probably get a memory leak if you use async workers in your app: sometimes pre_request runs and post_request doesn't, which means that the requests dict grows indefinitely; I had to patch gunicorn to be able to use this in production
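
One possible workaround that does not involve patching gunicorn is to prune old entries from the requests dict now and then. The Python 3 sketch below assumes the dict maps a request key to its start time, as in gunicorn.conf.py; prune_stale and the max_age cutoff are hypothetical names, and this is not the patch mentioned above:

```python
from datetime import datetime, timedelta


def prune_stale(requests, max_age, now=None):
    """Remove entries older than max_age and return how many were dropped."""
    now = now or datetime.now()
    stale = [key for key, started in requests.items() if now - started > max_age]
    for key in stale:
        del requests[key]
    return len(stale)


if __name__ == "__main__":
    now = datetime(2010, 11, 9, 12, 0, 0)
    requests = {
        "old": now - timedelta(minutes=30),    # post_request never ran
        "recent": now - timedelta(seconds=2),  # probably still in flight
    }
    dropped = prune_stale(requests, timedelta(minutes=10), now=now)
    print(dropped, sorted(requests))
```

The cutoff has to be longer than the slowest legitimate request, otherwise a request that is still running would lose its start time and never be reported as finished.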

time to sleep for me, it's 1 AM here :)


ashokrkm commented Nov 9, 2010

OK thanks I will look into it.

with the code you have provided would it be possible for gunicorn apps running in different machines to connect to gstats.py app. i have 2 app servers each running gunicorns and i would like to have gstats.py running in only one machine if that is possible at all!

thanks for all the help


m0n5t3r commented Nov 12, 2010

just change the IP:port used for the collector:
stats_collector = StatsCollector(get_context(), 'tcp://:')

and use the same IP:port in pre_request and post_request:
_collector.connect('tcp://:')
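
For illustration, a small Python 3 sketch of building one shared endpoint string for both sides. The host below is a placeholder from the documentation-only 192.0.2.0/24 range, not a value from this thread, and tcp_endpoint is a hypothetical helper:

```python
def tcp_endpoint(host, port):
    """Build a ZeroMQ TCP endpoint string of the form tcp://host:port."""
    port = int(port)
    if not 0 < port < 65536:
        raise ValueError("port out of range: %r" % port)
    return "tcp://%s:%d" % (host, port)


# Hypothetical values: 192.0.2.10 stands in for the machine that runs
# gstats-collectd; 2345 is the port used in the gist's defaults.
COLLECTOR_HOST = "192.0.2.10"
COLLECTOR_PORT = 2345

# On the collector machine: the address given to StatsCollector
# (or to gstats-collectd via --stats-address).
# On every app server: the value of collector_addr in gunicorn.conf.py.
collector_addr = tcp_endpoint(COLLECTOR_HOST, COLLECTOR_PORT)

if __name__ == "__main__":
    print(collector_addr)
```

On the binding side ZeroMQ also accepts a wildcard interface such as tcp://*:2345, which may be more convenient when the collector machine has several addresses.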


m0n5t3r commented Jan 29, 2011

This gist now has a new home at https://github.com/m0n5t3r/gstats