Skip to content

Instantly share code, notes, and snippets.

@cpina
Created January 17, 2024 23:20
Show Gist options
  • Save cpina/e97c0da58f42a0db83b3886674de4410 to your computer and use it in GitHub Desktop.
Save cpina/e97c0da58f42a0db83b3886674de4410 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import collections
import re
import math
import datetime
import pytz
errors = ["Client host rejected: cannot find your reverse hostname",
"Client host rejected: cannot find your hostname"]
def process_postfix_line(log):
for error in errors:
m = re.match(rf".*{error}.*from=<(?P<from>.*)> to=<(?P<to>.*)> proto=ESMTP helo=<(?P<helo>.*)>", log)
if m is None:
continue
return {
"from": m.group("from"),
"to": m.group("to"),
"helo": m.group("helo")
}
return None
def process_line(line):
m1 = re.match(
r"(?P<month>...) (?P<day>.[0-9]) (?P<hour>[0-9]{2}):(?P<min>[0-9]{2}):(?P<sec>[0-9]{2}) (?P<server_hostname>[^ ]+) postfix/(?P<log>.*)",
line)
if m1 is None:
return None
log = m1.group("log")
log_info = process_postfix_line(log)
if log_info is None:
return None
today_year = datetime.datetime.today().year
date_str = f"{today_year} {m1.group('month')} {m1.group('day')} {m1.group('hour')}:{m1.group('min')}:{m1.group('sec')}"
date = datetime.datetime.strptime(date_str, "%Y %b %d %H:%M:%S")
date = date.replace(tzinfo=pytz.UTC)
return {
"date": date,
**log_info
}
def print_rejected_emails(files, recipient, hours):
now = datetime.datetime.now(tz=pytz.UTC)
print(f"# Errors for {recipient}")
froms = collections.defaultdict(lambda: {"count": 0, "last": ""})
for file in files:
for line in file:
line = line.rstrip()
processed = process_line(line)
if processed:
elapsed_hours = (now - processed["date"]).total_seconds() / 3600
if processed["to"] == recipient and elapsed_hours < hours:
date_str = f"{processed['date']:%Y-%m-%d %H:%M:%S}"
if processed["from"] != "":
line = f"from: {processed['from']}"
else:
line = f"helo: {processed['helo']}"
froms[line]["count"] += 1
froms[line]["last"] = date_str
lines = []
# Not very efficient, just a left-over of when not sorting
# (this could be done in the other loop)
for from_, information in froms.items():
lines.append({"from": from_, **information})
lines.sort(key=lambda l: l["count"], reverse=True)
for line in lines:
count_str = f"{line['count']:02d}"
print(f"{line['last']} ({count_str}) {line['from']}")
print(f"Number of different errors for {recipient} {len(froms)}")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("recipient", help="email address")
parser.add_argument("files", type=argparse.FileType("r"), nargs="+", help="mail.log containing postfix errors")
parser.add_argument("--hours", type=int, default=math.inf, help="How many hours back")
args = parser.parse_args()
print_rejected_emails(args.files, args.recipient, args.hours)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment