Skip to content

Instantly share code, notes, and snippets.

@Cellebyte
Last active December 16, 2019 17:38
Show Gist options
  • Save Cellebyte/230b1ec956047a4925d1f68982003fa1 to your computer and use it in GitHub Desktop.
Save Cellebyte/230b1ec956047a4925d1f68982003fa1 to your computer and use it in GitHub Desktop.
Grafana Backup with requests and prometheus_client
from __future__ import annotations
import logging
from datetime import datetime
from enum import Enum
from typing import List
from urllib.parse import urlparse
from prometheus_client import REGISTRY, CollectorRegistry, Gauge, Info
class BackupTask(Enum):
BACKUP = 'backup'
ROTATE = 'rotate'
RESTORE = 'restore'
class Backup(object):
"""Backup Interface should be implemented by
any backup task
:param object: Base object of Python
:type object: [type]
:raises NotImplementedError: implement the missing function
:raises NotImplementedError: implement the missing function
"""
def __init__(
self,
name: str = 'backup',
registry: CollectorRegistry = REGISTRY
) -> Backup:
"""__init__ Backup init function. Constructor.
:param name: the name of task, defaults to 'backup'
:type name: str, optional
:param registry: the registry for prometheus, defaults to REGISTRY
:type registry: CollectorRegistry, optional
:raises NotImplementedError: implement the missing function
:raises NotImplementedError: implement the missing function
:return: A Backup interface without any functionality
:rtype: Backup
"""
self.registry = registry
self.name = name
logging.debug("The job name in Backup {0}".format(self.name))
self.backup_task: BackupTask = BackupTask.BACKUP
self._up: Gauge = Gauge(
"up",
"If 1 the backup was correctly created.",
labelnames=['name', 'backup_task'],
registry=self.registry
)
self._starttime: Gauge = Gauge(
"job_backup_start_unixtime",
"The Start Time of the backup.",
labelnames=['name', 'backup_task'],
registry=self.registry
)
self._endtime: Gauge = Gauge(
"job_backup_end_unixtime",
"The End Time of the backup.",
labelnames=['name', 'backup_task'],
registry=self.registry
)
self._duration: Gauge = Gauge(
"job_backup_duration",
"The duration of the backup.",
labelnames=['name', 'backup_task'],
registry=self.registry
)
self._total: Gauge = Gauge(
"job_backup_db_total",
"The amount of indices or databases which can be backuped.",
labelnames=['name', 'backup_task'],
registry=self.registry
)
self._failed: Gauge = Gauge(
"job_backup_db_failed",
"The amount of failed indices or databases which were skipped.",
labelnames=['name', 'backup_task'],
registry=self.registry
)
self._success: Gauge = Gauge(
"job_backup_db_success",
"The amount of indices or " +
"databases which were sucessfully backuped.",
labelnames=['name', 'backup_task'],
registry=self.registry
)
self._last_success: Gauge = Gauge(
"job_backup_last_success_unixtime",
"Last time a batch job sucessfully finished",
labelnames=['name', 'backup_task'],
registry=self.registry
)
self._backup: Info = Info(
'job_backup',
"The name of the backup and other things.",
registry=self.registry
)
self._error: Info = Info(
'job_backup_error',
"The error which appeared for a backup",
registry=self.registry
)
self._rotate_deleted: Gauge = Gauge(
"job_backup_rotate_deleted",
"The amount of deleted Backups by rotate.",
labelnames=['name', 'backup_task'],
registry=self.registry
)
self._rotate_total: Gauge = Gauge(
"job_backup_rotate_total",
"The amount of backups before rotation.",
labelnames=['name', 'backup_task'],
registry=self.registry
)
def _event(self, *args, **kwargs):
_ = kwargs.pop('event_type')
@property
def date(self) -> str:
"""date A property which returns the current DateTime as String
Formatted
:return: Current DateTime with format "%Y%m%d-%H%M%S"
:rtype: str
"""
return datetime.utcnow().strftime("%Y%m%d-%H%M%S")
def url(self, url) -> str:
return urlparse(url).geturl()
def backup(self) -> bool:
"""backup Should perform everything
to backup the implemented thing.
:raises NotImplementedError: not implemented
:return: If an error occurs return True
:rtype: bool
"""
raise NotImplementedError
def rotate(self, rotate: int = 14) -> bool:
"""rotate Should perform everything
to rotate the backups in the filesystem.
:raises NotImplementedError: not implemented
:return: If an error occurs return True
:rtype: bool
"""
raise NotImplementedError
def restore(self, backup_name: str = None) -> bool:
"""restore [summary]
:param backup_name: should specify the backup name to restore, defaults to None
:type backup_name: str, optional
:raises NotImplementedError: not implemented
:return: If an error occurs return True
:rtype: bool
"""
raise NotImplementedError
def get_backups(self) -> List[str]:
"""restore [summary]
:param backup_name: should return a list of backups, defaults to None
:type backup_name: str, optional
:raises NotImplementedError: not implemented
:return: Returns a list of
:rtype: str
"""
raise NotImplementedError
from __future__ import annotations
import errno
import json
import logging
import os
import shutil
import time
from typing import List
from zipfile import ZIP_DEFLATED, ZipFile
import requests
from prometheus_client import CollectorRegistry
from cached_property import cached_property
from .backup import Backup, BackupTask
class Grafana(Backup):
"""Grafana [summary]
:param Backup: [description]
:type Backup: [type]
:return: [description]
:rtype: [type]
"""
_tmp_path = './tmp'
_backup_name = 'grafana_backup'
def __init__(
self,
url: str,
name: str,
username: str,
password: str,
backup_path: str,
verify: bool,
registry: CollectorRegistry
) -> Grafana:
"""__init__ [summary]
:param url: [description]
:type url: str
:param name: [description]
:type name: str
:param username: [description]
:type username: str
:param password: [description]
:type password: str
:param registry: [description]
:type registry: CollectorRegistry
:return: [description]
:rtype: Grafana
"""
self.host = url
self.username = username
self.password = password
self.session = requests.Session()
self.session.auth = requests.auth.HTTPBasicAuth(
username,
password
)
self.overwrite = True
self.backup_path = backup_path
self.backup_path_name = os.path.join(backup_path, self.backup_name)
self.session.verify = verify
self.session.headers.update({
"Content-Type": "application/json;charset=utf-8"
})
self.org_id = 0
self.counter = {
"success": 0,
"total": 0,
"failed": 0
}
super(Grafana, self).__init__(
name=name,
registry=registry
)
@cached_property
def backup_name(self) -> str:
return "{}_{}.zip".format(self._backup_name, self.date)
def dashboard(self, dashboard_uid: str) -> dict:
return self.session.get(
self.get_dashboard_url(dashboard_uid),
params={
"orgId": self.org_id
}
).json()
def dashboards(self) -> dict:
return [dashboard for dashboard in self.session.get(
self.get_dashboards_url(),
params={
"orgId": self.org_id
}
).json() if dashboard['type'] == 'dash-db']
@property
def folders(self) -> dict:
return self.session.get(
self.get_folders_url(),
params={
"orgId": self.org_id
}
).json()
def get_dashboard_url(self, dashboard_uid: str) -> str:
return self.url(
f"{self.host}/api/dashboards/uid/{dashboard_uid}"
)
def get_folder_url(self, folder_id: str) -> str:
return self.url(
f"{self.host}/api/folders/{folder_id}"
)
@property
def organizations(self) -> list:
return self.session.get(
self.get_organization_url()
).json()
def get_dashboards_url(self) -> str:
return self.url(
f"{self.host}/api/search"
)
def get_folders_url(self) -> str:
return self.url(
f"{self.host}/api/folders"
)
def get_organization_url(self) -> str:
return self.url(
f"{self.host}/api/orgs"
)
def get_json_name(self, name: str):
return "{}.json".format(name)
def write_dashboard(self, dashboard: dict, root_folder: str):
dashboard_name = dashboard['dashboard']['title']
folder = dashboard['meta']['folderTitle']
filename = os.path.join(
self._tmp_path,
root_folder,
folder,
self.get_json_name(dashboard_name)
)
if not os.path.exists(os.path.dirname(filename)):
try:
os.makedirs(os.path.dirname(filename))
except OSError as exc:
# Guard against race condition
if exc.errno != errno.EEXIST:
raise
dashboard = dashboard['dashboard']
if 'id' in dashboard.keys():
dashboard['id'] = None
with open(filename, "w") as file:
json.dump(dashboard, file, indent=2)
return filename
def get_import_dashboard_url(self):
return self.url(
f"{self.host}/api/dashboards/import"
)
def create_folder(self, folder: str) -> int:
# Handle the default folder
if folder == 'General':
return 0
logging.info(f"Create Folder {folder}")
response = self.session.post(
self.get_folders_url(),
json={
"title": str(folder)
},
params={
"orgId": self.org_id
}
).json()
logging.info(f"Got Info {response}")
return response['id']
def create_organization(self, org_name: str) -> int:
response = self.session.post(
self.get_organization_url,
json={
"name": org_name
}
).json()
return response['orgId']
def check_if_folder_exists(self, folder_name: str) -> int:
print(self.folders)
for folder in self.folders:
print(folder['title'] == folder_name, folder)
if (folder['title'] == folder_name):
return folder['id']
return self.create_folder(folder_name)
def check_if_organization_exists(self, org_name: str) -> int:
for organization in self.organizations:
if (organization['name'] == org_name):
return organization['id']
return self.create_organization(org_name)
def enrich_dashboard_object(self, dashboard: dict, folder: str):
return {
"dashboard": dashboard,
"overwrite": self.overwrite,
"folderId": self.check_if_folder_exists(folder),
"inputs": []
}
@property
def dashboard_import_referer(self) -> str:
return self.url(f"{self.host}/dashboard/import")
def import_dashboard(self, dashboard: dict, folder: str) -> bool:
return self.session.post(
self.get_import_dashboard_url(),
json=self.enrich_dashboard_object(dashboard, folder),
params={
'orgId': self.org_id
}
).ok
def restore(self, backup_name: str, overwrite: bool = True) -> bool:
self.overwrite = overwrite
self.backup_task = BackupTask.RESTORE
backup_path_name = os.path.join(self.backup_path, backup_name)
starttime = time.time() * 1000
self._starttime.labels(
self.name, self.backup_task.value
).set(starttime)
with ZipFile(backup_path_name, 'r') as restore_zip:
for file in restore_zip.filelist:
org, folder, _ = file.filename.split(os.sep)
with restore_zip.open(file.filename) as restore_file:
try:
dashboard = json.loads(restore_file.read())
self.org_id = self.check_if_organization_exists(org)
ok = self.import_dashboard(dashboard, folder)
except Exception as e:
logging.error(e)
self.counter['total'] += 1
if not ok:
self.counter['failed'] += 1
logging.error("Cannot load Dashboard {0}".format(file.filename))
else:
self.counter['success'] += 1
logging.info("Imported Dashboard {0}".format(file.filename))
if self.counter > 0:
self._up.labels(
self.name, self.backup_task.value
).set(0)
return True
self._up.labels(
self.name, self.backup_task.value
).set(1)
self._backup.info(
{
"name": self.name,
"backup_task": self.backup_task.value,
"backup_name": self.backup_name,
"state": "done"
}
)
self._total.labels(
self.name, self.backup_task.value
).set(
self.counter['total']
)
self._success.labels(
self.name, self.backup_task.value
).set(
self.counter['success']
)
self._failed.labels(
self.name, self.backup_task.value
).set(
self.counter['failed']
)
endtime = time.time() * 1000
self._endtime.labels(
self.name, self.backup_task.value
).set(endtime)
self._duration.labels(
self.name, self.backup_task.value
).set(
endtime - starttime
)
self._last_success.labels(
self.name, self.backup_task.value
).set(
endtime
)
return False
def backup(self) -> bool:
"""backup [summary]
:return: [description]
:rtype: bool
"""
self.backup_task = BackupTask.BACKUP
logging.debug("{}".format(self.dashboards))
starttime = time.time() * 1000
self._starttime.labels(
self.name, self.backup_task.value
).set(starttime)
logging.info(self.dashboards)
with ZipFile(self.backup_path_name, 'w') as backup_zip:
for organization in self.organizations:
self.org_id = organization['id']
root_folder = organization['name']
for dashboard in self.dashboards():
self.counter["total"] += 1
try:
full_dashboard = self.dashboard(dashboard['uid'])
dashboard_name = full_dashboard['dashboard']['title']
folder = full_dashboard['meta']['folderTitle']
path = f'{root_folder}{os.sep}{folder}' + \
f'{os.sep}{self.get_json_name(dashboard_name)}'
local_path = self.write_dashboard(
full_dashboard,
root_folder
)
backup_zip.write(
local_path,
path,
ZIP_DEFLATED
)
self.counter['success'] += 1
except Exception as e:
self.counter['failed'] += 1
logging.error(e)
shutil.rmtree(self._tmp_path)
if self.counter['failed'] > 0:
self._up.labels(
self.name, self.backup_task.value
).set(0)
return True
self._up.labels(
self.name, self.backup_task.value
).set(1)
self._backup.info(
{
"name": self.name,
"backup_task": self.backup_task.value,
"backup_name": self.backup_name,
"state": "done"
}
)
self._total.labels(
self.name, self.backup_task.value
).set(
self.counter['total']
)
self._success.labels(
self.name, self.backup_task.value
).set(
self.counter['success']
)
self._failed.labels(
self.name, self.backup_task.value
).set(
self.counter['failed']
)
endtime = time.time() * 1000
self._endtime.labels(
self.name, self.backup_task.value
).set(endtime)
self._duration.labels(
self.name, self.backup_task.value
).set(
endtime - starttime
)
self._last_success.labels(
self.name, self.backup_task.value
).set(
endtime
)
return False
def rotate(self, rotate: int = 4) -> bool:
"""rotate [summary]
:param rotate: [description], defaults to 4
:type rotate: int, optional
:return: [description]
:rtype: bool
"""
self.backup_task = BackupTask.ROTATE
starttime = time.time() * 1000
self._starttime.labels(
self.name, self.backup_task.value
).set(starttime)
for backup in sorted(self.get_backups())[:-rotate]:
try:
logging.warning(f"Delete Backup {backup}")
os.remove(
os.path.join(
self.backup_path,
backup
)
)
except Exception as e:
self._up.labels(
self.name, self.backup_task.value
).set(0)
logging.error(e)
return True
endtime = time.time() * 1000
self._up.labels(
self.name, self.backup_task.value
).set(1)
self._endtime.labels(
self.name, self.backup_task.value
).set(endtime)
self._duration.labels(
self.name, self.backup_task.value
).set(
endtime - starttime
)
self._last_success.labels(
self.name, self.backup_task.value
).set(
endtime
)
return False
def get_backups(self) -> List[str]:
return [file for file in os.listdir(self.backup_path) if file.endswith('.zip')]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment