Skip to content

Instantly share code, notes, and snippets.

@wouterdb
Created June 16, 2022 19:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wouterdb/215327338fddc390ef3f91b522886cae to your computer and use it in GitHub Desktop.
Save wouterdb/215327338fddc390ef3f91b522886cae to your computer and use it in GitHub Desktop.
Nagios checks for Inmanta: example code
import datetime
from datetime import timedelta
from urllib.parse import quote
import requests
base_url = "http://192.168.5.77:8888/"
def list_environments():
    """Fetch the environments known to the server.

    Returns a dict mapping each environment's ``id`` to its ``name``, built
    from the ``environments`` list of the ``api/v1/environment`` response.
    An HTTP error status is raised by ``raise_for_status()``.
    """
    reply = requests.get(f"{base_url}api/v1/environment")
    reply.raise_for_status()
    environments = reply.json()["environments"]
    return dict((entry["id"], entry["name"]) for entry in environments)
def get_settings(env: str):
    """Return the effective settings of environment *env* as a dict.

    Every entry in the response's ``definition`` section contributes its
    ``name`` with its ``default`` value; the values in the ``settings``
    section are then laid over those defaults.
    An HTTP error status is raised by ``raise_for_status()``.
    """
    reply = requests.get(
        f"{base_url}api/v2/environment_settings",
        headers={"X-Inmanta-tid": env},
    )
    reply.raise_for_status()
    payload = reply.json()["data"]

    effective = {}
    for definition in payload["definition"].values():
        effective[definition["name"]] = definition["default"]
    # Explicitly configured values win over the defaults collected above.
    effective.update(payload["settings"])
    return effective
def assert_setting(env):
    """Print one line per setting of *env* that differs from the expected value.

    The effective settings come from ``get_settings(env)``; a setting that
    is absent there is compared (and reported) as ``None``.
    """
    expected = {
        "auto_deploy": True,
        "server_compile": True,
        "purge_on_delete": False,
        "push_on_auto_deploy": True,
        "protected_environment": True,
        "agent_trigger_method_on_auto_deploy": "push_incremental_deploy",
    }
    actual = get_settings(env)
    for key, wanted in expected.items():
        found = actual.get(key)
        if found != wanted:
            print(f" Environment Setting {key} should be {wanted} but is {found}")
def assert_agents_up(env):
    """Print the agents of environment *env* that are down or paused.

    An agent counts as paused when its ``paused`` field is truthy, and as
    down when its ``status`` is not ``"up"``. Paused agents are reported
    only on the "paused" line, never on the "down" line.
    An HTTP error status is raised by ``raise_for_status()``.
    """
    response = requests.get(
        base_url + "api/v2/agents", headers={"X-Inmanta-tid": env}
    )
    response.raise_for_status()
    agents = response.json()["data"]

    paused = {agent["name"] for agent in agents if agent["paused"]}
    down = {agent["name"] for agent in agents if agent["status"] != "up"} - paused

    # Sets have no defined iteration order; sort the names so the same
    # situation always produces the same output line.
    if down:
        print(" agents down: ", " ".join(sorted(down)))
    if paused:
        print(" agents paused: ", " ".join(sorted(paused)))
def failed_exporting_compiles(env):
    """Print a console link for each failed exporting compile of the last day.

    Requests the compile reports of environment *env* with
    ``filter.success=False`` that were requested at or after "now minus 24
    hours" (UTC), keeps those whose ``do_export`` field is truthy, and
    prints one link per report id.
    An HTTP error status is raised by ``raise_for_status()``.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop its tzinfo so the ISO string keeps the offset-free form
    # this query has always sent (a "+00:00" suffix would also put an
    # unescaped "+" into the query string).
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    min24h = (now_utc - timedelta(days=1)).isoformat()

    response = requests.get(
        base_url
        + f"api/v2/compilereport?filter.success=False&filter.requested=ge:{min24h}",
        headers={"X-Inmanta-tid": env},
    )
    response.raise_for_status()
    reports = response.json()["data"]

    for report in reports:
        # Only compiles that were meant to export are reported.
        if report["do_export"]:
            print(
                f" Compile failed: {base_url}console/compilereports/{report['id']}?env={env}"
            )
def get_failed_resource(env):
    """Print a console link for failed resources of environment *env*.

    The request asks ``api/v2/resource`` for resources with status
    ``failed``, with ``limit=20`` and sorted by ``resource_type`` ascending.
    An HTTP error status is raised by ``raise_for_status()``.
    """
    query = "limit=20&filter.status=failed&sort=resource_type.asc"
    reply = requests.get(
        f"{base_url}api/v2/resource?{query}",
        headers={"X-Inmanta-tid": env},
    )
    reply.raise_for_status()

    for entry in reply.json()["data"]:
        # NOTE(review): quote() leaves "/" unescaped by default - confirm the
        # console link still resolves for resource ids that contain "/".
        escaped_id = quote(entry["resource_id"])
        print(
            f" Failed deployment on {base_url}console/resources/{escaped_id}?env={env}"
        )
def get_all_services(env):
    """Map each service in the catalog of *env* to its ``danger`` states.

    Returns a dict keyed by service ``name``; each value is the list of
    lifecycle state names whose ``label`` equals ``"danger"`` (possibly an
    empty list).
    An HTTP error status is raised by ``raise_for_status()``.
    """
    reply = requests.get(
        base_url + "lsm/v1/service_catalog",
        headers={"X-Inmanta-tid": env},
    )
    reply.raise_for_status()

    bad_states_by_service = {}
    for entity in reply.json()["data"]:
        bad_states_by_service[entity["name"]] = [
            state["name"]
            for state in entity["lifecycle"]["states"]
            if state["label"] == "danger"
        ]
    return bad_states_by_service
def no_failed_services(env):
    """Print a diagnose link for every service instance in a bad state.

    For each service returned by ``get_all_services(env)``, the service
    inventory is queried with one ``filter.state`` parameter per bad state,
    and every returned instance is reported. The instance is labelled with
    its ``service_identity_attribute_value`` when that is truthy, otherwise
    with its ``id``.
    An HTTP error status is raised by ``raise_for_status()``.
    """
    services = get_all_services(env)
    for service, bad_states in services.items():
        # A service without any bad state cannot have failed instances.
        # Without this guard the query string would be empty, the inventory
        # request would be unfiltered, and every instance it returned would
        # be reported as failed.
        if not bad_states:
            continue

        state_query = "&".join(f"filter.state={state}" for state in bad_states)
        response = requests.get(
            base_url + f"lsm/v1/service_inventory/{service}?{state_query}",
            headers={"X-Inmanta-tid": env},
        )
        response.raise_for_status()

        for instance in response.json()["data"]:
            diagnose_url = (
                f"{base_url}console/lsm/catalog/{service}"
                f"/inventory/{instance['id']}/diagnose?env={env}"
            )
            text_id = instance["service_identity_attribute_value"] or instance["id"]
            print(f"Service failed for {service} {text_id}: {diagnose_url}")
# Run every check against every environment, printing the environment name
# first so the findings printed by the checks appear beneath it.
environment_checks = (
    assert_setting,
    assert_agents_up,
    failed_exporting_compiles,
    get_failed_resource,
    no_failed_services,
)
for env, name in list_environments().items():
    print(name)
    for run_check in environment_checks:
        run_check(env)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment