Skip to content

Instantly share code, notes, and snippets.

@suenkler
Last active June 13, 2023 19:17
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save suenkler/acd1ee26d17224b9005f to your computer and use it in GitHub Desktop.
Save suenkler/acd1ee26d17224b9005f to your computer and use it in GitHub Desktop.
This Python script checks Postfix's mail queue for messages that were deferred for a defined reason, deletes these mails and notifies the sender.
# -*- coding: utf-8 -*-
"""
This Python script checks Postfix's mail queue for messages that
were deferred for a defined reason, deletes these mails and
notifies the sender.
This script uses Python 3 and is tested on Ubuntu 14.04.
Copyright (C) 2016 Hendrik Sünkler <mailbox@suenkler.info>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
Please find a copy of the GNU Affero General Public License
here: <http://www.gnu.org/licenses/>.
"""
import subprocess
import re
import sys
import smtplib
from email.header import decode_header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
#####################################################################
# Settings
notification_sender = "mailbox@suenkler.info"
notification_subject = "Alert! Email couldn't be delivered securely and was deleted!"
# Notification is sent and message is deleted if it was queued
# for following reason:
queue_reason = "TLS is required, but was not offered"
#queue_reason = "Server certificate not trusted"
# Send notifications to internal users only!
local_domain = "@suenkler.info"
######################################################################
def send_notification(message):
sender = notification_sender
recipient = message['sender']
# Create message container - the MIME type is multipart/alternative.
msg = MIMEMultipart('alternative')
msg['Subject'] = notification_subject
msg['From'] = sender
msg['To'] = recipient
# Email content
html_content = """<html>
<body>
<p>The following message couldn't be delivered
securely:</p>
<ul>
<li>Recipient(s): {recipients}</li>
<li>Date: {date}</li>
<li>Subject: {subject}</li>
</ul>
<p>Your mail was deleted.</p>
</body>
</html>""".format(date=message['date'],
recipients=message["recipients"],
subject=message['subject'])
text_content = """The following message couldn't be delivered securely:\n
Recipient(s): {recipients}\n
Date: {date}\n
Subject: {subject}\n\n
Your mail was deleted.
""".format(date=message['date'],
recipients=message["recipients"],
subject=message['subject'])
# Record the MIME types of both parts - text/plain and text/html.
part1 = MIMEText(text_content, 'plain')
part2 = MIMEText(html_content, 'html')
# Attach parts into message container.
# According to RFC 2046, the last part of a multipart message, in this case
# the HTML message, is best and preferred.
msg.attach(part1)
msg.attach(part2)
# Send the message via local SMTP server.
s = smtplib.SMTP("localhost")
# sendmail function takes 3 args: sender's address, recipient's address
# and message to send - here it is sent as one string.
s.sendmail(sender, recipient, msg.as_string())
s.quit()
def process_postfix_queue():
messages = []
qline = ["", "", "", "", ""]
####################################################
# Process Mail Queue and get relevant data
p = subprocess.Popen(['sudo', 'mailq'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output = str(p.stdout.read(), "utf-8").splitlines()
# exit if mail queue empty
if "Mail queue is empty" in " ".join(output):
sys.exit("Mail queue is empty.")
# If mails postfix is trying to deliver mails, exit
# (reason for queueing is noted in ())
if ")" not in " ".join(output):
sys.exit("Postfix is trying to deliver mails, aborting.")
# Process mailq output
for line in output:
if re.match('^-', line):
# discard in-queue wrapper
continue
elif re.match('^[A-Z0-9]', line): # queue_id, date and sender
qline[0] = line.split(" ")[0] # queue_id
qline[1] = re.search('^\w*\s*\d*\s(.*\d{2}:\d{2}:\d{2})\s.*@.*$',
line).group(1) # date
qline[2] = line[line.rindex(" "):].strip() # sender
elif line.count(')') > 0: # status/reason for deferring
qline[3] = qline[3] + " " + line.strip() # merge to one string.
elif re.match('^\s', line): # recipient/s
qline[4] = (qline[4] + " " + line.lstrip()).strip()
elif not line: # empty line to recognise the end of a record
messages.append({"queue_id": qline[0],
"date": qline[1],
"sender": qline[2],
"reasons": qline[3],
"recipients": qline[4]})
qline = ["", "", "", "", ""]
else: # should never get here
print(" ERROR: unknown input: \"" + line + "\"")
####################################################
# Send email notifications
for message in messages:
# Send out a notification if
# - queue reason as specified above
# - sender is an internal user
#
# Explanation of the second rule:
# I'm not sure if incoming messages would also be queued here
# if the next internal hop does not offer TLS. So by checking
# the sender I make sure that I do not send notifications
# to external senders of incoming mail.
if queue_reason in message["reasons"] \
and local_domain in message["sender"]:
###################################################################
# Get the subject of the mail
# TODO: Use python instead of grep
p1 = subprocess.Popen(['sudo', 'postcat', '-qh',
message['queue_id']],
stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', '^Subject: '],
stdin=p1.stdout,
stdout=subprocess.PIPE)
p1.stdout.close()
subjectlist = decode_header(p2.communicate()[0].decode("utf-8"))
# Let's construct the subject line:
subject = ""
# Subjects with 'Umlauts' consist of multiple list items:
if len(subjectlist) > 1:
for item in subjectlist:
if item[1] is None:
subject += item[0].decode('utf-8')
else:
subject += item[0].decode(item[1])
else:
subject += str(subjectlist[0][0])
# Remove the string 'Subject: '
subject = subject.replace("Subject: ", "")
# Set the subject
message['subject'] = str(subject)
# Now delete the mail from the queue
try:
p = subprocess.Popen(["sudo", "postsuper", "-d",
message['queue_id']],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output = str(p.stdout.read(), "utf-8")
except:
sys.exit("Error deleting mail in queue.")
# Make sure that the mail was deleted successfully from
# the queue before sending out notification mail.
if "postsuper: Deleted: 1 message" in output:
try:
send_notification(message)
m = "Message with queue ID {queue_id} was deleted " \
+ "from queue and " \
+ "notification message was sent."
m = m.format(queue_id=message['queue_id'])
print(m)
except:
sys.exit("Error sending notification message.")
else:
sys.exit("Error deleting message in queue.")
else:
m = "Message with queue ID {queue_id} was *not* deleted " \
+ "because the queue reason does not match."
m = m.format(queue_id=message['queue_id'])
print(m)
if __name__ == '__main__':
process_postfix_queue()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment