Skip to content

Instantly share code, notes, and snippets.

@pannal
Last active March 22, 2019 21:11
Show Gist options
  • Save pannal/ff8066e272e2ecd42621894f6c843dce to your computer and use it in GitHub Desktop.
Save pannal/ff8066e272e2ecd42621894f6c843dce to your computer and use it in GitHub Desktop.
Rspamd learn with automatic sender-whitelist
#!/usr/bin/python3
import sys
import subprocess
import json
import logging
import argparse
import email
import mailparser
# Level names accepted by the --log-level style command line options.
_LOG_LEVEL_STRINGS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']


def _log_level_string_to_int(log_level_string):
    """Convert a log level name into the matching numeric ``logging`` level.

    Meant to be used as an argparse ``type=`` callable. The name is matched
    case-insensitively against ``_LOG_LEVEL_STRINGS``, so ``debug`` and
    ``DEBUG`` are equivalent.

    :param log_level_string: level name, e.g. ``"INFO"``
    :return: the integer level constant from the ``logging`` module
    :raises argparse.ArgumentTypeError: if the name is not a known level
    """
    level_name = str(log_level_string).strip().upper()
    if level_name not in _LOG_LEVEL_STRINGS:
        message = 'invalid choice: {0} (choose from {1})'.format(log_level_string, _LOG_LEVEL_STRINGS)
        raise argparse.ArgumentTypeError(message)

    log_level_int = getattr(logging, level_name, None)
    # Guard against the logging module no longer exposing one of the expected
    # names as an int; raise explicitly because asserts vanish under -O.
    if not isinstance(log_level_int, int):
        raise argparse.ArgumentTypeError(
            'log level {0} is not available in the logging module'.format(level_name))
    return log_level_int
# Default locations; both can be overridden on the command line.
WL_PATH_DEF = "/etc/rspamd/local.d/whitelist.txt"
RC_PATH_DEF = "/usr/bin/rspamc"

# Command line interface. The description doubles as a usage note showing the
# multimap.conf snippet that makes rspamd read the whitelist file; the
# RawDescriptionHelpFormatter keeps its line breaks intact in --help output.
parser = argparse.ArgumentParser(
    description="""Learn messages via rspamc and manage a sender whitelist. Depends on https://github.com/SpamScope/mail-parser.\n\nUse with local.d/multimap.conf:\nSENDER_FROM_WHITELIST {{
type = "from";
map = "file://{}"; # default; set this via -w/--whitelist-path
prefilter = true;
action = "accept";
filter = "email"; # use "email:domain" for --use-domains mode
}}""".format(WL_PATH_DEF),
    formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("message_class", help="What to classify the message as: 'ham' or 'spam'")
# nargs='?' allows the option to appear without a value; without an explicit
# const argparse would store None in that case and the script would crash
# later, so each such option falls back to its default behaviour instead.
parser.add_argument("--input", "-i", nargs='?', type=argparse.FileType('r'), default=sys.stdin,
                    const=sys.stdin, help="Mail message input; read from stdin by default")
parser.add_argument("-d", "--use-domains",
                    help="Use domains instead of the full e-mail addresses inside the whitelist",
                    default=False, action="store_true")
parser.add_argument("-w", "--whitelist-path",
                    help="Path to whitelist file *NEEDS TO BE WRITABLE BY THE CURRENT USER*; Default: {}".format(WL_PATH_DEF),
                    default=WL_PATH_DEF)
parser.add_argument("-r", "--rspamc-path",
                    help="Path to rspamc binary; Default: {}".format(RC_PATH_DEF),
                    default=RC_PATH_DEF)
parser.add_argument("--log-file", help="Path to log file; Default: stdout", default=None)
# String defaults are passed through the type= callable by argparse, so the
# parsed values below are always numeric logging levels.
parser.add_argument('--log-level',
                    default='INFO',
                    const=logging.INFO,
                    dest='log_level',
                    type=_log_level_string_to_int,
                    nargs='?',
                    help='Set the logging output level. {0}; Default: INFO'.format(_LOG_LEVEL_STRINGS))
parser.add_argument('--mailparse-log-level',
                    default='WARNING',
                    const=logging.WARNING,
                    dest='mp_log_level',
                    type=_log_level_string_to_int,
                    nargs='?',
                    help='Set the mailparse logging output level. {0}; Default: WARNING'.format(_LOG_LEVEL_STRINGS))

# Root logger; handlers and level are attached when the script runs.
log = logging.getLogger()
def _whitelist_entry(addr, use_domains):
    """Return the whitelist entry for ``addr``, or ``None`` if there is none.

    :param addr: bare e-mail address (no display name)
    :param use_domains: when true, the entry is the part after the last ``@``
    :return: the address or its domain; ``None`` for an empty address or, in
        domain mode, an address without a domain part
    """
    if not addr:
        return None
    if not use_domains:
        return addr
    _local, sep, domain = addr.rpartition("@")
    return domain if sep and domain else None


def _update_whitelist(current, addresses, cls, use_domains=False):
    """Work out the whitelist that results from classifying a message.

    :param current: iterable of entries currently in the whitelist
    :param addresses: sender addresses found in the message
    :param cls: ``"ham"`` (add the senders) or ``"spam"`` (remove them)
    :param use_domains: track domains instead of full addresses
    :return: ``(updated, events)`` where ``updated`` is the new set of
        entries and ``events`` is a list of ``(action, entry)`` tuples with
        ``action`` one of ``"append"``, ``"remove"`` or ``"keep"``, in the
        order the entries were first seen
    """
    # A set guarantees that removing an entry removes every duplicate of it.
    updated = set(current)
    events = []
    seen = set()
    for addr in addresses:
        entry = _whitelist_entry(addr, use_domains)
        if entry is None or entry in seen:
            continue
        seen.add(entry)
        if cls == "spam" and entry in updated:
            updated.discard(entry)
            events.append(("remove", entry))
        elif cls == "ham" and entry not in updated:
            updated.add(entry)
            events.append(("append", entry))
        else:
            events.append(("keep", entry))
    return updated, events


def main():
    """Update the sender whitelist for one message, then feed it to rspamc.

    Reads the command line via the module-level ``parser``, configures the
    root logger, adjusts the whitelist file and finally runs
    ``rspamc learn_ham`` / ``rspamc learn_spam`` with the message on stdin.
    Any failure is logged with its traceback and the process exits with
    status 1.
    """
    # Imported explicitly: a plain "import email" does not load the submodule.
    import email.utils

    # parse argv; show the help text when called without any argument
    args = parser.parse_args(args=None if sys.argv[1:] else ['--help'])

    # set up logging
    formatter = logging.Formatter('%(asctime)s:%(levelname)s: %(message)s')
    handler = logging.FileHandler(args.log_file) if args.log_file else logging.StreamHandler()
    handler.setFormatter(formatter)
    log.addHandler(handler)
    log.setLevel(args.log_level)
    logging.getLogger("mailparser").setLevel(args.mp_log_level)
    log.debug("Called rspamd_learn.py")

    try:
        cls = args.message_class
        if cls not in ("spam", "ham"):
            raise ValueError("First argument must be 'ham' or 'spam'")

        what = args.input.read()
        if not what:
            raise ValueError("Either pass the to-be-processed message as stdin or via -i/--input")

        # use mailparser to get sender addresses
        mail = mailparser.parse_from_string(what)
        from_lines = list(mail.from_)
        for k in ("return_path", "envelope_from", "sender", "x_mail_from"):
            val = getattr(mail, k)
            if not val:
                continue
            if not isinstance(val, list):
                val = [val]
            from_lines += email.utils.getaddresses(val)
        # entries are (display name, address) pairs; only the address matters
        addresses = [addr for _name, addr in from_lines]

        # parse current whitelist and update it according to the current message
        with open(args.whitelist_path, "r+", encoding="utf-8") as f:
            # NOTE: whitespace-splitting is naive; any non-entry text in the
            # file (e.g. comments) is treated as entries as well.
            current = f.read().split()
            updated, events = _update_whitelist(current, addresses, cls, args.use_domains)

            for action, entry in events:
                if action == "keep":
                    log.debug("whitelist: {} already marked as {}".format(entry, cls))
                else:
                    log.info("{}: {}".format(cls, entry))

            if updated != set(current):
                f.seek(0)
                f.truncate()
                # sorted output keeps the file stable between runs
                f.write("".join(entry + "\n" for entry in sorted(updated)))

        # rspamc learn message
        ret = subprocess.check_output([args.rspamc_path, "learn_{}".format(cls)],
                                      input=what, universal_newlines=True)
        log.debug("Rspamc result: {}".format(ret))
    except Exception as e:
        # top-level boundary: record the traceback and signal failure
        log.exception(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment