Skip to content

Instantly share code, notes, and snippets.

@oskaralmlov
Created June 28, 2024 17:51
Show Gist options
  • Save oskaralmlov/fb9cd014f75859103f00e57b84e05de2 to your computer and use it in GitHub Desktop.
Save oskaralmlov/fb9cd014f75859103f00e57b84e05de2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# Keeps LibreNMS device groups in sync with a JSON config file.
# If service templates are in use, they are expected to target groups
# whose names start with "z-service". Hosts that belong to such groups are
# automatically re-discovered when the script adds or modifies the group.
#
# The config file is expected to have this format:
# {
#     "groups": [
#         {
#             "name": "WWW DB",
#             "rules": {"condition": "OR",
#                       "rules": [{"field": "devices.hostname",
#                                  "id": "devices.hostname",
#                                  "input": "text",
#                                  "operator": "regex",
#                                  "type": "string",
#                                  "value": "www-db.*"}]}
#         },
#         {
#             "name": "WWW Workers",
#             "rules": {"condition": "OR",
#                       "rules": [{"field": "devices.hostname",
#                                  "id": "devices.hostname",
#                                  "input": "text",
#                                  "operator": "regex",
#                                  "type": "string",
#                                  "value": "www-worker.*"}]}
#         }
#     ]
# }
import argparse
import json
import sys
from urllib.parse import quote

try:
    import requests
except ImportError:
    sys.exit('requests package is required')
# Name prefix identifying the device groups whose member devices are
# re-discovered after the script adds or modifies the group.
SERVICE_GROUP_IDENTIFIER = 'z-service'
class LibreNMS(requests.sessions.Session):
    """Small LibreNMS API (v0) client built on top of a ``requests`` session.

    Every request is sent to ``https://<hostname>/api/v0`` with the API token
    in the ``X-Auth-Token`` header. Any request failure terminates the
    process through ``sys.exit`` with the error message, so callers never
    see a partial or error response.

    Device groups created by this client carry ``self.identifier`` in their
    description; that marker is how managed groups are recognised later.
    """

    def __init__(self, hostname, api_token, *args, timeout=30, **kwargs):
        """Create the session and verify the token against ``/system``.

        Args:
            hostname: Host name of the LibreNMS instance (HTTPS is assumed).
            api_token: API token sent with every request.
            timeout: Default per-request timeout in seconds.
            *args, **kwargs: Passed through to the ``requests`` session.
        """
        super().__init__(*args, **kwargs)
        self.identifier = '[librenms-sync]'
        self.base_url = 'https://' + hostname + '/api/v0'
        # Without a timeout a stalled server would hang the script forever.
        self.request_timeout = timeout
        self.headers.update({'X-Auth-Token': api_token})
        self._verify_api_token_validity()

    @staticmethod
    def _devicegroup_endpoint(name):
        """Return the endpoint path of a single device group.

        The name is percent-encoded in full (including ``/``) so that group
        names containing spaces, ``/``, ``?`` or ``#`` stay one path segment.
        """
        return '/devicegroups/' + quote(str(name), safe='')

    def _request(self, method, endpoint, *args, **kwargs):
        """Send a request and return the decoded JSON body.

        Exits the process if the request fails, the server answers with an
        HTTP error status, or the body is not valid JSON.
        """
        url = self.base_url + endpoint
        kwargs.setdefault('timeout', self.request_timeout)
        try:
            response = self.request(method, url, *args, **kwargs)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decoding errors on older requests
            # versions, where they are not RequestException subclasses.
            sys.exit(f'LibreNMS API request failed: {e}')

    def _get(self, endpoint):
        return self._request('GET', endpoint)

    def _post(self, endpoint, *args, **kwargs):
        return self._request('POST', endpoint, *args, **kwargs)

    def _del(self, endpoint, *args, **kwargs):
        return self._request('DELETE', endpoint, *args, **kwargs)

    def _patch(self, endpoint, *args, **kwargs):
        return self._request('PATCH', endpoint, *args, **kwargs)

    def _verify_api_token_validity(self):
        """Exit unless ``/system`` answers with ``status == 'ok'``."""
        response = self._get('/system')
        # Explicit check instead of ``assert``: asserts vanish under ``-O``.
        if not isinstance(response, dict) or response.get('status') != 'ok':
            sys.exit('LibreNMS API token check failed: unexpected response from /system')

    def get_devicegroups(self):
        """Return the raw ``/devicegroups`` response.

        NOTE(review): LibreNMS may answer 404 when no device groups exist at
        all, which would end the script via ``_request`` - confirm.
        """
        return self._get('/devicegroups')

    def list_managed_groups(self):
        """Return the groups whose description contains ``self.identifier``."""
        groups = self.get_devicegroups()['groups']
        return [
            group for group in groups
            if group.get('desc') and self.identifier in group['desc']
        ]

    def add_devicegroup(self, name, rules):
        """Create a dynamic device group tagged as managed by this script."""
        data = {
            'name': name,
            'type': 'dynamic',
            'desc': self.identifier,
            # The rules are sent as a JSON-encoded string inside the JSON body.
            'rules': json.dumps(rules),
        }
        return self._post('/devicegroups', json=data)

    def delete_devicegroup(self, name):
        """Delete the device group called ``name``."""
        return self._del(self._devicegroup_endpoint(name))

    def update_devicegroup(self, name, rules):
        """Replace the rules of the device group called ``name``."""
        data = {'rules': json.dumps(rules)}
        return self._patch(self._devicegroup_endpoint(name), json=data)

    def get_devices_by_group(self, name):
        """Return the raw response listing the devices in group ``name``."""
        return self._get(self._devicegroup_endpoint(name))

    def rediscover_device(self, name_or_id):
        """Request a re-discovery of one device, given its name or id.

        NOTE(review): confirm the ``/rediscover`` route name against the
        LibreNMS Devices API documentation.
        """
        return self._get('/devices/' + quote(str(name_or_id), safe='') + '/rediscover')
def parse_args(argv=None):
    """Parse the command-line options of the sync script.

    Args:
        argv: Optional list of argument strings. When ``None`` (the
            default) the arguments are taken from ``sys.argv``.

    Returns:
        argparse.Namespace with ``config``, ``librenms_instance``,
        ``librenms_api_token`` and ``yes``.
    """
    parser = argparse.ArgumentParser(
        description='Keep LibreNMS device groups in sync with a JSON config file.')
    parser.add_argument('-c', '--config', required=True,
                        help='Path to the JSON config file describing the groups')
    parser.add_argument('-l', '--librenms-instance', required=True,
                        help='Host name of the LibreNMS instance (HTTPS is used)')
    parser.add_argument('-t', '--librenms-api-token', required=True,
                        help='LibreNMS API token')
    parser.add_argument('-y', '--yes', help='Do not prompt for confirmation', action='store_true')
    return parser.parse_args(argv)
def _rules_differ(wanted, current):
    """Return True when the configured rule set differs from the current one.

    Only the ``condition`` and the complete ``rules`` list are compared, so
    extra top-level keys on the LibreNMS side do not count as a difference.
    Comparing the whole list also catches rules that were added to or
    removed from the config.
    """
    if not isinstance(current, dict):
        return True
    return (wanted.get('condition') != current.get('condition')
            or wanted.get('rules') != current.get('rules'))


def _plan_group_changes(config_groups, librenms_groups):
    """Split the groups into (to_add, to_delete, to_modify) lists."""
    current_by_name = {group['name']: group for group in librenms_groups}
    wanted_names = {group['name'] for group in config_groups}

    to_add = [cg for cg in config_groups if cg['name'] not in current_by_name]
    to_del = [lg for lg in librenms_groups if lg['name'] not in wanted_names]
    to_mod = [
        cg for cg in config_groups
        if cg['name'] in current_by_name
        and _rules_differ(cg['rules'], current_by_name[cg['name']].get('rules'))
    ]
    return to_add, to_del, to_mod


def _print_group_action(action, groups):
    """Print the names of the groups an action is planned for, if any."""
    if not groups:
        return
    print(f'Found {len(groups)} groups to {action}:')
    for group in groups:
        print(group['name'])
    print()


def main():
    """Synchronise the managed LibreNMS device groups with the config file.

    Groups present only in the config are added, managed groups missing from
    the config are deleted, and groups whose rules differ are updated. Unless
    ``--yes`` was given, the planned changes must be confirmed first. Devices
    in added or modified groups whose name starts with
    ``SERVICE_GROUP_IDENTIFIER`` are re-discovered afterwards.
    """
    args = parse_args()
    librenms = LibreNMS(args.librenms_instance, args.librenms_api_token)
    librenms_groups = librenms.list_managed_groups()
    print('Found', len(librenms_groups), 'managed groups in LibreNMS\n')

    with open(args.config, encoding='utf-8') as config_file:
        config = json.load(config_file)
    config_groups = config['groups']

    groups_to_add, groups_to_del, groups_to_mod = _plan_group_changes(
        config_groups, librenms_groups)

    if not groups_to_add and not groups_to_del and not groups_to_mod:
        print('Nothing to do')
        return

    _print_group_action('add', groups_to_add)
    _print_group_action('delete', groups_to_del)
    _print_group_action('modify', groups_to_mod)

    if not args.yes:
        try:
            answer = input('Continue? [Y/n]: ').strip().lower() or 'y'
        except EOFError:
            # No terminal to answer on: treat as a refusal, not a crash.
            answer = 'n'
        if answer not in ('y', 'yes'):
            print('Aborted by user')
            return

    # Only added and modified groups are candidates for re-discovery;
    # a deleted group no longer exists and must not be queried.
    changed_groups = set()

    for group in groups_to_add:
        librenms.add_devicegroup(group['name'], group['rules'])
        changed_groups.add(group['name'])
        print(f'Added {group["name"]}')

    for group in groups_to_del:
        librenms.delete_devicegroup(group['name'])
        print(f'Deleted {group["name"]}')

    for group in groups_to_mod:
        librenms.update_devicegroup(group['name'], group['rules'])
        changed_groups.add(group['name'])
        print(f'Modified {group["name"]}')

    service_groups = sorted(
        name for name in changed_groups
        if name.startswith(SERVICE_GROUP_IDENTIFIER))

    devices_to_rediscover = set()
    for group_name in service_groups:
        devices = librenms.get_devices_by_group(group_name)
        devices_to_rediscover.update(device['device_id'] for device in devices['devices'])

    if devices_to_rediscover:
        print('\nRe-discovering', len(devices_to_rediscover),
              'devices associated with', len(service_groups), 'service group(s):')
        for device_id in devices_to_rediscover:
            librenms.rediscover_device(device_id)
            print(device_id)
# Run the sync only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment