Skip to content

Instantly share code, notes, and snippets.

@progrium
Last active Sep 16, 2020
Embed
What would you like to do?
Consul health check integration with DataDog
import requests
from checks import AgentCheck
class ConsulCheck(AgentCheck):
def should_check(self):
r = requests.get(self.init_config["consul_url"] + "/v1/agent/self")
if r.status_code != 200:
return False
agent = r.json()
agent_addr = "{}:{}".format(agent["Config"]["AdvertiseAddr"], agent["Config"]["Ports"]["Server"])
r = requests.get(self.init_config["consul_url"] + "/v1/status/leader")
if r.status_code != 200:
return False
return agent_addr == r.json()
def check(self, instance):
if not self.should_check():
return
r = requests.get(self.init_config["consul_url"] + "/v1/health/state/any")
if r.status_code == 200:
checks = [c for c in r.json() if c["Status"] in ["passing", "warning", "critical"]]
for check in checks:
tags = ["check:{}".format(check["CheckID"])]
if check["ServiceName"] != "":
tags.append("service:{}".format(check["ServiceName"]))
if check["ServiceID"] != "":
tags.append("service-id:{}".format(check["ServiceID"]))
self.gauge('consul.check', {
"passing": 0,
"warning": 1,
"critical": 2,
}[check["Status"]], tags)
if __name__ == '__main__':
check, instances = ConsulCheck.from_yaml('./conf.d/consul.yaml')
for instance in instances:
print "Running the check"
check.check(instance)
if check.has_events():
print 'Events: %s' % (check.get_events())
print 'Metrics: %s' % (check.get_metrics())
init_config:
consul_url: http://localhost:8500
instances:
[{}]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment