Skip to content

Instantly share code, notes, and snippets.

@deniszh
Last active November 11, 2015 17:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save deniszh/ee569bf7948d71aead23 to your computer and use it in GitHub Desktop.
Save deniszh/ee569bf7948d71aead23 to your computer and use it in GitHub Desktop.
import os
import time
import socket
from resource import getrusage, RUSAGE_SELF
from twisted.application.service import Service
from twisted.internet.task import LoopingCall
from carbon.conf import settings
stats = {}
HOSTNAME = socket.gethostname().replace('.','_')
PAGESIZE = os.sysconf('SC_PAGESIZE')
rusage = getrusage(RUSAGE_SELF)
lastUsage = rusage.ru_utime + rusage.ru_stime
lastUsageTime = time.time()
# NOTE: Referencing settings in this *top level scope* will
# give you *defaults* only. Probably not what you wanted.
# TODO(chrismd) refactor the graphite metrics hierarchy to be cleaner,
# more consistent, and make room for frontend metrics.
#metric_prefix = "Graphite.backend.%(program)s.%(instance)s." % settings
def increment(stat, increase=1):
try:
stats[stat] += increase
except KeyError:
stats[stat] = increase
def max(stat, newval):
try:
if stats[stat] < newval:
stats[stat] = newval
except KeyError:
stats[stat] = newval
def append(stat, value):
try:
stats[stat].append(value)
except KeyError:
stats[stat] = [value]
def getCpuUsage():
global lastUsage, lastUsageTime
rusage = getrusage(RUSAGE_SELF)
currentUsage = rusage.ru_utime + rusage.ru_stime
currentTime = time.time()
usageDiff = currentUsage - lastUsage
timeDiff = currentTime - lastUsageTime
if timeDiff == 0: #shouldn't be possible, but I've actually seen a ZeroDivisionError from this
timeDiff = 0.000001
cpuUsagePercent = (usageDiff / timeDiff) * 100.0
lastUsage = currentUsage
lastUsageTime = currentTime
return cpuUsagePercent
def getMemUsage():
rss_pages = int( open('/proc/self/statm').read().split()[1] )
return rss_pages * PAGESIZE
def recordMetrics():
global lastUsage
myStats = stats.copy()
stats.clear()
# cache metrics
if settings.program == 'carbon-cache':
record = cache_record
updateTimes = myStats.get('updateTimes', [])
committedPoints = myStats.get('committedPoints', 0)
creates = myStats.get('creates', 0)
errors = myStats.get('errors', 0)
cacheQueries = myStats.get('cacheQueries', 0)
cacheBulkQueries = myStats.get('cacheBulkQueries', 0)
cacheOverflow = myStats.get('cache.overflow', 0)
cacheBulkQuerySizes = myStats.get('cacheBulkQuerySize', [])
# Calculate cache-data-structure-derived metrics prior to storing anything
# in the cache itself -- which would otherwise affect said metrics.
cache_size = cache.MetricCache.size
cache_queues = len(cache.MetricCache)
record('cache.size', cache_size)
record('cache.queues', cache_queues)
if updateTimes:
avgUpdateTime = sum(updateTimes) / len(updateTimes)
record('avgUpdateTime', avgUpdateTime)
if committedPoints:
pointsPerUpdate = float(committedPoints) / len(updateTimes)
record('pointsPerUpdate', pointsPerUpdate)
if cacheBulkQuerySizes:
avgBulkSize = sum(cacheBulkQuerySizes) / len(cacheBulkQuerySizes)
record('cache.bulk_queries_average_size', avgBulkSize)
record('updateOperations', len(updateTimes))
record('committedPoints', committedPoints)
record('creates', creates)
record('errors', errors)
record('cache.queries', cacheQueries)
record('cache.bulk_queries', cacheBulkQueries)
record('cache.overflow', cacheOverflow)
# aggregator metrics
elif settings.program == 'carbon-aggregator':
from carbon.aggregator.buffers import BufferManager
record = aggregator_record
record('allocatedBuffers', len(BufferManager))
record('bufferedDatapoints',
sum([b.size for b in BufferManager.buffers.values()]))
record('aggregateDatapointsSent', myStats.get('aggregateDatapointsSent', 0))
# relay metrics
else:
record = relay_record
prefix = 'destinations.'
relay_stats = [(k,v) for (k,v) in myStats.items() if k.startswith(prefix)]
for stat_name, stat_value in relay_stats:
record(stat_name, stat_value)
# common metrics
record('metricsReceived', myStats.get('metricsReceived', 0))
record('blacklistMatches', myStats.get('blacklistMatches', 0))
record('whitelistRejects', myStats.get('whitelistRejects', 0))
record('cpuUsage', getCpuUsage())
try: # This only works on Linux
record('memUsage', getMemUsage())
except Exception:
pass
def cache_record(metric, value):
prefix = settings.CARBON_METRIC_PREFIX
if settings.instance is None:
fullMetric = '%s.agents.%s.%s' % (prefix, HOSTNAME, metric)
else:
fullMetric = '%s.agents.%s-%s.%s' % (prefix, HOSTNAME, settings.instance, metric)
datapoint = (time.time(), value)
cache.MetricCache.store(fullMetric, datapoint)
def relay_record(metric, value):
prefix = settings.CARBON_METRIC_PREFIX
if settings.instance is None:
fullMetric = '%s.relays.%s.%s' % (prefix, HOSTNAME, metric)
else:
fullMetric = '%s.relays.%s-%s.%s' % (prefix, HOSTNAME, settings.instance, metric)
datapoint = (time.time(), value)
events.metricGenerated(fullMetric, datapoint)
def aggregator_record(metric, value):
prefix = settings.CARBON_METRIC_PREFIX
if settings.instance is None:
fullMetric = '%s.aggregator.%s.%s' % (prefix, HOSTNAME, metric)
else:
fullMetric = '%s.aggregator.%s-%s.%s' % (prefix, HOSTNAME, settings.instance, metric)
datapoint = (time.time(), value)
events.metricGenerated(fullMetric, datapoint)
class InstrumentationService(Service):
def __init__(self):
self.record_task = LoopingCall(recordMetrics)
# Default handlers
events.metricReceived.addHandler(lambda metric, datapoint: increment('metricsReceived'))
def startService(self):
if settings.CARBON_METRIC_INTERVAL > 0:
self.record_task.start(settings.CARBON_METRIC_INTERVAL, False)
Service.startService(self)
def stopService(self):
if settings.CARBON_METRIC_INTERVAL > 0:
self.record_task.stop()
Service.stopService(self)
# Avoid import circularities
from carbon import state, events, cache
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment