Skip to content

Instantly share code, notes, and snippets.

@d--j
Created November 3, 2014 15:30
Show Gist options
  • Save d--j/317b28a5fb14ac89227f to your computer and use it in GitHub Desktop.
'''
Manages Icinga Passive Check delivery
'''
import logging
import logging.handlers
import os
import signal
# Verbosity levels cycled through by the SIGUSR1 handler; index 0 (WARNING) is the default.
LOG_LEVELS = (logging.WARNING, logging.INFO, logging.DEBUG)
# NOTE(review): a `global` statement at module level is a no-op; the assignment
# below creates the module global on its own.
global LOG_LEVEL_INDEX
LOG_LEVEL_INDEX = 0
# logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s')
logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(threadName)s %(message)s")
log = logging.getLogger('icinga_daemon')
log.setLevel(LOG_LEVELS[LOG_LEVEL_INDEX])
def switch_loglevel(signum, stack):
    """SIGUSR1 handler: advance LOG_LEVEL_INDEX to the next entry of LOG_LEVELS.

    Only the index is changed here; the polling loop in main() notices the new
    value and applies it via log.setLevel(), keeping the signal handler minimal.

    :param signum: signal number delivered by the kernel
    :param stack: current stack frame at delivery time
    """
    try:
        global LOG_LEVEL_INDEX
        # Modulo wrap-around replaces the original manual increment-and-reset dance.
        LOG_LEVEL_INDEX = (LOG_LEVEL_INDEX + 1) % len(LOG_LEVELS)
    except Exception as e:
        # `except ... as e` (2.6+/3.x) instead of the Python-2-only `except E, e`;
        # the parenthesized single-argument print works on both interpreters too.
        print("exception %s in signal handler for %d, stack = %s" % (e, signum, stack))
# Cycle verbosity at runtime with `kill -USR1 <pid>`; main() applies the change.
signal.signal(signal.SIGUSR1, switch_loglevel)
# log.setLevel(logging.DEBUG)
# syslog_handler = logging.handlers.SysLogHandler(address='/dev/log')
# syslog_handler.setFormatter(logging.Formatter(fmt="%s[%d]: %%(message)s" % ('icinga_daemon', os.getpid())))
# syslog_handler.setLevel(logging.WARNING)
# log.addHandler(syslog_handler)
import sys
import time
import salt.config
import salt.client
import salt.runner
import salt.utils.event
import threading
import copy
import datetime
import heapq
import random
# Scheduling constants (all values in seconds).
REPORT_INTERVAL = 30 # every 30 seconds
# NOTE(review): "MINON" in the two names below is a typo for "MINION"; the
# names are referenced throughout the file, so they are kept as-is here.
REFRESH_MINONS_INTERVAL = 60 * 10 # every 10 minutes
MINON_DISCOVERY_INTERVAL = 60 * 60 * 6 # every 6 hours
# Delay before the one-shot Icinga config regeneration task runs.
ICINGA_CONFIG_CREATOR_INITIAL_DELAY = 50
# A new minion's checks start after a random delay in this range, to spread load.
MINION_CHECK_INITIAL_DELAY_LOW = 75
MINION_CHECK_INITIAL_DELAY_HIGH = 240
# Suffix used to build per-minion discovery task identifiers ('<minion>::__discovery__').
DISCOVERY_IDENTIFIER = '__discovery__'
# Minion id that hosts the Icinga command pipe; site-specific placeholder.
ICINGA_HOST = 'your.icinga.minion.id'
def get_interval(check_dict):
    """Translate a check definition's time fields into a number of seconds.

    Sums the optional 'seconds'/'minutes'/'hours'/'days' entries; an all-zero
    or absent specification falls back to 600s (10 minutes), and anything
    shorter than REPORT_INTERVAL is clamped up to it.
    """
    total = (int(check_dict.get('seconds', 0))
             + int(check_dict.get('minutes', 0)) * 60
             + int(check_dict.get('hours', 0)) * 3600
             + int(check_dict.get('days', 0)) * 86400)
    if not total:
        return 600  # default: 10 minutes
    # intervals below the reporting cadence are pointless
    return max(total, REPORT_INTERVAL)
class Task:
    """One repeating thing that can be scheduled.

    A Task carries its own interval and absolute next-execution timestamp;
    queueing itself is handled by SharedState.  Subclasses implement perform()
    and may opt into batched execution via combinable_with()/perform_combined().
    (The misspelled method name `needs_rescheduleing` is kept for compatibility
    with existing callers.)
    """
    def __init__(self, identifier, interval, start_delay=0.0):
        self.identifier = identifier
        self.interval = interval  # seconds between executions
        self.next_execution = time.time() + start_delay
        self.enabled = False  # stays disabled until SharedState.add_task() enables it
    def __repr__(self):
        return '<Task {0} interval={1}>'.format(self.identifier, self.interval)
    def log_info(self, other_tasks=None):
        """Extra text for the worker log line; subclasses override.
        (Original used a mutable default argument `[]`; None is the safe idiom.)"""
        return ""
    def get_first_sort_key(self):
        """Primary heap key: next execution time floored to 5-second buckets,
        so tasks scheduled close together compare equal and can be batched."""
        # `//` keeps the result an int on Python 3 as well (plain `/` would
        # silently turn this into a float there, changing the sort keys).
        return int(self.next_execution) // 5 * 5  # 5 seconds accuracy
    def get_secondary_sort_key(self):
        """Tie-breaker inside one 5-second bucket; MinionCheck groups by minion."""
        return ''
    def enable(self):
        self.enabled = True
    def disable(self):
        self.enabled = False
    def needs_rescheduleing(self, now=False, forced_reshedule_time=None):
        """Update next_execution and report whether to re-queue this task.

        :param now: schedule immediately instead of after `interval`
        :param forced_reshedule_time: absolute timestamp overriding both
        :return: True when the (enabled) task should go back on the heap
        """
        if not self.enabled:
            return False
        if forced_reshedule_time:
            self.next_execution = forced_reshedule_time
        elif now:
            self.next_execution = time.time()
        else:
            self.next_execution = time.time() + self.interval
        return True
    def perform(self, adapter):
        """Execute the task; base class does nothing."""
        pass
    def combinable_with(self, other_tasks):
        """Whether this task may run in one batch with `other_tasks`; default no."""
        return False
    def perform_combined(self, adapter, other_tasks):
        """Fallback batch execution: warn, then run every task individually."""
        # log.warning instead of the deprecated log.warn alias
        log.warning('Try to perform task {0} combined with {1} other tasks. But this task is not combinable!'.format(self, len(other_tasks)))
        self.perform(adapter)
        for t in other_tasks:
            t.perform(adapter)
class Debugger(Task):
    """Periodically dumps the scheduler's queue to the debug log."""
    def __init__(self):
        Task.__init__(self, 'debugger', REPORT_INTERVAL * 10)
    def log_info(self, other_tasks=[]):
        return ''
    def perform(self, adapter):
        # Shallow-copy the heap while holding the shared lock; sort and format
        # afterwards so the lock is held as briefly as possible.
        snapshot = adapter.shared_state.do_locked(lambda state: copy.copy(state.heap))
        log.debug(Debugger.format_queue(sorted(snapshot)))
    @staticmethod
    def format_queue(queue_as_list):
        """Render heap entries as 'timestamp | secondary key | identifier' lines."""
        rendered = [
            "{0} | {1:<30} | {2}\n".format(
                datetime.datetime.fromtimestamp(entry[0]).strftime('%Y-%m-%d %H:%M:%S'),
                entry[1],
                entry[2].identifier,
            )
            for entry in queue_as_list
        ]
        return ''.join(rendered)
class Reporter(Task):
    """Flushes collected passive-check result lines to the Icinga command pipe.

    Results are accumulated class-wide via add_result() and drained in one
    batch each REPORT_INTERVAL; on delivery failure the batch is re-queued.
    """
    results = []
    results_lock = threading.RLock()
    def __init__(self):
        Task.__init__(self, 'reporter', REPORT_INTERVAL)
    def log_info(self, other_tasks=[]):
        return 'lines={0}'.format(len(Reporter.results))
    def perform(self, adapter):
        # Atomically swap out the pending batch so producers can keep appending.
        with Reporter.results_lock:
            pending, Reporter.results = Reporter.results, []
        if not pending:
            return
        # result = self.adapter.client.cmd((ICINGA_HOST,), 'file.append', ['/var/lib/icinga/rw/icinga.cmd'] + lines, expr_form='list')
        result = adapter.client.cmd((ICINGA_HOST,), 'cmd.run', ['cat >> /var/lib/icinga/rw/icinga.cmd'], kwarg=dict(stdin="\n".join(pending)), expr_form='list')
        if ICINGA_HOST not in result:
            # Delivery failed: put the batch back for the next attempt.
            log.error("Reporter: Icinga minon down? {0}".format(result))
            with Reporter.results_lock:
                Reporter.results.extend(pending)
            # TODO: only reappend the last X lines to not trash the memory when the monitor host if offline for a long time
    @staticmethod
    def add_result(line):
        """Queue one Icinga external-command line for the next flush."""
        with Reporter.results_lock:
            Reporter.results.append(line)
class RefreshMinions(Task):
    """Polls minion connectivity and registers MinionDiscovery tasks for new minions."""
    def __init__(self):
        Task.__init__(self, 'refresh_minions', REFRESH_MINONS_INTERVAL)
    def perform(self, adapter):
        status = adapter.get_status_of_minions()
        down_minions = status['down']
        if down_minions:
            for minion in sorted(down_minions):
                if adapter.knows_minion(minion):
                    log.error("Minion {0} is down (was up)".format(minion))
                    adapter.mark_minion_down(minion)
                else:
                    log.error("Minion {0} is down (never seen up before)".format(minion))
        up_minions = status['up']
        if up_minions:
            for minion in sorted(up_minions):
                if adapter.knows_minion(minion):
                    continue
                # First sighting: schedule discovery and mark the minion live.
                log.warning(" Minion {0} is up".format(minion))
                adapter.add_task(MinionDiscovery(minion))
                adapter.mark_minion_up(minion)
class MinionTask(Task):
    """Base class for tasks that target one specific minion."""
    def __init__(self, identifier, interval, minion, start_delay=0.0):
        self.minion = minion
        Task.__init__(self, identifier, interval, start_delay)
class MinionDiscovery(MinionTask):
    """Discovers passive checks on a single minion.

    Runs every MINON_DISCOVERY_INTERVAL; diffs the check list reported by the
    minion against the checks the scheduler already tracks and hands the delta
    to the adapter.  On a communication failure the task disables itself —
    RefreshMinions re-enables it once the minion is seen up again.
    """
    def __init__(self, minion):
        MinionTask.__init__(self, '%s::%s' % (minion, DISCOVERY_IDENTIFIER), MINON_DISCOVERY_INTERVAL, minion)
    def log_info(self, other_tasks=[]):
        return self.minion
    def perform(self, adapter):
        checks = adapter.get_config_of_minion(self.minion)
        old_checks = adapter.get_minion_checks(self.minion)
        if checks:
            check_names = [c[0] for c in checks]
            checks = dict(checks)
            # Set algebra against the currently tracked checks.
            remaining_checks = list(set(old_checks) & set(check_names))
            removed_checks = list(set(old_checks) - set(check_names))
            new_checks = list(set(check_names) - set(old_checks))
            log.debug('Discovery result for {0}: new_checks = {1}, removed_checks = {2}, remaining_checks = {3}'.format(self.minion, new_checks, removed_checks, remaining_checks))
            adapter.update_minion_checks(self.minion, new_checks, removed_checks, checks)
        else:
            # log.warning replaces the deprecated log.warn alias
            log.warning('Discovery task of {0} disables itself because of communication error (minion down?)'.format(self.minion))
            self.disable()
class MinionCheck(MinionTask):
    """Executes one passive nagios check on one minion; batches per minion."""
    def __init__(self, minion, check_name, check):
        MinionTask.__init__(self, '%s::%s' % (minion, check_name), get_interval(check), minion, 90.0)
        # Separate retry cadence for failing checks, if the config provides one.
        if 'retry_interval' not in check:
            self.retry_interval = self.interval
        else:
            raw = check['retry_interval']
            self.retry_interval = get_interval(raw) if isinstance(raw, dict) else raw
        self.success_interval = self.interval
        self.check_name = check_name
    def get_secondary_sort_key(self):
        # Group heap entries by minion so batches form naturally.
        return self.minion
    def set_next_interval(self, ret):
        """Choose the next interval from the check result (retcode 0/99 = success)."""
        if 'retcode' not in ret:
            return
        succeeded = ret['retcode'] == 0 or ret['retcode'] == 99
        self.interval = self.success_interval if succeeded else self.retry_interval
    def perform(self, adapter):
        result = adapter.execute_passive_checks_on_minion(self.minion, [self.check_name])
        self.set_next_interval(result.get(self.check_name, {}))
    def combinable_with(self, other_tasks):
        """Batchable with any group of MinionChecks targeting the same minion."""
        return all(isinstance(t, MinionCheck) and t.minion == self.minion
                   for t in other_tasks)
    def perform_combined(self, adapter, other_tasks):
        """Run this check plus `other_tasks` in a single salt call."""
        batch = [self] + list(other_tasks)
        result = adapter.execute_passive_checks_on_minion(self.minion, [t.check_name for t in batch])
        for t in batch:
            t.set_next_interval(result.get(t.check_name, {}))
    def log_info(self, other_tasks=[]):
        names = sorted(t.check_name for t in [self] + other_tasks)
        return '{0:<34} -> [{2:0>2}] {1}'.format(self.minion, ', '.join(names), len(names))
class IcingaConfigCreator(Task):
    """One-shot task: regenerates the Icinga configuration, then removes itself."""
    def __init__(self):
        Task.__init__(self, 'create_icinga_config', 100)
    def perform(self, adapter):
        # Deregister first so the regeneration runs exactly once.
        self.disable()
        adapter.remove_task(self)
        adapter.create_icinga_config()
class SharedState:
    """Shared state of all workers.

    Owns the task registry, the scheduling heap and per-minion bookkeeping.
    Every public method takes the internal RLock, so instances are safe to
    share across worker threads.  Heap entries are *lists*
    ``[first_sort_key, secondary_sort_key, task]`` — mutable on purpose,
    because the optimizer rewrites ``item[0]`` in place.
    """
    def __init__(self):
        self.lock = threading.RLock()
        self.heap = []            # scheduling priority queue (heapq invariant)
        self.tasks = {}           # task identifier -> Task
        self.minion_checks = {}   # minion id -> list of check names
        self.minion_spread = {}   # minion id -> memoized random start delay
        # Built-in housekeeping tasks.
        refresh_task = RefreshMinions()
        self.add_task(refresh_task)
        reporter_task = Reporter()
        self.add_task(reporter_task)
        self.add_task(Debugger())
    def get_minion_spread(self, minion):
        """Return (and memoize) a random initial delay for *minion*'s checks."""
        with self.lock:
            if minion not in self.minion_spread:
                self.minion_spread[minion] = random.randrange(MINION_CHECK_INITIAL_DELAY_LOW, MINION_CHECK_INITIAL_DELAY_HIGH)
            return self.minion_spread[minion]
    def has_task_identifier(self, task_identifier):
        """True if a task with this identifier is registered."""
        with self.lock:
            return self.tasks.get(task_identifier, False) and True
    def has_task_like(self, task, only_enabled = False):
        """True if a task with the same identifier is registered
        (and enabled, when only_enabled is set)."""
        with self.lock:
            has_task = self.tasks.get(task.identifier, False) and True
            if only_enabled:
                has_task = has_task and self.tasks[task.identifier].enabled
            return has_task
    def is_enabled(self, task_identifier):
        """True if the task exists and is currently enabled."""
        with self.lock:
            return self.tasks.get(task_identifier, False) and self.tasks[task_identifier].enabled
    def add_task(self, task, only_once = False, forced_reshedule_time = None):
        """Register and schedule *task*, replacing any task with the same identifier.

        :param only_once: skip registration when an enabled twin already exists
        :param forced_reshedule_time: absolute first-execution timestamp
        :return: True when the task was added, False when skipped
        """
        with self.lock:
            if only_once and self.has_task_like(task, only_enabled=True):
                return False
            task.enable()
            log.debug('Add Task {0}'.format(task.identifier))
            if self.has_task_like(task):
                log.debug(' already have a task like this, remove the old one')
                self.remove_task(self.tasks[task.identifier])
            self.tasks[task.identifier] = task
            if isinstance(task, MinionCheck):
                # keep the per-minion check-name index in sync
                if not self.minion_checks.get(task.minion, False):
                    self.minion_checks[task.minion] = [ task.check_name ]
                else:
                    self.minion_checks[task.minion].append(task.check_name)
            if forced_reshedule_time:
                self.reschedule_task(task, forced_reshedule_time=forced_reshedule_time)
            else:
                self.reschedule_task(task, now=True)
            return True
    def remove_task(self, task):
        """Deregister *task* and purge its heap entries."""
        with self.lock:
            log.debug('Remove Task {0}'.format(task.identifier))
            task.disable()
            if self.tasks.get(task.identifier, False):
                del self.tasks[task.identifier]
            if isinstance(task, MinionCheck) and self.minion_checks.get(task.minion, False):
                self.minion_checks[task.minion].remove(task.check_name)
            self._remove_from_heap(task.identifier)
    def remove_task_by_identifier(self, task_identifier):
        """Deregister the task with *task_identifier*, if registered."""
        with self.lock:
            log.debug('Remove Task {0}'.format(task_identifier))
            if self.tasks.get(task_identifier, False):
                # BUG FIX: the original read `self.tasks[task.identifier]`
                # before `task` was bound, raising NameError on every call.
                task = self.tasks[task_identifier]
                task.disable()
                if isinstance(task, MinionCheck) and self.minion_checks.get(task.minion, False):
                    self.minion_checks[task.minion].remove(task.check_name)
                del self.tasks[task_identifier]
            self._remove_from_heap(task_identifier)
    def _remove_from_heap(self, task_identifier):
        """Drop every heap entry for *task_identifier* and restore the heap invariant."""
        with self.lock:
            self.heap[:] = [x for x in self.heap if not x[2].identifier == task_identifier]
            heapq.heapify(self.heap)
    def _queue_task(self, task):
        with self.lock:
            heapq.heappush(self.heap, [task.get_first_sort_key(), task.get_secondary_sort_key(), task,])
    def reschedule_task(self, task, now=False, forced_reshedule_time = None):
        """Re-queue *task* if it is still enabled (see Task.needs_rescheduleing)."""
        if task.needs_rescheduleing(now=now, forced_reshedule_time = forced_reshedule_time):
            self._queue_task(task)
    def _get_smallest_enabled_queue_item(self):
        """Pop and return the earliest enabled heap entry, or (None, None, None)."""
        with self.lock:
            try:
                item = heapq.heappop(self.heap)
                while not item[2].enabled:
                    # disabled entries are stale tombstones: discard them
                    item = heapq.heappop(self.heap)
                return item
            except IndexError:
                # was `except IndexError, e` (Py2-only syntax, `e` unused)
                return (None, None, None)
    def _get_next_scheduled_task(self):
        """Return the next task that is due, or None (the entry is pushed back if not due yet)."""
        check_time, secondary_sort_key, task = self._get_smallest_enabled_queue_item()
        if check_time:
            now = time.time()
            if check_time <= now:
                return task
            else:
                with self.lock:
                    heapq.heappush(self.heap, [check_time, secondary_sort_key, task,])
                return None
        else:
            return None
    def _get_combinable_tasks(self, task):
        """Pop further due tasks combinable with *task*; push back the first that isn't."""
        other_tasks = []
        last_poped_item = self._get_smallest_enabled_queue_item()
        while last_poped_item[2]:
            if task.combinable_with(other_tasks + [last_poped_item[2]]):
                other_tasks.append(last_poped_item[2])
            else:
                with self.lock:
                    heapq.heappush(self.heap, last_poped_item)
                return other_tasks
            last_poped_item = self._get_smallest_enabled_queue_item()
        return other_tasks
    def do_locked(self, f):
        """Run callable *f(self)* while holding the shared lock; return its result."""
        with self.lock:
            return f(self)
    def get_minion_checks(self, minion):
        """Return the list of check names currently tracked for *minion*."""
        with self.lock:
            return self.minion_checks.get(minion, [])
class SaltAdapter:
    """Encapsulates all salt stuff.

    Bundles the salt LocalClient/Caller/RunnerClient together with the
    SharedState task queue behind one object that is handed to Task.perform().
    Task-queue methods are thin delegations to SharedState.
    """
    def __init__(self, shared_state, client, caller, runner):
        self.shared_state = shared_state
        self.client = client   # salt.client.LocalClient
        self.caller = caller   # salt.client.Caller (not referenced by any method here)
        self.runner = runner   # salt.runner.RunnerClient
    def has_task_identifier(self, task_identifier):
        return self.shared_state.has_task_identifier(task_identifier)
    def has_task_like(self, task, **args):
        return self.shared_state.has_task_like(task, **args)
    def is_enabled(self, task_identifier):
        return self.shared_state.is_enabled(task_identifier)
    def add_task(self, task, **args):
        return self.shared_state.add_task(task, **args)
    def remove_task(self, task):
        return self.shared_state.remove_task(task)
    def remove_task_by_identifier(self, task_identifier):
        return self.shared_state.remove_task_by_identifier(task_identifier)
    def reschedule_task(self, task, now=False):
        return self.shared_state.reschedule_task(task, now)
    def get_minion_checks(self, minion):
        return self.shared_state.get_minion_checks(minion)
    def get_status_of_minions(self):
        """Run `manage.status` on the master; returns dict with 'up'/'down' lists."""
        return self.runner.cmd('manage.status', [False])
    def get_config_of_minion(self, minion):
        """Fetch the minion's nagios check config and runnable check list.

        :return: list of (check_name, check_config) tuples, or None on error
        """
        minion_dict = self.client.cmd((minion,), ['nagios_plugin.get_config', 'nagios_plugin.all_runable'], [[],[]], expr_form='list', timeout=25)
        if minion not in minion_dict:
            log.error("could not get config for {0}".format(minion))
            return None
        monitor_checks = minion_dict[minion]['nagios_plugin.get_config']
        runnable_checks = minion_dict[minion]['nagios_plugin.all_runable']
        # salt returns an error *string* instead of a list on failure
        if isinstance(runnable_checks, basestring):
            log.error("Wrong runnable checks from {0}: {1}".format(minion, runnable_checks))
            return None
        else:
            checks = [(check_name, monitor_checks[check_name],) for check_name in runnable_checks]
            log.debug("Minion %s got checks: %s", minion, [check_name for check_name in runnable_checks])
            return checks
    def create_icinga_config(self):
        """Trigger the `icinga.create_config` runner on the master."""
        return self.runner.cmd('icinga.create_config', [])
    def schedule_icinga_reconfiguration(self):
        """Queue a one-shot IcingaConfigCreator (deduplicated, delayed start)."""
        create_config_task = IcingaConfigCreator()
        if not self.add_task(create_config_task, only_once = True, forced_reshedule_time = time.time() + ICINGA_CONFIG_CREATOR_INITIAL_DELAY):
            log.debug("Already have an IcingaConfigCreator task")
    def execute_passive_checks_on_minion(self, minion, check_names):
        """Run the named checks on *minion* and forward results to the Reporter.

        :return: dict mapping service name -> raw result dict (errors omitted)
        """
        results = self.client.cmd((minion,), 'nagios_plugin.run_configs', check_names, expr_form='list', timeout= min(30, 10 * len(check_names)))
        ret = {}
        if minion in results and isinstance(results[minion], list):
            results = results[minion]
            for result in results:
                if isinstance(result, dict):
                    ret[result['service']] = result
                    if result.get('action') == 'check':
                        Reporter.add_result(result['result'])
                    else:
                        log.error("Minion {0} had an error executing nagios check {1}: {2}".format(minion, result.get('service', '<unknown>'), result))
                else:
                    log.error("Minion {0} had an error executing a nagios check ({2}) {1}".format(minion, result, check_names))
        else:
            log.error("Minion {0} had an error executing nagios checks ({2}): {1}".format(minion, results, check_names))
        return ret
    def perform_next_task(self):
        """Pop the next due task (plus any combinable ones), run and reschedule it.

        Sleeps briefly when nothing is due so worker threads don't spin.
        """
        def f(shared_state):
            # pop the task and its combinable batch under one lock so no two
            # workers can grab the same entries
            task = shared_state._get_next_scheduled_task()
            if task:
                other_tasks = shared_state._get_combinable_tasks(task)
                return task, other_tasks
            else:
                return None, ()
        task, other_tasks = self.shared_state.do_locked(f)
        if task:
            start = time.time()
            start_delta = float(start - task.get_first_sort_key())
            log.info(
                "> {cls:<16} {log_info}".format(
                    cls = task.__class__.__name__,
                    log_info = task.log_info(other_tasks),
                )
            )
            try:
                if len(other_tasks) > 0:
                    task.perform_combined(self, other_tasks)
                else:
                    task.perform(self)
            except Exception:
                # was `except Exception, e:` (Py2-only syntax); the bound name
                # was unused — log.exception() reports the active exception
                log.exception("Caught exception while performing task {0}".format(task))
            finally:
                end = time.time()
                log.info(
                    "< {cls:<16} executed in {time: >+9.4}s, start delta {delta: >+9.4}s".format(
                        cls = task.__class__.__name__,
                        time = float(end - start),
                        delta = start_delta,
                    )
                )
                self.reschedule_task(task)
                for other_task in other_tasks:
                    self.reschedule_task(other_task)
        else:
            # log.debug("No task to execute (queue size = {0})".format(self.queue.qsize()))
            time.sleep(0.35) # don't burn the CPU
    def knows_minion(self, minion):
        """True when a discovery task for *minion* is registered."""
        discovery_identifier = '%s::%s' % (minion, DISCOVERY_IDENTIFIER)
        return self.has_task_identifier(discovery_identifier)
    def mark_minion_down(self, minion):
        """Disable the minion's discovery task and remove all its check tasks."""
        discovery_identifier = '%s::%s' % (minion, DISCOVERY_IDENTIFIER)
        def f(shared_state):
            if shared_state.tasks.get(discovery_identifier, False):
                shared_state.tasks[discovery_identifier].disable()
            # iterate over a copy: remove_task mutates the underlying list
            minion_checks = shared_state.minion_checks.get(minion, [])[:]
            for check_name in minion_checks:
                check_identifier = '%s::%s' % (minion, check_name)
                if shared_state.tasks.get(check_identifier, False):
                    check_task = shared_state.tasks[check_identifier]
                    shared_state.remove_task(check_task)
        self.shared_state.do_locked(f)
    def mark_minion_up(self, minion):
        """Re-enable and immediately reschedule the minion's discovery task."""
        discovery_identifier = '%s::%s' % (minion, DISCOVERY_IDENTIFIER)
        def f(shared_state):
            if shared_state.tasks.get(discovery_identifier, False):
                discovery_task = shared_state.tasks[discovery_identifier]
                if not discovery_task.enabled:
                    discovery_task.enable()
                    shared_state.reschedule_task(discovery_task, now=True)
        self.shared_state.do_locked(f)
    def update_minion_checks(self, minion, new_checks, removed_checks, checks):
        """Apply a discovery diff: add/remove check tasks, trigger reconfig on change."""
        def f(shared_state):
            # spread the new checks' first execution to avoid thundering herds
            check_time = time.time() + shared_state.get_minion_spread(minion)
            if len(new_checks) > 0 or len(removed_checks) > 0:
                self.schedule_icinga_reconfiguration()
            for check_name in new_checks:
                check_task = MinionCheck(minion, check_name, checks[check_name])
                shared_state.add_task(check_task, forced_reshedule_time = check_time)
            for check_name in removed_checks:
                shared_state.remove_task_by_identifier('%s::%s' % (minion, check_name))
        self.shared_state.do_locked(f)
def perform_work(shared_state):
    """Worker thread body: build salt clients once, then run scheduled tasks forever."""
    client = salt.client.LocalClient()
    caller = salt.client.Caller()
    runner = salt.runner.RunnerClient(salt.config.master_config('/etc/salt/master'))
    adapter = SaltAdapter(shared_state, client, caller, runner)
    while True:
        adapter.perform_next_task()
def optimizer(shared_state):
    """Reorders the tasks so that they are combinable again"""
    # Thread body: once an hour, align the scheduled times of heap entries that
    # share a secondary sort key (i.e. the same minion) so they land in the
    # same time bucket and can be batched into one salt call.
    REORDER_WINDOW = 5 * 60 # 5 minutes
    def optimize(shared_state):
        # Runs under the shared lock (via do_locked below).  Mutates the heap
        # item lists *in place* (item[0] = ...), then rebuilds the heap.
        start = time.time()
        number_of_changes = 0
        map_of_secondary_sort_keys = {}
        sorted_array = sorted(shared_state.heap)
        log.info("> Run Optimizer")
        log.debug(Debugger.format_queue(sorted_array))
        # Bucket heap entries by their secondary sort key.
        for item in sorted_array:
            if not item[1] in map_of_secondary_sort_keys:
                map_of_secondary_sort_keys[item[1]] = []
            map_of_secondary_sort_keys[item[1]].append(item)
        for secondary_sort_key in map_of_secondary_sort_keys:
            a = sorted(map_of_secondary_sort_keys[secondary_sort_key])
            current_timestamp = None
            for item in a:
                # Only entries with a non-empty secondary key take part
                # (MinionCheck sets it to the minion id; other tasks use '').
                if item[1]:
                    if not current_timestamp:
                        current_timestamp = item[0]
                    if abs(item[0] - current_timestamp) < REORDER_WINDOW:
                        # Pull this entry onto the anchor timestamp so the
                        # group becomes combinable.
                        if current_timestamp != item[0]:
                            item[0] = current_timestamp
                            number_of_changes += 1
                    else:
                        # Too far away: start a new anchor group here.
                        current_timestamp = item[0]
        # The in-place key edits broke the heap invariant; restore it and
        # swap the rebuilt heap into the shared state.
        heapq.heapify(sorted_array)
        shared_state.heap = sorted_array
        copy_of_sorted_array = sorted(sorted_array)
        end = time.time()
        log.info("< run time {0: >+9.4}s, did {1} number of changes".format(float(end - start), number_of_changes))
        log.debug(Debugger.format_queue(copy_of_sorted_array))
    while True:
        time.sleep(REORDER_WINDOW)
        shared_state.do_locked(optimize)
        time.sleep(60 * 60 - REORDER_WINDOW) # repeat every hour
def reactor(shared_state):
    """This makes deadlocks - do not instantiate"""
    # Listens on the salt master event bus for presence changes and triggers
    # an immediate minion refresh.  Kept for reference but left disabled in
    # main() (see the warning above).
    while True:
        try:
            time.sleep(5)
            event = salt.utils.event.MasterEvent('/var/run/salt/master')
            data = event.get_event(tag='salt/presence/change')
            if data:
                log.error("reactor got notified about presence change, start refresh minions run {0}".format(data))
                shared_state.add_task(RefreshMinions())
        except Exception as e:
            # `as` syntax replaces the Python-2-only `except Exception, e`
            # and works on 2.6+ and 3.x alike
            log.error("reactor error - is the master running? {0}".format(e))
def main():
    """Entry point: start worker threads plus the optimizer, then poll for
    log-level changes requested via SIGUSR1 until interrupted."""
    log.warning("Start Icinga Passive Daemon")
    shared_state = SharedState()
    processes = []
    # processes.append(threading.Thread(target=reactor, name="Reactor", args=(shared_state,)))
    # range() behaves identically to the original xrange() here and is
    # forward-compatible with Python 3
    for x in range(1, 5):
        processes.append(threading.Thread(target=perform_work, name="Worker%s"%x, args=(shared_state,)))
    processes.append(threading.Thread(target=optimizer, name="Optimzer", args=(shared_state,)))
    for p in processes:
        p.start()
    try:
        current_log_index = LOG_LEVEL_INDEX
        while True:
            time.sleep(5)
            # LOG_LEVEL_INDEX is bumped by the SIGUSR1 handler; apply it here.
            if current_log_index != LOG_LEVEL_INDEX:
                current_log_index = LOG_LEVEL_INDEX
                log.setLevel(LOG_LEVELS[current_log_index])
                log.warning(
                    "new log index is %d, log level is %s (%d) (%s)",
                    current_log_index,
                    logging.getLevelName(LOG_LEVELS[current_log_index]),
                    LOG_LEVELS[current_log_index],
                    LOG_LEVELS
                )
    except (KeyboardInterrupt, SystemExit):
        # BUG FIX: the original `except KeyboardInterrupt, SystemExit:` is the
        # Python-2 binding form — it caught ONLY KeyboardInterrupt, binding it
        # to the name SystemExit.  The tuple form catches both as intended.
        # TODO: save last_successful_index in a file
        log.warning("Exit Icinga Passive Daemon")
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment