Skip to content

Instantly share code, notes, and snippets.

@cnicodeme
Last active December 8, 2017 11:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cnicodeme/58dfebd0aa6b630f6103033c39102938 to your computer and use it in GitHub Desktop.
Save cnicodeme/58dfebd0aa6b630f6103033c39102938 to your computer and use it in GitHub Desktop.
Postfix SPAM checker with deletion
# -*- coding: utf-8 -*-
import socket, select, re, logging
from io import BytesIO
divider_pattern = re.compile(br'^(.*?)\r?\n(.*?)\r?\n\r?\n', re.DOTALL)
first_line_pattern = re.compile(br'^SPAMD/[^ ]+ 0 EX_OK$')
# Captures the score from a "Spam: <flag> ; <score> / <threshold>" header,
# wherever that header sits among the other response headers.
spam_header_pattern = re.compile(
    br'^Spam:[^;\r\n]*;\s*(-?[0-9]*\.?[0-9]+)\s*/',
    re.MULTILINE | re.IGNORECASE,
)


# @see https://github.com/slimta/python-slimta/blob/master/slimta/policy/spamassassin.py
class SpamAssassin(object):
    """Submit a message to a spamd server with the SYMBOLS command.

    The whole exchange happens in the constructor: the request is sent, the
    response is read until the server closes the connection (or no data
    arrives for ``timeout`` seconds) and then parsed into ``score`` and
    ``symbols``.

    Args:
        message: Raw message to check. Text is encoded as UTF-8 before
            being sent; bytes are sent unchanged.
        timeout: Seconds applied to every socket operation and to each wait
            for response data.
        host: Address of the spamd server.
        port: TCP port of the spamd server.
        user: Value of the ``User`` request header.

    Attributes:
        score: Score reported by the server as a float, or ``None`` when the
            response was empty or could not be parsed.
        symbols: List of rule names found in the response body, or ``None``
            when the response was empty or could not be parsed.

    Raises:
        socket.error: If connecting, sending or receiving fails (this
            includes ``socket.timeout``).
    """

    def __init__(self, message, timeout=15, host='127.0.0.1', port=783,
                 user='cx42'):
        self.score = None
        self.symbols = None

        request = self._build_message(message, user)
        chunks = []

        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Connecting
            client.settimeout(timeout)
            client.connect((host, port))

            # Sending; half-closing tells the server the request is complete.
            client.sendall(request)
            client.shutdown(socket.SHUT_WR)

            # Reading
            while True:
                # select() reports a timeout with empty lists, never None.
                readable = select.select([client], [], [], timeout)[0]
                if not readable:
                    logging.info(
                        '[SpamAssassin] - Timeout ({0}s)!'.format(str(timeout)))
                    break

                data = client.recv(4096)
                if not data:
                    break
                chunks.append(data)
        finally:
            # Always release the socket, even when an operation above raised.
            client.close()

        self._parse_response(b''.join(chunks))

    @staticmethod
    def _to_bytes(value, encoding):
        """Return ``value`` as bytes, encoding it first when it is text."""
        if isinstance(value, bytes):
            return value
        return value.encode(encoding)

    def _build_message(self, message, user='cx42'):
        """Return the complete SYMBOLS request for ``message`` as bytes."""
        payload = self._to_bytes(message, 'utf-8')
        # Content-Length must be the number of bytes actually sent.
        length = str(len(payload)).encode('ascii')
        return b''.join([
            b'SYMBOLS SPAMC/1.2\r\n',
            b'Content-Length: ' + length + b'\r\n',
            b'User: ' + self._to_bytes(user, 'ascii') + b'\r\n\r\n',
            payload,
        ])

    def _parse_response(self, response):
        """Fill ``score`` and ``symbols`` from a raw server response.

        Problems are logged and leave the affected attribute at ``None``;
        nothing is raised for a malformed response.
        """
        if response == b'':
            logging.info("[SPAM ASSASSIN] Empty response")
            return None

        match = divider_pattern.match(response)
        if not match:
            logging.error("[SPAM ASSASSIN] Response error:")
            logging.error(response)
            return None

        first_line = match.group(1)
        headers = match.group(2)
        body = response[match.end(0):]

        # Checking response is good
        if not first_line_pattern.match(first_line):
            logging.error("[SPAM ASSASSIN] invalid response:")
            logging.error(first_line)
            return None

        # The body is a comma separated list of rule names; an empty body
        # means no rule matched and yields an empty list.
        self.symbols = [
            name.decode('ascii').strip()
            for name in body.strip().split(b',')
            if name.strip()
        ]

        score_match = spam_header_pattern.search(headers)
        if not score_match:
            logging.error("[SPAM ASSASSIN] invalid response:")
            logging.error(headers)
            return None
        self.score = float(score_match.group(1).decode('ascii'))

    def get_score(self):
        """Return the parsed score, or ``None`` when none was obtained."""
        return self.score

    def get_symbols(self):
        """Return the parsed rule names, or ``None`` when none were obtained."""
        return self.symbols

    def is_spam(self, level=5):
        """Return True when the score is at least ``level``.

        NOTE(review): a missing score (``None``) is also reported as spam,
        so a failed or unparsable check counts as spam. Confirm this is the
        intended policy before acting destructively on the result.
        """
        return self.score is None or self.score >= level
# -*- coding: utf-8 -*-
import subprocess, json
from spamd import SpamAssassin
def main():
    """Check every queued Postfix message and delete those reported as spam.

    The queue is listed with ``postqueue -j`` (one JSON object per line),
    each message is read with ``postcat -bhq`` and handed to
    ``SpamAssassin``; messages for which ``is_spam()`` is true are removed
    with ``postsuper -d``. Progress and a final summary are printed.
    """
    output = subprocess.Popen(
        ['postqueue', '-j'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    ).communicate()[0]
    # On Python 3 the pipe yields bytes; the string handling below needs text.
    if not isinstance(output, str):
        output = output.decode('utf-8')

    items = []
    deleted = 0
    for line in output.replace('\r', '').split('\n'):
        line = line.strip()
        if not line:
            # An empty queue (or a trailing newline) produces blank lines,
            # which are not valid JSON.
            continue

        item = json.loads(line)
        items.append(item)
        queue_id = item['queue_id']

        if len(item['recipients']) != 1:
            print("DIFF ONE:")
            print(json.dumps(item, indent=4))

        # Getting the email content:
        raw_message = subprocess.Popen(
            ['postcat', '-bhq', queue_id],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ).communicate()[0]

        checker = SpamAssassin(raw_message)
        if checker.is_spam():
            proc = subprocess.Popen(
                ['postsuper', '-d', queue_id],
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
            )
            # communicate() drains both pipes, so the child cannot block on
            # a full pipe the way it can with a bare wait().
            proc.communicate()
            print("{0} is Spam with a score of {1} - Deleting it ({2}).".format(
                queue_id, checker.get_score(), proc.returncode))
            deleted = deleted + 1
        else:
            print("{0} is valid with a score of {1}".format(
                queue_id, checker.get_score()))

    print("Deleted : {0}/{1}".format(deleted, len(items)))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment