Skip to content

Instantly share code, notes, and snippets.

@nathants
Last active August 20, 2022 08:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nathants/8f73f65b1545d83dd9c434e05a04b071 to your computer and use it in GitHub Desktop.
Save nathants/8f73f65b1545d83dd9c434e05a04b071 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# The MIT License (MIT)
# Copyright (c) 2022-present Nathan Todd-Stone
# https://en.wikipedia.org/wiki/MIT_License#License_terms
import functools
import sys
import time
import collections
import threading
import queue
# recipient address and base subject line passed to email() by main().
email_to = '_@_.com'
email_subject = 'error'
# per-message limit: at most identical_max_emails emails for the same message
# text; the count is reset once identical_window_minutes have passed since the
# last email sent for that text.
identical_max_emails = 5
identical_window_minutes = 10
# overall limit: at most global_max_emails emails, across all messages, within
# any global_window_minutes span.
global_max_emails = 30
global_window_minutes = 10
def email(to, subject, text, frm=None):
    """
    notification hook: replace the body with code that sends mail, or
    takes any other action, for a matched log message.

    to:      recipient address.
    subject: subject line.
    text:    message body.
    frm:     optional sender address. it defaults to None so the hook can
             be called with just (to, subject, text).
    """
    pass # send mail, or take other action
@functools.lru_cache(maxsize=128)
def hits_map(hash):
    """
    return the mutable counter record for the key `hash`.

    lru_cache memoizes the result, so calls with an equal key return the
    same dict object and callers can update it in place. at most 128 keys
    are remembered; the least recently used record is dropped beyond that.

    the record has two entries: 'value' (starts at 0) and 'time' (the
    time.time() at which the record was created).
    """
    return dict(value=0, time=time.time())
# timestamps of sent emails, used for the global rate limit. the deque is
# bounded to global_max_emails entries, so appending to a full deque discards
# the oldest timestamp.
sends = collections.deque([], global_max_emails)
def run_thread(fn, *a, **kw):
    """
    start fn(*a, **kw) on a new daemon thread and return immediately.
    the thread object is not returned to the caller.
    """
    threading.Thread(target=fn, args=a, kwargs=kw, daemon=True).start()
def reader(stdin_queue):
    """
    copy every line read from sys.stdin onto stdin_queue, in order.
    returns once stdin is exhausted.
    """
    enqueue = stdin_queue.put
    for raw_line in sys.stdin:
        enqueue(raw_line)
def batcher(stdin_queue, batch_queue):
    """
    pair every line with the line that follows it.

    each item put on batch_queue is a two element list [current, next].
    when no further line arrives within one second of `current`, the pair
    is flushed as [current, None] and the loop goes back to waiting
    indefinitely for the next line. runs forever.
    """
    previous = None
    while True:
        if previous is None:
            # nothing pending, so there is nothing to flush: block until a line arrives.
            upcoming = stdin_queue.get()
        else:
            # a line is pending: wait briefly for its successor, then flush without one.
            try:
                upcoming = stdin_queue.get(timeout=1)
            except queue.Empty:
                upcoming = None
            if previous:
                batch_queue.put([previous, upcoming])
        previous = upcoming
def main(*tokens):
    """
    send an email for any log message containing a token word. treat
    messages as any line starting without a space, and all subsequent
    lines that start with a space.

    matching is case-insensitive: the message and the tokens are both
    lowercased before they are compared.

    emails are rate limited twice: per identical message text
    (identical_max_emails / identical_window_minutes) and overall
    (global_max_emails / global_window_minutes).

    runs forever; intended to sit at the end of a pipe.

    usage: tail -F /var/log/*.log | python3 log_watcher.py error exception traceback fatal
    """
    # the message is lowercased before matching, so the tokens must be as
    # well, otherwise a token such as ERROR could never match.
    tokens = [token.lower() for token in tokens]
    # both queues hold a single item each.
    stdin_queue = queue.Queue(1)
    batch_queue = queue.Queue(1)
    run_thread(reader, stdin_queue)
    run_thread(batcher, stdin_queue, batch_queue)
    log_file = None
    while True:
        # collect one message: lines are appended until the following line is
        # missing (None) or does not start with a space.
        parts = []
        while True:
            current_line, next_line = batch_queue.get()
            if current_line.startswith('==>') and current_line.endswith('<==\n'): # grab logfile name from tail if tailing multiple files
                log_file = current_line
            else:
                parts.append(current_line)
            if next_line is None or not next_line.startswith(' '):
                break
        msg = ''.join(parts)
        msg_lower = msg.lower()
        if not any(token in msg_lower for token in tokens):
            continue
        hits = hits_map(msg)
        now = time.time()
        # start a fresh count once the per-message window has elapsed.
        if now - hits['time'] > identical_window_minutes * 60:
            hits['value'] = 0
        recent_sends = sum(1 for sent in sends if now - sent < global_window_minutes * 60)
        if hits['value'] < identical_max_emails and recent_sends < global_max_emails:
            hits['value'] += 1
            hits['time'] = now
            sends.append(now)
            subject = email_subject
            # flag the last email allowed for this message in the current window.
            if hits['value'] == identical_max_emails:
                subject += ' [rate limited]'
            if log_file:
                msg = log_file + msg
            email(email_to, subject, msg)
# script entry point: every command line argument is a token to watch for.
if __name__ == '__main__':
    cli_tokens = sys.argv[1:]
    main(*cli_tokens)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment