Skip to content

Instantly share code, notes, and snippets.

Created June 20, 2018 17:25
Show Gist options
  • Save rybit/c2c17137f608d2b17204f86c7ea7cd12 to your computer and use it in GitHub Desktop.
Save rybit/c2c17137f608d2b17204f86c7ea7cd12 to your computer and use it in GitHub Desktop.
# stdlib
from collections import defaultdict
from urlparse import urlparse
import re
# 3rd party
import requests
# project
from checks import AgentCheck
PATH = "path"
ALIAS = "alias"
TYPE = "type"
TAGS = "tags"
GAUGE = "gauge"
RATE = "rate"
COUNTER = "counter"
MONOTONIC = "monotonic"
GAUGE: AgentCheck.gauge,
RATE: AgentCheck.rate,
COUNTER: AgentCheck.increment,
MONOTONIC: AgentCheck.monotonic_count,
class JsonCheck(AgentCheck):
def _get_data(self, url, instance):
ssl_params = {
'ssl': instance.get('ssl'),
'ssl_keyfile': instance.get('ssl_keyfile'),
'ssl_certfile': instance.get('ssl_certfile'),
'ssl_verify': instance.get('ssl_verify'),
for key, param in ssl_params.items():
if param is None:
del ssl_params[key]
# Load SSL configuration, if available.
# ssl_verify can be a bool or a string (
if isinstance(ssl_params.get('ssl_verify'), bool) or isinstance(ssl_params.get('ssl_verify'), basestring):
verify = ssl_params.get('ssl_verify')
verify = None
if ssl_params.get('ssl_certfile') and ssl_params.get('ssl_keyfile'):
cert = (ssl_params.get('ssl_certfile'), ssl_params.get('ssl_keyfile'))
elif ssl_params.get('ssl_certfile'):
cert = ssl_params.get('ssl_certfile')
cert = None
resp = requests.get(
return resp.json()
def _load(self, instance):
url = instance.get('url')
if not url:
raise Exception('Json instance missing "url" value.')
tags = instance.get('tags', [])
tags.append("url:%s" % url)
data = self._get_data(url, instance)
metrics = instance.get("metrics", [])
namespace = instance.get('namespace', DEFAULT_METRIC_NAMESPACE)
return data, tags, metrics, namespace
def check(self, instance):
data, tags, metrics, namespace = self._load(instance)
self.parse_json_data(data, tags, metrics, namespace)
def parse_json_data(self, data, tags, metrics, namespace):
Report all the metrics based on the configuration in instance
If a metric is not well configured or is not present in the payload,
continue processing metrics but log the information to the info page
count = 0
for metric in metrics:
path = metric.get(PATH)
metric_type = metric.get(TYPE, DEFAULT_TYPE)
metric_tags = list(metric.get(TAGS, []))
metric_tags += tags
alias = metric.get(ALIAS)
if not path:
self.warning("Metric %s has no path" % metric)
if metric_type not in SUPPORTED_TYPES:
self.warning("Metric type %s not supported for this check" % metric_type)
keys = path.split("/")
values = self.deep_get(data, keys)
if len(values) == 0:
self.warning("No results matching path %s" % path)
tag_by_path = alias is not None
for traversed_path, value in values:
actual_path = ".".join(traversed_path)
path_tag = ["path:%s" % actual_path] if tag_by_path else []
metric_name = alias or self.normalize(actual_path, namespace, fix_case=True)
except ValueError:
self.log.warning("Unreportable value for path %s: %s" % (path, value))
SUPPORTED_TYPES[metric_type](self, metric_name, value, metric_tags + path_tag)
count += 1
def deep_get(self, content, keys, traversed_path=None):
Allow to retrieve content nested inside a several layers deep dict/list
Examples: -content: {
"key1": {
"key2" : [
"name" : "object1",
"value" : 42
"name" : "object2",
"value" : 72
-keys: ["key1", "key2", "1", "value"] would return [(["key1", "key2", "1", "value"], 72)]
-keys: ["key1", "key2", "1", "*"] would return [(["key1", "key2", "1", "value"], 72), (["key1", "key2", "1", "name"], "object2")]
-keys: ["key1", "key2", "*", "value"] would return [(["key1", "key2", "1", "value"], 72), (["key1", "key2", "0", "value"], 42)]
if traversed_path is None:
traversed_path = []
if keys == []:
return [(traversed_path, content)]
key = keys[0]
regex = "".join(["^", key, "$"])
key_rex = re.compile(regex)
except Exception:
self.warning("Cannot compile regex: %s" % regex)
return []
results = []
for new_key, new_content in self.items(content):
if key_rex.match(new_key):
results.extend(self.deep_get(new_content, keys[1:], traversed_path + [str(new_key)]))
return results
def items(self, object):
if isinstance(object, list):
for new_key, new_content in enumerate(object):
yield str(new_key), new_content
elif isinstance(object, dict):
for new_key, new_content in object.iteritems():
yield str(new_key), new_content
self.log.warning("Could not parse this object, check the json"
"served by the url")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment