Created
November 3, 2022 18:14
-
-
Save lstrojny/eeb6c2f780670cfad18be1ede80ffab5 to your computer and use it in GitHub Desktop.
A development loop for Jsonnet-based Grafana dashboards
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from watchdog.observers import Observer # type: ignore | |
from watchdog.events import LoggingEventHandler, RegexMatchingEventHandler # type: ignore | |
import logging | |
import sys | |
import time | |
import subprocess | |
import os | |
import re | |
from typing import Union, Tuple, Literal | |
import json | |
import requests | |
import uuid | |
from datetime import datetime | |
import webbrowser | |
from pathlib import Path | |
import argparse | |
# TODOs | |
# - send only if compiled JSON changed | |
# - better UI | |
# - handle libsonnet files | |
class Jsonnet:
    """Wrapper around the ``jsonnet`` and ``jsonnet-lint`` command line tools.

    Both tools are invoked with ``-J <vendor_dir>`` so imports can be
    resolved from the vendored library directory.
    """

    def __init__(self, vendor_dir: str):
        self.__vendor_dir = vendor_dir

    def lint(self, file: str) -> Tuple[bool, str]:
        """Lint *file*.

        Returns ``(True, "")`` when ``jsonnet-lint`` exits with status 0,
        otherwise ``(False, message)`` where *message* is the tool's stderr
        with the "Problems found!" banner removed.
        """
        process = self.__run("jsonnet-lint", file)
        if process.returncode != 0:
            message = process.stderr.decode("utf-8")
            return False, message.replace("Problems found!", "").strip()
        return True, ""

    def compile(self, file: str) -> Tuple[bool, str]:
        """Compile *file*.

        Returns ``(True, stdout)`` when ``jsonnet`` exits with status 0,
        otherwise ``(False, stderr)`` with surrounding whitespace stripped.
        """
        process = self.__run("jsonnet", file)
        if process.returncode == 0:
            return True, process.stdout.decode("utf-8")
        return False, process.stderr.decode("utf-8").strip()

    def __run(self, executable: str, file: str):
        """Run *executable* on *file* with the vendor dir on the import path."""
        command = [executable, "-J", self.__vendor_dir, file]
        return subprocess.run(command, capture_output=True)
class Ui:
    """Console reporter for the development loop.

    Prints one status message per pipeline step and opens newly created
    dashboards in the web browser. File paths are displayed relative to
    ``base_dir``.
    """

    def __init__(self, base_dir: str):
        self.__base_dir = base_dir

    def lint_error(self, file: str, error: str) -> None:
        """Report that linting *file* failed with *error*."""
        self.__error("linting", file, error)

    def lint_success(self, file: str) -> None:
        """Report that *file* passed linting."""
        self.__success("linting", file)

    def compile_error(self, file: str, error: str) -> None:
        """Report that compiling *file* failed with *error*."""
        self.__error("compiling", file, error)

    def compile_success(self, file: str) -> None:
        """Report that *file* compiled."""
        self.__success("compiling", file)

    def cleanup_success(self) -> None:
        """Report that the session was cleaned up."""
        self.__success("Session cleaned up")

    def dashboard_created(self, file: str, url: str) -> None:
        """Report the dashboard *url* and open it in the web browser."""
        # `file` is part of the callback signature but is not displayed.
        self.__success(f"Dashboard created: {url}")
        webbrowser.open(url)

    def dashboard_error(self, file: str, error: str) -> None:
        """Report that no dashboard could be created from *file*."""
        self.__error("Could not create dashboard from", file, error)

    def stopping(self) -> None:
        """Announce shutdown."""
        self.__info("\nStopping ...")

    def file_not_found(self, file) -> None:
        """Report that *file* no longer exists."""
        self.__info(f"File {self.__path(file)} no longer found")

    def __error(self, text: str, file: str, error: str) -> None:
        location = self.__path(file)
        details = self.__indent(error)
        print(f"❌ {text} {location}\n\n{details}\n")

    def __success(self, text: str, file: Union[str, None] = None) -> None:
        # Without a file the message keeps its trailing separator space.
        shown_path = self.__path(file) if file else ""
        print(f"✅ {text} {shown_path}")

    def __info(self, text: str) -> None:
        print(text)

    def __path(self, file: str) -> str:
        """Return *file* relative to the base directory, in double quotes."""
        relative = os.path.relpath(file, self.__base_dir)
        return f'"{relative}"'

    def __indent(self, string: str) -> str:
        """Prefix every line with a space and drop base-directory prefixes."""
        indented = re.sub(r"^", " ", string, flags=re.MULTILINE)
        return indented.replace(self.__base_dir + "/", "")
class Grafana:
    """Minimal Grafana HTTP API client bound to one temporary session folder.

    Dashboards published through an instance are stored in a folder that is
    created on first use and removed by ``cleanup_session_folder``. The
    instance remembers which source file produced which dashboard so that it
    can refuse to overwrite a dashboard that belongs to a different file.
    """

    def __init__(self, base_url: str, token: str):
        self.__base_url = base_url
        self.__token = token
        # Session state is kept per instance; a class-level list would be
        # shared and mutated by every Grafana object.
        self.__folder_uid = None
        # One record per source file: {"uid": ..., "title": ..., "file": ...}
        self.__dashboards = []

    def publish_dashboard(self, file: str, data: dict) -> Tuple[bool, str]:
        """Create or overwrite the dashboard compiled from *file*.

        Returns ``(True, url)`` on success, or ``(False, reason)`` when the
        dashboard's uid or title collides with a dashboard published from a
        different file in this session.

        Raises whatever ``requests`` raises for transport errors and for
        non-success HTTP statuses (``raise_for_status``).
        """
        self.create_session_folder()
        for attr in ("uid", "title"):
            unique, error = self.__check_unique(file, data, attr)
            if not unique:
                return (False, error)

        body = {"dashboard": data | {"editable": True}, "overwrite": True, "folderUid": self.__folder_uid}
        response = requests.post(self.__url("api/dashboards/db"), json=body, headers=self.__headers())
        response.raise_for_status()
        response_data = response.json()

        # Replace any earlier record for this file instead of piling up
        # duplicates on every re-publish.
        self.__forget(file)
        self.__dashboards.append({"uid": response_data["uid"], "title": data.get("title"), "file": file})
        return (True, self.__url(response_data["url"]))

    def __check_unique(self, file: str, data: dict, attr: str) -> Tuple[bool, str]:
        """Check that no dashboard from another file already uses ``data[attr]``."""
        value = data.get(attr)
        if value is None:
            # Nothing to collide with when the dashboard does not set attr.
            return (True, "")
        prev_dashboard = next(
            (known for known in self.__dashboards if known[attr] == value and known["file"] != file),
            None,
        )
        if prev_dashboard is None:
            return (True, "")
        return (
            False,
            f"Would override dashboard from \"{prev_dashboard['file']}\" because {attr} \"{prev_dashboard[attr]}\" is identical",
        )

    def delete_dashboard(self, file: str) -> None:
        """Delete the dashboard previously published from *file*, if any."""
        dashboard = next((known for known in self.__dashboards if known["file"] == file), None)
        if dashboard is None:
            return
        response = requests.delete(self.__url("api/dashboards/uid", dashboard["uid"]), headers=self.__headers())
        # A 404 means the dashboard is already gone, which is the goal.
        if response.status_code != 404:
            response.raise_for_status()
        # Drop the record so the uid/title can be reused, e.g. after a rename.
        self.__forget(file)

    def create_session_folder(self):
        """Create the session folder unless this instance already has one."""
        if self.__folder_uid:
            return
        uid = str(uuid.uuid4())
        user = os.environ.get("USER", "unknown")
        date = datetime.now()
        body = {"uid": uid, "title": f"Development session for {user} on {date}"}
        response = requests.post(self.__url("api/folders"), json=body, headers=self.__headers())
        response.raise_for_status()
        self.__folder_uid = uid

    def cleanup_session_folder(self) -> None:
        """Delete the session folder; a missing folder counts as success."""
        if not self.__folder_uid:
            return
        response = requests.delete(self.__url("api/folders", self.__folder_uid), headers=self.__headers())
        # A 404 means the folder is already gone.
        if response.status_code != 404:
            response.raise_for_status()
        self.__folder_uid = None
        # Grafana deletes a folder together with the dashboards stored in it,
        # so the local records no longer describe anything that exists.
        self.__dashboards = []

    def __forget(self, file: str) -> None:
        """Remove the local record(s) for *file*."""
        self.__dashboards = [known for known in self.__dashboards if known["file"] != file]

    def __url(self, *path_elements: str) -> str:
        """Join *path_elements* onto the base URL with single slashes."""
        return self.__base_url.rstrip("/") + "/" + "/".join(path_elements).lstrip("/")

    def __headers(self) -> dict[str, str]:
        return {"Authorization": "Bearer " + self.__token}
class Renderer:
    """Runs a single file through lint, compile and publish, reporting to the UI."""

    def __init__(self, jsonnet: Jsonnet, grafana: Grafana, ui: Ui):
        self.__jsonnet = jsonnet
        self.__grafana = grafana
        self.__ui = ui

    def try_render(self, file: str) -> None:
        """Render *file* as a dashboard, or delete its dashboard if the file is gone.

        Every step reports its outcome through the UI; the first failing step
        ends the run. Failures while publishing (HTTP errors, connection
        problems) are reported instead of raised, so one bad dashboard does
        not stop the caller's loop.
        """
        if not os.path.exists(file):
            self.__ui.file_not_found(file)
            self.__grafana.delete_dashboard(file)
            return

        linted, lint_message = self.__jsonnet.lint(file)
        if not linted:
            self.__ui.lint_error(file, lint_message)
            return
        self.__ui.lint_success(file)

        compiled, output = self.__jsonnet.compile(file)
        if not compiled:
            self.__ui.compile_error(file, output)
            return
        self.__ui.compile_success(file)

        dashboard = json.loads(output)
        if not isinstance(dashboard, dict):
            # A file may legitimately evaluate to an array or a string; that
            # cannot be published as a dashboard.
            self.__ui.dashboard_error(
                file, f"Compiled output is a JSON {type(dashboard).__name__}, expected an object"
            )
            return

        try:
            created, result = self.__grafana.publish_dashboard(file, dashboard)
        except requests.RequestException as exc:
            # E.g. Grafana rejecting the dashboard or being unreachable.
            self.__ui.dashboard_error(file, str(exc))
            return
        if not created:
            self.__ui.dashboard_error(file, result)
            return
        self.__ui.dashboard_created(file, result)
class ChangeEventHandler(RegexMatchingEventHandler):
    """Watchdog handler that re-renders ``.jsonnet`` files when they change."""

    def __init__(self, renderer: Renderer):
        super().__init__(regexes=[r".+\.jsonnet$"], ignore_directories=True)
        self.__renderer = renderer

    def on_created(self, event):
        self.__renderer.try_render(event.src_path)

    def on_modified(self, event):
        self.__renderer.try_render(event.src_path)

    def on_deleted(self, event):
        # The renderer notices the file is gone and removes its dashboard.
        self.__renderer.try_render(event.src_path)

    def on_moved(self, event):
        # A move is dispatched when either end matches the regexes, so the
        # other end may be an unrelated file (e.g. an editor's temp file that
        # is renamed over the .jsonnet file). Only render matching paths:
        # the old path to drop its dashboard, the new one to publish it.
        for path in (event.src_path, event.dest_path):
            if any(regex.match(path) for regex in self.regexes):
                self.__renderer.try_render(path)
def main():
    """Render every ``.jsonnet`` file once, then re-render on change until interrupted.

    Command line: ``--grafana-url URL --grafana-token TOKEN source_path``.
    All three are required. On exit (Ctrl-C or an unexpected error) the
    watcher is stopped and the Grafana session folder is removed.
    """
    parser = argparse.ArgumentParser(prog=os.path.basename(__file__))
    # These used to be optional (nargs="?") and defaulted to None, which only
    # surfaced later as a TypeError/AttributeError deep inside the run.
    parser.add_argument("--grafana-url", type=str, required=True, help="Grafana base URL")
    parser.add_argument(
        "--grafana-token", type=str, required=True, help="Grafana API token (used as authentication bearer)"
    )
    parser.add_argument("source_path", type=str, help="Path to the directory where the .jsonnet files are located")
    args = parser.parse_args()

    # Use one absolute path everywhere so the initial render and the watcher
    # report the same file under the same name; otherwise a relative
    # source_path makes them look like two different files.
    source_path = os.path.abspath(args.source_path)

    toplevel = subprocess.run(["git", "-C", source_path, "rev-parse", "--show-toplevel"], capture_output=True)
    # Outside a git checkout the output is empty; fall back to the root.
    base_dir = toplevel.stdout.decode("utf-8").strip() or "/"

    grafana = Grafana(args.grafana_url, args.grafana_token)
    ui = Ui(base_dir)
    renderer = Renderer(Jsonnet(os.path.join(base_dir, "vendor", "jsonnet")), grafana, ui)

    observer = Observer()
    observer.schedule(ChangeEventHandler(renderer), source_path, recursive=True)

    try:
        for file in Path(source_path).rglob("*.jsonnet"):
            renderer.try_render(str(file.absolute()))
        observer.start()
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        ui.stopping()
    finally:
        # The interrupt may arrive during the initial render, before the
        # observer thread was started; joining it then would raise.
        if observer.is_alive():
            observer.stop()
            observer.join()
        grafana.cleanup_session_folder()
        ui.cleanup_success()
# Run the development loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment