Created
June 28, 2024 17:51
-
-
Save oskaralmlov/fb9cd014f75859103f00e57b84e05de2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Keeps LibreNMS groups in sync. | |
# If service templates are in use it's expected that they target groups | |
# whose names start with "z-service" and automatically re-discovers hosts | |
# that are part of these groups, if they're added / modified by the script. | |
# | |
# Config file is expected to have this format: | |
# { | |
# "groups": [ | |
# { | |
# "name": "WWW DB", | |
# "rules": {"condition": "OR", | |
# "rules": [{"field": "devices.hostname", | |
# "id": "devices.hostname", | |
# "input": "text", | |
# "operator": "regex", | |
# "type": "string", | |
# "value": "www-db.*"}]} | |
# }, | |
# { | |
# "name": "WWW Workers", | |
# "rules": {"condition": "OR", | |
# "rules": [{"field": "devices.hostname", | |
# "id": "devices.hostname", | |
# "input": "text", | |
# "operator": "regex", | |
# "type": "string", | |
# "value": "www-worker.*"}]} | |
# } | |
# ] | |
# } | |
import sys | |
import json | |
import argparse | |
try: | |
import requests | |
except ImportError: | |
sys.exit('requests package is required') | |
SERVICE_GROUP_IDENTIFIER = 'z-service' | |
class LibreNMS(requests.sessions.Session): | |
def __init__(self, hostname, api_token, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.identifier = '[librenms-sync]' | |
self.base_url = 'https://' + hostname + '/api/v0' | |
self.headers.update({'X-Auth-Token': api_token}) | |
self._verify_api_token_validity() | |
def _request(self, method, endpoint, *args, **kwargs): | |
url = self.base_url + endpoint | |
try: | |
response = self.request(method, url, *args, **kwargs) | |
response.raise_for_status() | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
sys.exit(e) | |
except Exception as e: | |
sys.exit(e) | |
def _get(self, endpoint): | |
return self._request('GET', endpoint) | |
def _post(self, endpoint, *args, **kwargs): | |
return self._request('POST', endpoint, *args, **kwargs) | |
def _del(self, endpoint, *args, **kwargs): | |
return self._request('DELETE', endpoint, *args, **kwargs) | |
def _patch(self, endpoint, *args, **kwargs): | |
return self._request('PATCH', endpoint, *args, **kwargs) | |
def _verify_api_token_validity(self): | |
response = self._get('/system') | |
assert response['status'] == 'ok' | |
return | |
def get_devicegroups(self): | |
return self._get('/devicegroups') | |
def list_managed_groups(self): | |
groups = self.get_devicegroups()['groups'] | |
return [group for group in groups if group.get('desc') and self.identifier in group['desc']] | |
def add_devicegroup(self, name, rules): | |
data = { | |
'name': name, | |
'type': 'dynamic', | |
'desc': self.identifier, | |
'rules': json.dumps(rules) | |
} | |
return self._post('/devicegroups', json=data) | |
def delete_devicegroup(self, name): | |
return self._del('/devicegroups/' + name) | |
def update_devicegroup(self, name, rules): | |
data = {'rules': json.dumps(rules)} | |
return self._patch('/devicegroups/' + name, json=data) | |
def get_devices_by_group(self, name): | |
return self._get('/devicegroups/' + name) | |
def rediscover_device(self, name_or_id): | |
return self._get('/devices/' + str(name_or_id) + '/rediscover') | |
def parse_args(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument('-c', '--config', required=True) | |
parser.add_argument('-l', '--librenms-instance', required=True) | |
parser.add_argument('-t', '--librenms-api-token', required=True) | |
parser.add_argument('-y', '--yes', help='Do not prompt for confirmation', action='store_true') | |
return parser.parse_args() | |
def main(): | |
args = parse_args() | |
librenms = LibreNMS(args.librenms_instance, args.librenms_api_token) | |
librenms_groups = librenms.list_managed_groups() | |
librenms_groups_map = {g['name']: g for g in librenms_groups} | |
print('Found', len(librenms_groups), 'managed groups in LibreNMS\n') | |
with open(args.config) as config_file: | |
config = json.load(config_file) | |
config_groups = config['groups'] | |
groups_to_add = [cg for cg in config_groups if cg['name'] not in librenms_groups_map] | |
groups_to_del = [lg for lg in librenms_groups if lg['name'] not in [cg['name'] for cg in config_groups]] | |
groups_to_mod = [cg for cg in config_groups | |
if cg['name'] in librenms_groups_map | |
and not all([rule == librenms_groups_map[cg['name']]['rules']['rules'][idx] for idx, rule in enumerate(cg['rules']['rules'])])] | |
if not groups_to_add and not groups_to_del and not groups_to_mod: | |
print('Nothing to do') | |
sys.exit() | |
def print_group_action(action, groups): | |
if groups: | |
print(f'Found {len(groups)} groups to {action}:') | |
for group in groups: | |
print(group['name']) | |
print() | |
print_group_action('add', groups_to_add) | |
print_group_action('delete', groups_to_del) | |
print_group_action('modify', groups_to_mod) | |
answer = 'y' if args.yes else input('Continue? [Y/n]: ').strip().lower() or 'y' | |
if answer != 'y': | |
print('Aborted by user') | |
sys.exit() | |
groups_to_rediscover = set() | |
def process_groups(action, groups, method): | |
for group in groups: | |
if action == 'Deleted': | |
method(group['name']) | |
else: | |
method(group['name'], group['rules']) | |
groups_to_rediscover.add(group['name']) | |
print(f'{action} {group["name"]}') | |
process_groups('Added', groups_to_add, librenms.add_devicegroup) | |
process_groups('Deleted', groups_to_del, librenms.delete_devicegroup) | |
process_groups('Modified', groups_to_mod, librenms.update_devicegroup) | |
devices_to_rediscover = set() | |
for group_name in groups_to_rediscover: | |
if not group_name.startswith(SERVICE_GROUP_IDENTIFIER): | |
continue | |
devices = librenms.get_devices_by_group(group_name) | |
devices_to_rediscover.update(device['device_id'] for device in devices['devices']) | |
if devices_to_rediscover: | |
print(f'\nRe-discovering', len(devices_to_rediscover), | |
'devices associated with', len(groups_to_rediscover), 'service group(s):') | |
for device_id in devices_to_rediscover: | |
librenms.rediscover_device(device_id) | |
print(device_id) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment