Last active
December 16, 2019 17:38
-
-
Save Cellebyte/230b1ec956047a4925d1f68982003fa1 to your computer and use it in GitHub Desktop.
Grafana Backup with requests and prometheus_client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import logging | |
from datetime import datetime | |
from enum import Enum | |
from typing import List | |
from urllib.parse import urlparse | |
from prometheus_client import REGISTRY, CollectorRegistry, Gauge, Info | |
class BackupTask(Enum):
    """Enumerates the kinds of task a backup job can perform."""

    BACKUP = "backup"
    ROTATE = "rotate"
    RESTORE = "restore"
class Backup(object):
    """Interface that every concrete backup task implements.

    Registers the Prometheus metrics shared by all tasks (up,
    start/end time, duration, per-database counters, rotation
    counters) and leaves the actual work to :meth:`backup`,
    :meth:`rotate`, :meth:`restore` and :meth:`get_backups`.

    :raises NotImplementedError: when an abstract method is not
        overridden by the subclass
    """

    def __init__(
        self,
        name: str = 'backup',
        registry: CollectorRegistry = REGISTRY
    ) -> None:
        """Register the shared metrics on *registry*.

        :param name: the name of the task, defaults to 'backup'
        :type name: str, optional
        :param registry: the registry for prometheus, defaults to REGISTRY
        :type registry: CollectorRegistry, optional
        """
        self.registry = registry
        self.name = name
        # %-style lazy args: only rendered when DEBUG logging is enabled.
        logging.debug("The job name in Backup %s", self.name)
        self.backup_task: BackupTask = BackupTask.BACKUP
        self._up: Gauge = Gauge(
            "up",
            "If 1 the backup was correctly created.",
            labelnames=['name', 'backup_task'],
            registry=self.registry
        )
        self._starttime: Gauge = Gauge(
            "job_backup_start_unixtime",
            "The Start Time of the backup.",
            labelnames=['name', 'backup_task'],
            registry=self.registry
        )
        self._endtime: Gauge = Gauge(
            "job_backup_end_unixtime",
            "The End Time of the backup.",
            labelnames=['name', 'backup_task'],
            registry=self.registry
        )
        self._duration: Gauge = Gauge(
            "job_backup_duration",
            "The duration of the backup.",
            labelnames=['name', 'backup_task'],
            registry=self.registry
        )
        self._total: Gauge = Gauge(
            "job_backup_db_total",
            "The amount of indices or databases which can be backuped.",
            labelnames=['name', 'backup_task'],
            registry=self.registry
        )
        self._failed: Gauge = Gauge(
            "job_backup_db_failed",
            "The amount of failed indices or databases which were skipped.",
            labelnames=['name', 'backup_task'],
            registry=self.registry
        )
        self._success: Gauge = Gauge(
            "job_backup_db_success",
            "The amount of indices or " +
            "databases which were successfully backuped.",
            labelnames=['name', 'backup_task'],
            registry=self.registry
        )
        self._last_success: Gauge = Gauge(
            "job_backup_last_success_unixtime",
            "Last time a batch job successfully finished",
            labelnames=['name', 'backup_task'],
            registry=self.registry
        )
        self._backup: Info = Info(
            'job_backup',
            "The name of the backup and other things.",
            registry=self.registry
        )
        self._error: Info = Info(
            'job_backup_error',
            "The error which appeared for a backup",
            registry=self.registry
        )
        self._rotate_deleted: Gauge = Gauge(
            "job_backup_rotate_deleted",
            "The amount of deleted Backups by rotate.",
            labelnames=['name', 'backup_task'],
            registry=self.registry
        )
        self._rotate_total: Gauge = Gauge(
            "job_backup_rotate_total",
            "The amount of backups before rotation.",
            labelnames=['name', 'backup_task'],
            registry=self.registry
        )

    def _event(self, *args, **kwargs):
        # Discard the event_type keyword so subclasses can forward
        # **kwargs; the default avoids a KeyError when it is absent.
        _ = kwargs.pop('event_type', None)

    @property
    def date(self) -> str:
        """Return the current UTC time as a formatted string.

        :return: Current DateTime with format "%Y%m%d-%H%M%S"
        :rtype: str
        """
        # NOTE(review): datetime.utcnow() is deprecated since Python
        # 3.12; datetime.now(timezone.utc) is the modern equivalent.
        return datetime.utcnow().strftime("%Y%m%d-%H%M%S")

    def url(self, url: str) -> str:
        """Normalize *url* through a parse/unparse round-trip.

        :param url: the URL to normalize
        :type url: str
        :return: the re-serialized URL
        :rtype: str
        """
        return urlparse(url).geturl()

    def backup(self) -> bool:
        """Perform everything to back up the implemented thing.

        :raises NotImplementedError: not implemented
        :return: If an error occurs return True
        :rtype: bool
        """
        raise NotImplementedError

    def rotate(self, rotate: int = 14) -> bool:
        """Rotate the backups in the filesystem.

        :param rotate: number of backups to keep, defaults to 14
        :type rotate: int, optional
        :raises NotImplementedError: not implemented
        :return: If an error occurs return True
        :rtype: bool
        """
        raise NotImplementedError

    def restore(self, backup_name: str | None = None) -> bool:
        """Restore the backup identified by *backup_name*.

        :param backup_name: the backup name to restore, defaults to None
        :type backup_name: str, optional
        :raises NotImplementedError: not implemented
        :return: If an error occurs return True
        :rtype: bool
        """
        raise NotImplementedError

    def get_backups(self) -> List[str]:
        """List the names of the existing backups.

        :raises NotImplementedError: not implemented
        :return: the file names of the available backups
        :rtype: List[str]
        """
        raise NotImplementedError
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import errno | |
import json | |
import logging | |
import os | |
import shutil | |
import time | |
from typing import List | |
from zipfile import ZIP_DEFLATED, ZipFile | |
import requests | |
from prometheus_client import CollectorRegistry | |
from cached_property import cached_property | |
from .backup import Backup, BackupTask | |
class Grafana(Backup):
    """Backup implementation for Grafana dashboards.

    Talks to the Grafana HTTP API through an authenticated requests
    session, writes every dashboard of every organization as JSON
    into a zip archive, and can restore and rotate those archives.
    """

    # Scratch directory the dashboards are written to before zipping.
    _tmp_path = './tmp'
    # Prefix of the generated zip archive names.
    _backup_name = 'grafana_backup'

    def __init__(
        self,
        url: str,
        name: str,
        username: str,
        password: str,
        backup_path: str,
        verify: bool,
        registry: CollectorRegistry
    ) -> None:
        """Set up the HTTP session and the shared metrics.

        :param url: base URL of the Grafana instance
        :type url: str
        :param name: name of this backup job (metric label)
        :type name: str
        :param username: basic-auth user for the Grafana API
        :type username: str
        :param password: basic-auth password for the Grafana API
        :type password: str
        :param backup_path: directory the zip archives are stored in
        :type backup_path: str
        :param verify: whether to verify the TLS certificate
        :type verify: bool
        :param registry: the registry for prometheus
        :type registry: CollectorRegistry
        """
        self.host = url
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.session.auth = requests.auth.HTTPBasicAuth(
            username,
            password
        )
        self.overwrite = True
        self.backup_path = backup_path
        self.backup_path_name = os.path.join(backup_path, self.backup_name)
        self.session.verify = verify
        self.session.headers.update({
            "Content-Type": "application/json;charset=utf-8"
        })
        # Current organization context for API calls; 0 until selected.
        self.org_id = 0
        # NOTE(review): counters accumulate across multiple backup /
        # restore calls on the same instance — confirm that is intended.
        self.counter = {
            "success": 0,
            "total": 0,
            "failed": 0
        }
        super(Grafana, self).__init__(
            name=name,
            registry=registry
        )

    @cached_property
    def backup_name(self) -> str:
        """Archive file name; cached so one run uses one timestamp."""
        return "{}_{}.zip".format(self._backup_name, self.date)

    def dashboard(self, dashboard_uid: str) -> dict:
        """Fetch a single dashboard (with meta) by its UID."""
        return self.session.get(
            self.get_dashboard_url(dashboard_uid),
            params={
                "orgId": self.org_id
            }
        ).json()

    def dashboards(self) -> List[dict]:
        """Search the current org and return only real dashboards."""
        return [dashboard for dashboard in self.session.get(
            self.get_dashboards_url(),
            params={
                "orgId": self.org_id
            }
        ).json() if dashboard['type'] == 'dash-db']

    @property
    def folders(self) -> List[dict]:
        """All folders of the current organization."""
        return self.session.get(
            self.get_folders_url(),
            params={
                "orgId": self.org_id
            }
        ).json()

    def get_dashboard_url(self, dashboard_uid: str) -> str:
        return self.url(
            f"{self.host}/api/dashboards/uid/{dashboard_uid}"
        )

    def get_folder_url(self, folder_id: str) -> str:
        return self.url(
            f"{self.host}/api/folders/{folder_id}"
        )

    @property
    def organizations(self) -> list:
        """All organizations known to the Grafana instance."""
        return self.session.get(
            self.get_organization_url()
        ).json()

    def get_dashboards_url(self) -> str:
        return self.url(
            f"{self.host}/api/search"
        )

    def get_folders_url(self) -> str:
        return self.url(
            f"{self.host}/api/folders"
        )

    def get_organization_url(self) -> str:
        return self.url(
            f"{self.host}/api/orgs"
        )

    def get_json_name(self, name: str) -> str:
        """Append the .json suffix to a dashboard name."""
        return "{}.json".format(name)

    def write_dashboard(self, dashboard: dict, root_folder: str) -> str:
        """Write one dashboard as JSON under the scratch directory.

        :param dashboard: full dashboard object as returned by the API
        :type dashboard: dict
        :param root_folder: organization name used as top-level folder
        :type root_folder: str
        :return: path of the written JSON file
        :rtype: str
        """
        dashboard_name = dashboard['dashboard']['title']
        folder = dashboard['meta']['folderTitle']
        filename = os.path.join(
            self._tmp_path,
            root_folder,
            folder,
            self.get_json_name(dashboard_name)
        )
        # exist_ok makes this race-free without the errno dance.
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        dashboard = dashboard['dashboard']
        # Strip the instance-local id so the dashboard imports cleanly
        # into another Grafana instance.
        if 'id' in dashboard.keys():
            dashboard['id'] = None
        with open(filename, "w") as file:
            json.dump(dashboard, file, indent=2)
        return filename

    def get_import_dashboard_url(self) -> str:
        return self.url(
            f"{self.host}/api/dashboards/import"
        )

    def create_folder(self, folder: str) -> int:
        """Create *folder* in the current org and return its id."""
        # Handle the default folder: 'General' always has id 0.
        if folder == 'General':
            return 0
        logging.info(f"Create Folder {folder}")
        response = self.session.post(
            self.get_folders_url(),
            json={
                "title": str(folder)
            },
            params={
                "orgId": self.org_id
            }
        ).json()
        logging.info(f"Got Info {response}")
        return response['id']

    def create_organization(self, org_name: str) -> int:
        """Create the organization *org_name* and return its id."""
        response = self.session.post(
            # BUGFIX: the method must be called; the bound method
            # object was previously passed as the URL.
            self.get_organization_url(),
            json={
                "name": org_name
            }
        ).json()
        return response['orgId']

    def check_if_folder_exists(self, folder_name: str) -> int:
        """Return the id of *folder_name*, creating it if missing."""
        logging.debug("Known folders: %s", self.folders)
        for folder in self.folders:
            if folder['title'] == folder_name:
                return folder['id']
        return self.create_folder(folder_name)

    def check_if_organization_exists(self, org_name: str) -> int:
        """Return the id of *org_name*, creating it if missing."""
        for organization in self.organizations:
            if organization['name'] == org_name:
                return organization['id']
        return self.create_organization(org_name)

    def enrich_dashboard_object(self, dashboard: dict, folder: str) -> dict:
        """Wrap a dashboard into the import-API payload."""
        return {
            "dashboard": dashboard,
            "overwrite": self.overwrite,
            "folderId": self.check_if_folder_exists(folder),
            "inputs": []
        }

    @property
    def dashboard_import_referer(self) -> str:
        return self.url(f"{self.host}/dashboard/import")

    def import_dashboard(self, dashboard: dict, folder: str) -> bool:
        """Import one dashboard; True when the API call succeeded."""
        return self.session.post(
            self.get_import_dashboard_url(),
            json=self.enrich_dashboard_object(dashboard, folder),
            params={
                'orgId': self.org_id
            }
        ).ok

    def restore(self, backup_name: str, overwrite: bool = True) -> bool:
        """Restore every dashboard contained in the given archive.

        :param backup_name: file name of the zip archive to restore
        :type backup_name: str
        :param overwrite: overwrite existing dashboards, defaults to True
        :type overwrite: bool, optional
        :return: True when at least one dashboard failed, else False
        :rtype: bool
        """
        self.overwrite = overwrite
        self.backup_task = BackupTask.RESTORE
        backup_path_name = os.path.join(self.backup_path, backup_name)
        starttime = time.time() * 1000
        self._starttime.labels(
            self.name, self.backup_task.value
        ).set(starttime)
        with ZipFile(backup_path_name, 'r') as restore_zip:
            for file in restore_zip.filelist:
                # Zip member names always use '/' regardless of OS;
                # maxsplit keeps titles containing '/' intact.
                org, folder, _ = file.filename.split('/', 2)
                # BUGFIX: 'ok' was unbound when the try block raised.
                ok = False
                with restore_zip.open(file.filename) as restore_file:
                    try:
                        dashboard = json.loads(restore_file.read())
                        self.org_id = self.check_if_organization_exists(org)
                        ok = self.import_dashboard(dashboard, folder)
                    except Exception as e:
                        logging.error(e)
                self.counter['total'] += 1
                if not ok:
                    self.counter['failed'] += 1
                    logging.error(
                        "Cannot load Dashboard {0}".format(file.filename)
                    )
                else:
                    self.counter['success'] += 1
                    logging.info(
                        "Imported Dashboard {0}".format(file.filename)
                    )
        # BUGFIX: the dict itself was compared against 0 (TypeError);
        # the failed counter is the intended condition.
        if self.counter['failed'] > 0:
            self._up.labels(
                self.name, self.backup_task.value
            ).set(0)
            return True
        self._up.labels(
            self.name, self.backup_task.value
        ).set(1)
        self._backup.info(
            {
                "name": self.name,
                "backup_task": self.backup_task.value,
                "backup_name": self.backup_name,
                "state": "done"
            }
        )
        self._total.labels(
            self.name, self.backup_task.value
        ).set(
            self.counter['total']
        )
        self._success.labels(
            self.name, self.backup_task.value
        ).set(
            self.counter['success']
        )
        self._failed.labels(
            self.name, self.backup_task.value
        ).set(
            self.counter['failed']
        )
        endtime = time.time() * 1000
        self._endtime.labels(
            self.name, self.backup_task.value
        ).set(endtime)
        self._duration.labels(
            self.name, self.backup_task.value
        ).set(
            endtime - starttime
        )
        self._last_success.labels(
            self.name, self.backup_task.value
        ).set(
            endtime
        )
        return False

    def backup(self) -> bool:
        """Back up every dashboard of every organization into one zip.

        :return: True when at least one dashboard failed, else False
        :rtype: bool
        """
        self.backup_task = BackupTask.BACKUP
        starttime = time.time() * 1000
        self._starttime.labels(
            self.name, self.backup_task.value
        ).set(starttime)
        with ZipFile(self.backup_path_name, 'w') as backup_zip:
            for organization in self.organizations:
                self.org_id = organization['id']
                root_folder = organization['name']
                for dashboard in self.dashboards():
                    self.counter["total"] += 1
                    try:
                        full_dashboard = self.dashboard(dashboard['uid'])
                        dashboard_name = full_dashboard['dashboard']['title']
                        folder = full_dashboard['meta']['folderTitle']
                        path = f'{root_folder}{os.sep}{folder}' + \
                            f'{os.sep}{self.get_json_name(dashboard_name)}'
                        local_path = self.write_dashboard(
                            full_dashboard,
                            root_folder
                        )
                        backup_zip.write(
                            local_path,
                            path,
                            ZIP_DEFLATED
                        )
                        self.counter['success'] += 1
                    except Exception as e:
                        self.counter['failed'] += 1
                        logging.error(e)
        # ignore_errors: the scratch dir does not exist when every
        # dashboard failed before being written.
        shutil.rmtree(self._tmp_path, ignore_errors=True)
        if self.counter['failed'] > 0:
            self._up.labels(
                self.name, self.backup_task.value
            ).set(0)
            return True
        self._up.labels(
            self.name, self.backup_task.value
        ).set(1)
        self._backup.info(
            {
                "name": self.name,
                "backup_task": self.backup_task.value,
                "backup_name": self.backup_name,
                "state": "done"
            }
        )
        self._total.labels(
            self.name, self.backup_task.value
        ).set(
            self.counter['total']
        )
        self._success.labels(
            self.name, self.backup_task.value
        ).set(
            self.counter['success']
        )
        self._failed.labels(
            self.name, self.backup_task.value
        ).set(
            self.counter['failed']
        )
        endtime = time.time() * 1000
        self._endtime.labels(
            self.name, self.backup_task.value
        ).set(endtime)
        self._duration.labels(
            self.name, self.backup_task.value
        ).set(
            endtime - starttime
        )
        self._last_success.labels(
            self.name, self.backup_task.value
        ).set(
            endtime
        )
        return False

    def rotate(self, rotate: int = 4) -> bool:
        """Delete all but the newest *rotate* backup archives.

        :param rotate: number of backups to keep, defaults to 4
        :type rotate: int, optional
        :return: True when a deletion failed, else False
        :rtype: bool
        """
        self.backup_task = BackupTask.ROTATE
        starttime = time.time() * 1000
        self._starttime.labels(
            self.name, self.backup_task.value
        ).set(starttime)
        # Names embed the timestamp, so lexical sort is chronological;
        # everything except the last *rotate* entries is deleted.
        for backup in sorted(self.get_backups())[:-rotate]:
            try:
                logging.warning(f"Delete Backup {backup}")
                os.remove(
                    os.path.join(
                        self.backup_path,
                        backup
                    )
                )
            except Exception as e:
                self._up.labels(
                    self.name, self.backup_task.value
                ).set(0)
                logging.error(e)
                return True
        endtime = time.time() * 1000
        self._up.labels(
            self.name, self.backup_task.value
        ).set(1)
        self._endtime.labels(
            self.name, self.backup_task.value
        ).set(endtime)
        self._duration.labels(
            self.name, self.backup_task.value
        ).set(
            endtime - starttime
        )
        self._last_success.labels(
            self.name, self.backup_task.value
        ).set(
            endtime
        )
        return False

    def get_backups(self) -> List[str]:
        """List the zip archives in the backup directory."""
        return [
            file for file in os.listdir(self.backup_path)
            if file.endswith('.zip')
        ]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment