Skip to content

Instantly share code, notes, and snippets.

@rybit
Created June 20, 2018 17:25
Show Gist options
  • Save rybit/c2c17137f608d2b17204f86c7ea7cd12 to your computer and use it in GitHub Desktop.
Save rybit/c2c17137f608d2b17204f86c7ea7cd12 to your computer and use it in GitHub Desktop.
# stdlib
from collections import defaultdict
from urlparse import urlparse
import re
# 3rd party
import requests
# project
from checks import AgentCheck
PATH = "path"
ALIAS = "alias"
TYPE = "type"
TAGS = "tags"
GAUGE = "gauge"
RATE = "rate"
COUNTER = "counter"
MONOTONIC = "monotonic"
DEFAULT_TYPE = GAUGE
DEFAULT_METRIC_NAMESPACE = "json"
SUPPORTED_TYPES = {
GAUGE: AgentCheck.gauge,
RATE: AgentCheck.rate,
COUNTER: AgentCheck.increment,
MONOTONIC: AgentCheck.monotonic_count,
}
class JsonCheck(AgentCheck):
def _get_data(self, url, instance):
ssl_params = {
'ssl': instance.get('ssl'),
'ssl_keyfile': instance.get('ssl_keyfile'),
'ssl_certfile': instance.get('ssl_certfile'),
'ssl_verify': instance.get('ssl_verify'),
}
for key, param in ssl_params.items():
if param is None:
del ssl_params[key]
# Load SSL configuration, if available.
# ssl_verify can be a bool or a string (http://docs.python-requests.org/en/latest/user/advanced/#ssl-cert-verification)
if isinstance(ssl_params.get('ssl_verify'), bool) or isinstance(ssl_params.get('ssl_verify'), basestring):
verify = ssl_params.get('ssl_verify')
else:
verify = None
if ssl_params.get('ssl_certfile') and ssl_params.get('ssl_keyfile'):
cert = (ssl_params.get('ssl_certfile'), ssl_params.get('ssl_keyfile'))
elif ssl_params.get('ssl_certfile'):
cert = ssl_params.get('ssl_certfile')
else:
cert = None
resp = requests.get(
url,
timeout=10,
verify=verify,
cert=cert
)
resp.raise_for_status()
return resp.json()
def _load(self, instance):
url = instance.get('url')
if not url:
raise Exception('Json instance missing "url" value.')
tags = instance.get('tags', [])
tags.append("url:%s" % url)
data = self._get_data(url, instance)
metrics = instance.get("metrics", [])
namespace = instance.get('namespace', DEFAULT_METRIC_NAMESPACE)
return data, tags, metrics, namespace
def check(self, instance):
data, tags, metrics, namespace = self._load(instance)
self.parse_json_data(data, tags, metrics, namespace)
def parse_json_data(self, data, tags, metrics, namespace):
'''
Report all the metrics based on the configuration in instance
If a metric is not well configured or is not present in the payload,
continue processing metrics but log the information to the info page
'''
count = 0
for metric in metrics:
path = metric.get(PATH)
metric_type = metric.get(TYPE, DEFAULT_TYPE)
metric_tags = list(metric.get(TAGS, []))
metric_tags += tags
alias = metric.get(ALIAS)
if not path:
self.warning("Metric %s has no path" % metric)
continue
if metric_type not in SUPPORTED_TYPES:
self.warning("Metric type %s not supported for this check" % metric_type)
continue
keys = path.split("/")
values = self.deep_get(data, keys)
if len(values) == 0:
self.warning("No results matching path %s" % path)
continue
tag_by_path = alias is not None
for traversed_path, value in values:
actual_path = ".".join(traversed_path)
path_tag = ["path:%s" % actual_path] if tag_by_path else []
metric_name = alias or self.normalize(actual_path, namespace, fix_case=True)
try:
float(value)
except ValueError:
self.log.warning("Unreportable value for path %s: %s" % (path, value))
continue
SUPPORTED_TYPES[metric_type](self, metric_name, value, metric_tags + path_tag)
count += 1
def deep_get(self, content, keys, traversed_path=None):
'''
Allow to retrieve content nested inside a several layers deep dict/list
Examples: -content: {
"key1": {
"key2" : [
{
"name" : "object1",
"value" : 42
},
{
"name" : "object2",
"value" : 72
}
]
}
}
-keys: ["key1", "key2", "1", "value"] would return [(["key1", "key2", "1", "value"], 72)]
-keys: ["key1", "key2", "1", "*"] would return [(["key1", "key2", "1", "value"], 72), (["key1", "key2", "1", "name"], "object2")]
-keys: ["key1", "key2", "*", "value"] would return [(["key1", "key2", "1", "value"], 72), (["key1", "key2", "0", "value"], 42)]
'''
if traversed_path is None:
traversed_path = []
if keys == []:
return [(traversed_path, content)]
key = keys[0]
regex = "".join(["^", key, "$"])
try:
key_rex = re.compile(regex)
except Exception:
self.warning("Cannot compile regex: %s" % regex)
return []
results = []
for new_key, new_content in self.items(content):
if key_rex.match(new_key):
results.extend(self.deep_get(new_content, keys[1:], traversed_path + [str(new_key)]))
return results
def items(self, object):
if isinstance(object, list):
for new_key, new_content in enumerate(object):
yield str(new_key), new_content
elif isinstance(object, dict):
for new_key, new_content in object.iteritems():
yield str(new_key), new_content
else:
self.log.warning("Could not parse this object, check the json"
"served by the url")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment