-
-
Save cpina/e97c0da58f42a0db83b3886674de4410 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import collections | |
import re | |
import math | |
import datetime | |
import pytz | |
# Postfix rejection reasons this script reports on.
errors = ["Client host rejected: cannot find your reverse hostname",
          "Client host rejected: cannot find your hostname"]

# Built once at import time from ``errors`` (entries appended to ``errors``
# later are not picked up).  The reasons are escaped so they are matched
# literally, and any protocol token is accepted: Postfix logs ``proto=SMTP``
# for clients that greeted with HELO and ``proto=ESMTP`` for EHLO clients.
_REJECTION_RE = re.compile(
    "(?:" + "|".join(re.escape(error) for error in errors) + ")"
    + r".*from=<(?P<from>.*)> to=<(?P<to>.*)> proto=\S+ helo=<(?P<helo>.*)>"
)


def process_postfix_line(log):
    """Extract sender, recipient and HELO name from a Postfix rejection entry.

    Args:
        log: The part of a log line that follows ``postfix/``.

    Returns:
        A dict with the keys ``"from"``, ``"to"`` and ``"helo"`` when ``log``
        contains one of the reasons listed in ``errors`` followed by the
        ``from=<...> to=<...> proto=... helo=<...>`` fields; ``None``
        otherwise.  ``"from"`` is the empty string for a null sender
        (``from=<>``).
    """
    m = _REJECTION_RE.search(log)
    if m is None:
        return None
    return {
        "from": m.group("from"),
        "to": m.group("to"),
        "helo": m.group("helo"),
    }
# Syslog prefix of a Postfix entry, e.g.
# "Jan  5 10:20:30 mailhost postfix/smtpd[123]: ...".
_SYSLOG_POSTFIX_RE = re.compile(
    r"(?P<month>...) (?P<day>.[0-9]) "
    r"(?P<hour>[0-9]{2}):(?P<min>[0-9]{2}):(?P<sec>[0-9]{2}) "
    r"(?P<server_hostname>[^ ]+) postfix/(?P<log>.*)"
)

# How far "in the future" a timestamp may be before it is assumed to belong
# to the previous year.  The slack is needed because the timestamps carry no
# zone and are tagged as UTC below, so a fresh entry written by a host whose
# clock runs ahead of UTC looks slightly ahead of "now".
_FUTURE_TOLERANCE = datetime.timedelta(days=1)


def process_line(line):
    """Parse one syslog line into a dated Postfix rejection record.

    Args:
        line: A single log line, without the trailing newline.

    Returns:
        The dict produced by ``process_postfix_line`` extended with a
        ``"date"`` key holding a timezone-aware ``datetime`` (UTC), or
        ``None`` when the line is not a Postfix entry, is not one of the
        reported rejections, or carries a timestamp that cannot be dated.
    """
    m1 = _SYSLOG_POSTFIX_RE.match(line)
    if m1 is None:
        return None

    log_info = process_postfix_line(m1.group("log"))
    if log_info is None:
        return None

    stamp = (f"{m1.group('month')} {m1.group('day')} "
             f"{m1.group('hour')}:{m1.group('min')}:{m1.group('sec')}")
    utc = datetime.timezone.utc
    now = datetime.datetime.now(tz=utc)
    this_year = datetime.datetime.today().year

    # Syslog timestamps have no year.  Try the current year first; if that
    # lands in the future (e.g. a December entry read in January) or is not a
    # valid date in that year (Feb 29), fall back to the previous year.
    for year in (this_year, this_year - 1):
        try:
            date = datetime.datetime.strptime(f"{year} {stamp}", "%Y %b %d %H:%M:%S")
        except ValueError:
            continue
        # NOTE(review): the timestamp is labelled UTC without conversion;
        # confirm the logging host really writes UTC.
        date = date.replace(tzinfo=utc)
        if date <= now + _FUTURE_TOLERANCE:
            return {
                "date": date,
                **log_info,
            }
    return None
def print_rejected_emails(files, recipient, hours):
    """Print a per-sender summary of rejected mail addressed to ``recipient``.

    Every line of every file is parsed with ``process_line``.  Rejections for
    ``recipient`` that are less than ``hours`` old are grouped by envelope
    sender, or by HELO name when the sender is empty.  One line is printed
    per group, most frequent first, showing the most recent rejection time,
    the number of rejections and the group key.

    Args:
        files: Iterable of iterables of text lines (e.g. open log files).
        recipient: Envelope recipient address to report on.
        hours: Only rejections younger than this many hours are counted.
    """
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    print(f"# Errors for {recipient}")

    froms = collections.defaultdict(lambda: {"count": 0, "last": None})
    for file in files:
        for raw_line in file:
            processed = process_line(raw_line.rstrip())
            if not processed or processed["to"] != recipient:
                continue
            elapsed_hours = (now - processed["date"]).total_seconds() / 3600
            if not elapsed_hours < hours:
                continue

            if processed["from"] != "":
                key = f"from: {processed['from']}"
            else:
                key = f"helo: {processed['helo']}"

            entry = froms[key]
            entry["count"] += 1
            # Keep the most recent timestamp rather than the one read last,
            # so the result does not depend on the order of the input files.
            if entry["last"] is None or processed["date"] > entry["last"]:
                entry["last"] = processed["date"]

    # sorted() is stable, so groups with equal counts keep first-seen order.
    ranked = sorted(froms.items(), key=lambda item: item[1]["count"], reverse=True)
    for key, information in ranked:
        date_str = f"{information['last']:%Y-%m-%d %H:%M:%S}"
        count_str = f"{information['count']:02d}"
        print(f"{date_str} ({count_str}) {key}")

    print(f"Number of different errors for {recipient} {len(froms)}")
def main():
    """Parse the command line and print the rejection report.

    Positional arguments are the recipient address and one or more log
    files; ``--hours`` limits the report to rejections younger than that
    many hours (unlimited by default).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("recipient", help="email address")
    parser.add_argument("files", type=argparse.FileType("r"), nargs="+", help="mail.log containing postfix errors")
    parser.add_argument("--hours", type=int, default=math.inf, help="How many hours back")
    args = parser.parse_args()

    try:
        print_rejected_emails(args.files, args.recipient, args.hours)
    finally:
        # argparse.FileType opens the files eagerly and never closes them.
        for file in args.files:
            file.close()
# Run the command-line interface only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment