Instantly share code, notes, and snippets.

@fzliu /courier.py
Last active Sep 1, 2015

Embed
What would you like to do?
A customizable, "black hole" mail server.
#!/usr/bin/env python
import argparse
import asyncore
from email import message_from_string
from email.iterators import typed_subpart_iterator
from email.mime.text import MIMEText
from email.utils import formataddr
import getpass
import logging
import os
import pprint
from smtpd import SMTPServer
import smtplib
import sys
# --- constants ---------------------------------------------------------------
# Commands understood by the server, and the Gmail / courier settings used by
# the "bounce" command.
VALID_COMMANDS = ["bounce", "dump", "print"]
GMAIL_AUTH_FNAME = ".gmail_auth"
GMAIL_SMTP_SERVER = "smtp.gmail.com"
GMAIL_SMTP_PORT = 587
COURIER_EMAIL_ADDR = "courier@frankzliu.com"

# --- logging -----------------------------------------------------------------
# Everything from DEBUG upwards goes to stderr.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

# --- command line ------------------------------------------------------------
# Both options are optional: the command falls back to "print" and the
# output path to None.
parser = argparse.ArgumentParser(
    description="Customizable SMTP server.",
    usage="courier.py -c <command>",
)
parser.add_argument(
    "-c", "--command",
    required=False,
    default="print",
    help="mail command (bounce, dump, print)",
)
parser.add_argument(
    "-o", "--out",
    required=False,
    default=None,
    help="output path for dump",
)
def _bounce_mail(mail, server):
    """
    Bounce mail back to the sender (with a friendly message).

    Courier handler for the "bounce" command. Only mail whose envelope
    recipients include COURIER_EMAIL_ADDR is answered; everything else is
    ignored.

    Args:
        mail: message dict; the keys "addrs-to", "addr-from", "subject" and
            "body" are read.
        server: object with an smtplib.SMTP-style ``sendmail(from, to, msg)``
            method, or None when no outgoing server is available.
    """
    # check the destination addresses
    if COURIER_EMAIL_ADDR not in mail["addrs-to"]:
        return

    sender = mail["addr-from"]

    # An empty envelope sender has nowhere to reply to, and answering mail
    # that claims to come from the courier itself would loop forever.
    if not sender or sender == COURIER_EMAIL_ADDR:
        logging.info("not bouncing email from {0!r}".format(sender))
        return

    # Without an outgoing server there is nothing to send with; report it
    # instead of failing with an AttributeError on None.
    if server is None:
        logging.warning(
            "no outgoing server available, cannot bounce email to {0}".format(sender))
        return

    # compose the "bounce" message
    append_txt = "Hey there! Here's your original message: \n" + "-"*20
    bounce_msg = MIMEText(append_txt + "\n\n" + (mail["body"] or ""))
    # the original message may have had no Subject header at all (None)
    bounce_msg["Subject"] = mail["subject"] or ""
    bounce_msg["From"] = COURIER_EMAIL_ADDR
    bounce_msg["To"] = sender

    # send it
    server.sendmail(COURIER_EMAIL_ADDR, sender, bounce_msg.as_string())
    logging.info("bounced email back to {0}".format(sender))
def _dump_mail(mail, path):
    """
    Courier handler for the "dump" command.

    Appends the pretty-printed form of ``mail`` to the text file at ``path``,
    one record after another.
    """
    with open(path, "a") as out_file:
        # pformat() renders exactly what pprint() would write; the trailing
        # newline is the record terminator pprint() adds itself.
        out_file.write(pprint.pformat(mail) + "\n")
def _print_mail(mail, stream):
    """
    Courier handler for the "print" command.

    Pretty-prints ``mail`` to the given ``stream``.
    """
    printer = pprint.PrettyPrinter(stream=stream)
    printer.pprint(mail)
class Courier(SMTPServer, object):
    """
    Courier class, built on top of Python's SMTPServer.

    Each message handed to process_message() is reduced to a plain dict and
    passed to a user-supplied callback together with that callback's data.
    """

    def __init__(self, cb_fn, cb_data, *args, **kwargs):
        """
        Create the server.

        Args:
            cb_fn: callable invoked as ``cb_fn(mail_data, cb_data)`` for
                every processed message.
            cb_data: second argument passed to ``cb_fn`` on every call.
            *args, **kwargs: forwarded unchanged to SMTPServer.
        """
        super(Courier, self).__init__(*args, **kwargs)
        self._cb_fn = cb_fn
        self._cb_data = cb_data

    @staticmethod
    def _part_text(part):
        """
        Return the decoded, stripped text payload of one message part.
        """
        payload = part.get_payload(decode=True)
        # nothing decodable in this part
        if payload is None:
            return ""
        # Python 3 returns bytes here; turn them into text using the part's
        # declared charset. Python 2 already returns str and is left alone.
        if not isinstance(payload, str):
            charset = part.get_content_charset() or "utf-8"
            try:
                payload = payload.decode(charset, "replace")
            except LookupError:
                # unknown charset name in the message
                payload = payload.decode("utf-8", "replace")
        return payload.strip()

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        """
        Process a single inbound message.

        Args:
            peer: address of the connecting client (not used here).
            mailfrom: envelope sender address.
            rcpttos: list of envelope recipient addresses.
            data: raw message, as text or as bytes.
            **kwargs: extra keyword arguments are accepted and ignored so
                that callers passing e.g. ``mail_options`` / ``rcpt_options``
                do not fail.

        Returns:
            Whatever the callback returns for the compiled message dict.
        """
        # bytes input (never true on Python 2, where bytes is str)
        if isinstance(data, bytes) and not isinstance(data, str):
            data = data.decode("utf-8", "replace")

        headers = message_from_string(data)

        # multipart: every text/plain sub-part; otherwise the message itself
        if headers.is_multipart():
            parts = typed_subpart_iterator(headers, "text", "plain")
        else:
            parts = [headers]
        body = "".join(self._part_text(part) for part in parts)

        # compile the data
        mail_data = {
            "addr-from": mailfrom,              # address only
            "addrs-to": rcpttos,                # addresses only
            "sender": headers["from"],          # includes name and address
            "recipients": headers["to"],        # includes names and addresses
            "subject": headers["subject"],      # subject
            "body": body,                       # body
        }
        logging.debug(mail_data)

        return self._cb_fn(mail_data, self._cb_data)
def _open_outgoing_server():
    """
    Connect to the Gmail SMTP server, switch to TLS and log in interactively.

    The user name is the first line of GMAIL_AUTH_FNAME; the password is
    prompted for until the server accepts it. Returns the logged-in
    smtplib.SMTP object. The connection is closed again if anything other
    than a rejected password interrupts the login.
    """
    with open(GMAIL_AUTH_FNAME) as f:
        user = f.readline().strip()

    # timeout=None keeps the socket blocking, i.e. it never times out.
    out_srv = smtplib.SMTP(GMAIL_SMTP_SERVER, GMAIL_SMTP_PORT, timeout=None)
    try:
        out_srv.ehlo()
        out_srv.starttls()
        passwd = getpass.getpass("Password for {0}:".format(user))
        while True:
            try:
                out_srv.login(user, passwd)
                break
            except smtplib.SMTPAuthenticationError:
                passwd = getpass.getpass("Try again:")
    except BaseException:
        # includes Ctrl-C at the password prompt: do not leak the socket
        out_srv.close()
        raise
    return out_srv


def start_courier(args):
    """
    Starts up a Courier instance.

    Reads ``args.command`` (an empty value means "print") and ``args.out``,
    logs in to the outgoing SMTP server when the "bounce" command is
    requested and GMAIL_AUTH_FNAME exists, then creates a Courier bound to
    0.0.0.0:25 and runs the asyncore loop until it ends or Ctrl-C is
    pressed.

    Raises:
        ValueError: if the command is not in VALID_COMMANDS, or if "dump"
            is requested without an output path.
    """
    # get the command
    command = args.command or "print"
    # explicit raise instead of assert: asserts are removed under `python -O`
    if command not in VALID_COMMANDS:
        raise ValueError("please specify a valid command")
    logging.info("specified command {0}".format(command))

    # the dump handler opens this path for every message, so reject a
    # missing path now rather than on the first received mail
    if command == "dump" and not args.out:
        raise ValueError("please specify an output path (-o/--out) for dump")

    # outgoing email server
    out_srv = None
    if command == "bounce":
        if os.path.isfile(GMAIL_AUTH_FNAME):
            out_srv = _open_outgoing_server()
        else:
            logging.warning(
                "{0} not found, starting without an outgoing server".format(
                    GMAIL_AUTH_FNAME))

    # command -> (handler, data passed to the handler)
    handlers = {
        "bounce": (_bounce_mail, out_srv),
        "dump": (_dump_mail, args.out),
        "print": (_print_mail, sys.stderr),
    }
    cb_fn, cb_data = handlers[command]

    # start up
    courier = None
    try:
        courier = Courier(cb_fn, cb_data, ("0.0.0.0", 25), None)
        logging.info("starting Courier")
        asyncore.loop()
    except KeyboardInterrupt:
        logging.info("stopping Courier")
    finally:
        # release the listening socket and the outgoing session however the
        # loop ended, not only on Ctrl-C
        if courier is not None:
            courier.close()
        if out_srv is not None:
            try:
                out_srv.quit()
            except smtplib.SMTPException:
                logging.warning("outgoing server connection was already closed")
if __name__ == "__main__":
    # Script entry point: parse the command line and hand the resulting
    # namespace straight to the server bootstrap.
    start_courier(parser.parse_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment