#!/usr/bin/env python3 | |
import functools | |
import sys | |
import time | |
import collections | |
import threading | |
import queue | |
# where alert emails go, and the base subject line they carry
email_to = '_@_.com'
email_subject = 'error'

# per-message throttle: how many emails one identical message may trigger
# before it is silenced, and the length (minutes) of its window
identical_max_emails = 5
identical_window_minutes = 10

# overall throttle across all messages: maximum emails per window (minutes)
global_max_emails = 30
global_window_minutes = 10
def email(to, subject, text, frm=None):
    """
    placeholder hook for delivering an alert; currently does nothing.

    to: recipient address.
    subject: subject line.
    text: message body.
    frm: sender address. optional, because the caller in this file only
    supplies the first three arguments; without a default that call
    raises TypeError.

    returns None.
    """
    pass  # send mail, or take other action
@functools.lru_cache(maxsize=128)
def hits_map(hash):
    """
    return the hit record for a key, creating it the first time it is seen.

    the record is a dict with 'value' (starts at 0) and 'time' (the moment
    the record was created). because the function is wrapped in lru_cache,
    the same dict object is handed back for the same key for as long as it
    stays among the 128 cached entries, so callers can mutate it in place.
    """
    created_at = time.time()
    return dict(value=0, time=created_at)
# timestamps of the most recent sends; the deque is capped at
# global_max_emails entries, so the oldest one drops off when it is full
sends = collections.deque(maxlen=global_max_emails)
def run_thread(fn, *a, **kw):
    """
    start fn(*a, **kw) on a new daemon thread and return immediately.

    the thread object is not returned.
    """
    threading.Thread(target=fn, args=a, kwargs=kw, daemon=True).start()
def reader(stdin_queue):
    """
    copy standard input onto stdin_queue, one line per item.

    returns once stdin reports end of input; put() blocks while the queue
    is full.
    """
    # readline() returns '' only at end of input, which ends the loop
    for text in iter(sys.stdin.readline, ''):
        stdin_queue.put(text)
def batcher(stdin_queue, batch_queue):
    """
    pair every line with the line that follows it.

    items put on batch_queue are [current, next] lists. if no further line
    shows up within one second of the current one, the pair is flushed as
    [current, None] and the loop goes back to waiting indefinitely for the
    next line. runs forever.
    """
    previous = None
    while True:
        if previous is not None:
            # a line is pending: wait only briefly for its successor
            try:
                upcoming = stdin_queue.get(timeout=1)
            except queue.Empty:
                upcoming = None
        else:
            # nothing pending: block until input arrives
            upcoming = stdin_queue.get()
        if previous:
            batch_queue.put([previous, upcoming])
        previous = upcoming
def main(*tokens):
    """
    send an email for any log message containing a token word. treat
    messages as any line starting without a space, and all subsequent
    lines that start with a space.

    tokens are matched case-insensitively, as plain substrings of the
    message. sending is throttled twice: an identical message triggers at
    most identical_max_emails emails per identical window, and all
    messages together trigger at most global_max_emails per global window.
    the last email allowed for a message is tagged ' [rate limited]'.

    this function loops forever and does not return.

    usage: tail -F /var/log/*.log | python3 log_watcher.py error exception traceback fatal
    """
    # the message is lower-cased before matching, so the tokens must be as
    # well; otherwise a token such as "Error" could never match anything.
    needles = [token.lower() for token in tokens]
    identical_window = identical_window_minutes * 60
    global_window = global_window_minutes * 60

    stdin_queue = queue.Queue(1)
    batch_queue = queue.Queue(1)
    run_thread(reader, stdin_queue)
    run_thread(batcher, stdin_queue, batch_queue)
    log_file = None
    while True:
        # collect one message: a first line plus its indented continuations
        parts = []
        while True:
            current_line, next_line = batch_queue.get()
            if current_line.startswith('==>') and current_line.endswith('<==\n'):
                # grab logfile name from tail if tailing multiple files
                log_file = current_line
            else:
                parts.append(current_line)
            if next_line is None or not next_line.startswith(' '):
                break
        msg = ''.join(parts)
        msg_lower = msg.lower()
        if not any(needle in msg_lower for needle in needles):
            continue

        hits = hits_map(msg)
        now = time.time()
        if now - hits['time'] > identical_window:
            # the window for this exact message has lapsed: start counting again
            hits['value'] = 0
        recent_sends = sum(1 for sent in sends if now - sent < global_window)
        if hits['value'] >= identical_max_emails or recent_sends >= global_max_emails:
            continue

        hits['value'] += 1
        hits['time'] = now
        sends.append(now)
        subject = email_subject
        if hits['value'] == identical_max_emails:
            subject += ' [rate limited]'
        if log_file:
            msg = log_file + msg
        # no sender address is configured in this file; pass frm explicitly
        # so the call matches email()'s four-parameter signature instead of
        # raising TypeError.
        email(email_to, subject, msg, frm=None)
if __name__ == '__main__':
    # every command-line argument after the script name is a token to watch for
    watch_tokens = sys.argv[1:]
    main(*watch_tokens)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment