Skip to content

Instantly share code, notes, and snippets.

@href
Created February 24, 2014 10:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save href/9185284 to your computer and use it in GitHub Desktop.
Save href/9185284 to your computer and use it in GitHub Desktop.
Tests for Seantis Postfix
import time
import unittest
import poplib
import textwrap
from uuid import uuid4
from email import Encoders
from email.base64mime import encode
from email.MIMEBase import MIMEBase
from email.mime.text import MIMEText
from email.MIMEMultipart import MIMEMultipart
from smtplib import SMTP, SMTPRecipientsRefused, SMTPException
from imbox import Imbox
from sievelib import managesieve
from imaplib2 import IMAP4
# seconds to sleep after sending a mail, giving the server time for the
# clamav and spamassassin processing before the mailboxes are inspected
processing_timeout = 1.0
class TestPostfixServer(unittest.TestCase):
    """Integration tests which talk to a running mail server.

    The tests connect to ``server`` using SMTP, IMAP, POP3 and ManageSieve
    and expect the accounts used below (``user@example.org``,
    ``root@example.org``, ...) to exist with the password ``test``.
    """

    # host the tests connect to
    server = 'mail.seantis.dev'

    # hostname the tests introduce themselves with in HELO/EHLO
    client = 'example.org'

    def encode_auth(self, username, password):
        """Return the base64 encoded credentials used with AUTH PLAIN."""
        return encode('\0{}\0{}'.format(username, password), eol="")

    def smtp(self, server=None, port=25):
        """Return an SMTP connection to ``server`` (default: self.server).

        The socket is closed when the running test finishes, so tests do
        not have to clean up after themselves.
        """
        smtp = SMTP()
        # SMTP.close may safely be called more than once
        self.addCleanup(smtp.close)
        smtp.connect(server or self.server, port)
        return smtp

    def send_mail(
        self, sender, recipient, subject, body,
        user=None, pw=None, attachments=None
    ):
        """Send a mail through the submission port and return the result.

        :param sender: address used for the envelope and the From header
        :param recipient: address used for the envelope and the To header
        :param subject: the Subject header
        :param body: the plain text of the mail
        :param user: login name, defaults to ``user@example.org``
        :param pw: login password, defaults to ``test``
        :param attachments: optional list of payloads, each of which is
            attached as base64 encoded ``application/octet-stream`` part
        :return: the result of ``SMTP.sendmail``
        """
        attachments = attachments or ()

        if attachments:
            # the first argument of MIMEMultipart is the subtype, not the
            # body - the text has to be attached as its own part
            msg = MIMEMultipart()
            msg.attach(MIMEText(body))
        else:
            msg = MIMEText(body)

        msg['From'] = sender
        msg['To'] = recipient
        msg['Subject'] = subject

        for ix, attachment in enumerate(attachments):
            part = MIMEBase('application', "octet-stream")
            part.set_payload(attachment)
            Encoders.encode_base64(part)
            part.add_header(
                'Content-Disposition',
                'attachment; filename="file_{}"'.format(ix)
            )
            msg.attach(part)

        # this is how a client should do it usually, so we do it as well
        smtp = self.smtp(port=587)
        try:
            smtp.starttls()
            smtp.login(user or 'user@example.org', pw or 'test')
            return smtp.sendmail(sender, recipient, msg.as_string())
        finally:
            try:
                smtp.quit()
            except SMTPException:
                # the connection is already unusable, just drop the socket
                smtp.close()

    def wait_until_processed(self):
        """Sleep for ``processing_timeout`` seconds."""
        time.sleep(processing_timeout)

    def test_connect(self):
        smtp = SMTP()
        self.addCleanup(smtp.close)

        code, msg = smtp.connect(self.server, 25)
        self.assertEqual(code, 220)
        self.assertEqual(msg, 'mail.seantis.ch ESMTP Postfix')

    def test_login_ehlo(self):
        smtp = self.smtp()
        smtp.starttls()
        smtp.ehlo(self.client)
        smtp.login('root@example.org', 'test')

    def test_login_helo(self):
        smtp = self.smtp()
        smtp.starttls()
        smtp.helo(self.client)

        # login cannot be called here because there's a bug in python:
        # http://bugs.python.org/issue6683
        smtp.docmd('AUTH PLAIN', self.encode_auth('user@example.org', 'test'))

    def test_helo_required(self):
        smtp = self.smtp()

        code, msg = smtp.docmd("MAIL FROM: <user@example.org>")
        self.assertIn("send HELO/EHLO first", msg)
        self.assertEqual(code, 503)

    def test_not_an_open_relay(self):
        smtp = self.smtp()
        smtp.ehlo('google.ch')
        smtp.docmd('mail from: bill.gates@microsoft.com')

        code, msg = smtp.docmd('rcpt to: steve.jobs@apple.com')
        self.assertIn('Relay access denied', msg)
        self.assertEqual(code, 554)

    def test_not_send_to_self_unauthenticated(self):
        smtp = self.smtp()
        smtp.ehlo('example.org')

        code, msg = smtp.docmd('mail from: user@example.org')
        self.assertIn('Sender address rejected: not logged in', msg)
        self.assertEqual(code, 553)

    def test_authenticated_only_recipients(self):
        smtp = self.smtp()
        smtp.starttls()
        smtp.ehlo('example.org')
        smtp.docmd('mail from: admin@google.ch')

        code, msg = smtp.docmd('rcpt to: root@iris.seantis.ch')
        self.assertIn('Recipient address rejected: Access denied', msg)
        self.assertEqual(code, 554)

    def test_authenticated_only_recipients_logged_in(self):
        smtp = self.smtp()
        smtp.starttls()
        smtp.ehlo(self.client)
        smtp.login('root@example.org', 'test')
        smtp.docmd('mail from: root@example.org')

        code, msg = smtp.docmd('rcpt to: root@iris.seantis.ch')
        self.assertIn('Ok', msg)
        self.assertEqual(code, 250)

    def test_invalid_client_hostname(self):
        smtp = self.smtp()
        smtp.ehlo('google.ch')
        smtp.docmd('mail from: admin@google.ch')

        code, msg = smtp.docmd('rcpt to: user@example.org')
        self.assertIn('Client host rejected', msg)
        self.assertEqual(code, 450)

    def test_invalid_recipient(self):
        smtp = self.smtp()
        smtp.starttls()
        smtp.ehlo(self.client)
        smtp.login('root@example.org', 'test')
        smtp.docmd('mail from: root@example.org')

        code, msg = smtp.docmd('rcpt to: unknown@example.org')
        self.assertIn('Recipient address rejected: User unknown', msg)
        self.assertEqual(code, 550)

    def test_non_fqdn_recipient(self):
        smtp = self.smtp()
        smtp.starttls()
        smtp.ehlo(self.client)
        smtp.login('root@example.org', 'test')
        smtp.docmd('mail from: root@example.org')

        code, msg = smtp.docmd('rcpt to: user')
        self.assertIn('need fully-qualified address', msg)
        self.assertEqual(code, 504)

    def test_unknown_recipient_domain(self):
        smtp = self.smtp()
        smtp.starttls()
        smtp.ehlo(self.client)
        smtp.login('root@example.org', 'test')
        smtp.docmd('mail from: root@example.org')

        code, msg = smtp.docmd('rcpt to: user@1.ch')
        self.assertIn('Domain not found', msg)
        self.assertEqual(code, 450)

    def test_smtp_without_tls(self):
        smtp = self.smtp()
        smtp.ehlo(self.client)

        self.assertRaises(
            SMTPException, smtp.login, 'root@example.org', 'test'
        )

    def test_valid_send_ssl(self):
        smtp = self.smtp()
        smtp.ehlo(self.client)
        smtp.starttls()
        smtp.login('root@example.org', 'test')
        smtp.docmd('mail from: root@example.org')

        code, msg = smtp.docmd('rcpt to: denis@href.ch')
        self.assertIn('Ok', msg)
        self.assertEqual(code, 250)

    def test_disable_vrfy_command(self):
        smtp = self.smtp()
        smtp.starttls()
        smtp.ehlo(self.client)
        smtp.login('root@example.org', 'test')

        code, msg = smtp.verify('root@example.org')
        self.assertIn('VRFY command is disabled', msg)
        self.assertEqual(code, 502)

    def test_invalid_helos(self):
        smtp = self.smtp()

        code, msg = smtp.ehlo('mail.seantis.ch')
        self.assertIn("Don't use my hostname", msg)
        self.assertEqual(code, 554)

        code, msg = smtp.ehlo('iris.seantis.ch')
        self.assertIn("Don't use my hostname", msg)
        self.assertEqual(code, 554)

        code, msg = smtp.ehlo('[192.168.101.119]')
        self.assertIn("Don't use my IP address", msg)
        self.assertEqual(code, 554)

        code, msg = smtp.ehlo('192.168.101.119')
        self.assertIn('Your software is not RFC 2821 compliant', msg)
        self.assertEqual(code, 554)

    def test_sender_spoofing(self):
        # users may not send emails with the address of another user
        smtp = self.smtp()
        smtp.starttls()
        smtp.ehlo(self.client)
        smtp.login('root@example.org', 'test')

        code, msg = smtp.docmd('mail from: user@example.org')
        self.assertIn('Sender address rejected: not owned by user', msg)
        self.assertEqual(code, 553)

        # users which receive aliased mails may send emails through them
        smtp = self.smtp()
        smtp.starttls()
        smtp.ehlo(self.client)
        smtp.login('root@example.org', 'test')

        code, msg = smtp.docmd('mail from: alias@example.org')
        self.assertIn('Ok', msg)
        self.assertEqual(code, 250)

        # any@example.org is allowed to send for the other users
        smtp = self.smtp()
        smtp.starttls()
        smtp.ehlo(self.client)
        smtp.login('any@example.org', 'test')

        code, msg = smtp.docmd('mail from: root@example.org')
        self.assertIn('Ok', msg)
        self.assertEqual(code, 250)

        # any@example.org should still be able to send for himself
        smtp = self.smtp()
        smtp.starttls()
        smtp.ehlo(self.client)
        smtp.login('any@example.org', 'test')

        code, msg = smtp.docmd('mail from: any@example.org')
        self.assertIn('Ok', msg)
        self.assertEqual(code, 250)

    def test_any_sender_does_not_receive(self):
        # the any@example.org may send as any other address, but he will
        # never receive emails in these tests
        imbox = Imbox(
            self.server,
            username='any@example.org',
            password='test',
            ssl=True
        )
        self.assertEqual(len(list(imbox.messages())), 0)

    def test_tls(self):
        smtp = self.smtp()
        smtp.starttls()
        smtp.ehlo(self.client)

        code, msg = smtp.login('root@example.org', 'test')
        self.assertIn('Authentication successful', msg)
        self.assertEqual(code, 235)

    def test_tls_forced_on_submission(self):
        # using submission on port 587 requires TLS
        smtp = self.smtp(port=587)
        smtp.ehlo(self.client)

        code, msg = smtp.docmd(
            'AUTH PLAIN', self.encode_auth('user@example.org', 'test')
        )
        self.assertIn('Must issue a STARTTLS command first', msg)
        self.assertEqual(code, 530)

        smtp.starttls()
        smtp.ehlo(self.client)

        code, msg = smtp.docmd(
            'AUTH PLAIN', self.encode_auth('user@example.org', 'test')
        )
        self.assertIn('Authentication successful', msg)
        self.assertEqual(code, 235)

    def test_imap_auth(self):
        imbox = Imbox(
            self.server,
            username='user@example.org',
            password='test',
            ssl=True
        )
        self.assertTrue(imbox.connection)

    def test_imap_auth_no_plain(self):
        try:
            Imbox(
                self.server,
                username='user@example.org',
                password='test',
                ssl=False
            )
        except Exception as e:
            self.assertIn('Plaintext authentication disallowed', str(e))
        else:
            # self.fail is used because a bare assert vanishes with -O
            self.fail("Should have failed")

    def test_imap_auth_starttls(self):
        connection = IMAP4(self.server, port=143)

        msg, result = connection.starttls()
        self.assertEqual(msg, 'OK')

        msg, result = connection.login('user@example.org', 'test')
        self.assertEqual(msg, 'OK')
        self.assertEqual(result, ['Logged in'])

        msg, result = connection.list()
        self.assertEqual(msg, 'OK')

    def test_imap_receive_email(self):
        # use a random text to find the right message again
        text = uuid4().hex

        self.send_mail(
            sender='alias@example.org',
            recipient='alias@example.org',
            subject=text,
            body=text
        )
        self.wait_until_processed()

        # both user and root should get the aliased email
        for user in ('user@example.org', 'root@example.org'):
            imbox = Imbox(
                self.server,
                username=user,
                password='test',
                ssl=True
            )
            messages = [m for m in imbox.messages() if m[1].subject == text]
            self.assertEqual(len(messages), 1)

    def test_pop3_receive_email(self):
        # use a random text to find the right message again
        text = uuid4().hex

        self.send_mail(
            sender='user@example.org',
            recipient='root@example.org',
            subject=text,
            body=text
        )
        self.wait_until_processed()

        pop = poplib.POP3_SSL(self.server)
        try:
            pop.user('root@example.org')
            pop.pass_('test')

            # POP3 message numbers start at 1
            message_count = len(pop.list()[1])
            found_emails = sum(
                1
                for number in range(1, message_count + 1)
                for line in pop.retr(number)[1]
                if text in line and 'Subject' in line
            )
            self.assertEqual(found_emails, 1)
        finally:
            pop.quit()  # unlock mailbox

    def test_pop3_auth_no_plain(self):
        # we can only really check if no plain text auth works with pop3,
        # not if starttls works, because python's library does not support
        # that but we do care more about plaintext not working anyway
        pop = poplib.POP3(self.server)

        try:
            pop.user('user@example.org')
        except Exception as e:
            self.assertIn('Plaintext authentication disallowed', str(e))
        else:
            # self.fail is used because a bare assert vanishes with -O
            self.fail("Exception should have occured")

    def test_outbound_policy(self):
        # policyd's outbound policy restricts the email by counting upwards
        # with each new mail, whilst counting downwards over time which is
        # why the code below is able to send 4 messages even though the limit
        # is 3 messages in 3 seconds.
        def send():
            return self.send_mail(
                sender='outbound@example.org',
                user='outbound@example.org',
                recipient='user@example.org',
                subject='test',
                body='test'
            )

        # the limit will kick in anywhere between 3 and 6 messages depending
        # on how fast the test can be run / how many times they are re-run
        # in a short period of time
        code, msg = None, None
        try:
            for _ in range(6):
                send()
        except SMTPRecipientsRefused as e:
            code, msg = e.recipients['user@example.org']

        self.assertNotEqual(code, None)
        self.assertNotEqual(msg, None)
        self.assertIn('Policy rejection; Message count quota exceeded', msg)
        self.assertEqual(code, 450)

    def test_send_virus_signature(self):
        # send the EICAR virus test signature to ensure that the
        # email is found to have a virus and is in effect dropped silently
        # see http://www.eicar.org/86-0-Intended-use.html
        signature = (
            'X5O!P%@AP[4\\PZX54(P^)7CC)7}$'
            'EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*'
        )
        text = uuid4().hex

        self.send_mail(
            sender='user@example.org',
            recipient='root@example.org',
            subject=text,
            body=text,
            attachments=[signature]
        )
        self.wait_until_processed()

        # the mail should have been dropped
        imbox = Imbox(
            self.server,
            username='root@example.org',
            password='test',
            ssl=True
        )
        messages = [m for m in imbox.messages() if m[1].subject == text]
        self.assertEqual(len(messages), 0)

    def test_default_imap_folders(self):
        # see 10-mail.conf.erb
        default_folders = ['Drafts', 'Sent', 'Spam', 'Trash', 'Archive']

        imbox = Imbox(
            self.server,
            username='root@example.org',
            password='test',
            ssl=True
        )
        folders = imbox.server.list_folders()[1]

        for folder in default_folders:
            self.assertEqual(len([f for f in folders if folder in f]), 1)

    def test_spam_blacklist(self):
        text = uuid4().hex

        self.send_mail(
            sender='spammer@example.org',
            recipient='root@example.org',
            subject=text,
            body='',
            user='spammer@example.org',
            pw='test'
        )
        self.wait_until_processed()

        imbox = Imbox(
            self.server,
            username='root@example.org',
            password='test',
            ssl=True
        )

        # it should not be in the normal folder
        messages = [m for m in imbox.messages() if m[1].subject == text]
        self.assertEqual(len(messages), 0)

        # it will be in the spam folder
        messages = [
            m for m in imbox.messages(folder='Spam') if m[1].subject == text
        ]
        self.assertEqual(len(messages), 1)

        headers = {h['Name']: h['Value'] for h in messages[0][1].headers}
        self.assertTrue(headers['X-Spam-Status'].startswith('Yes'))

    def test_receive_spam(self):
        # send the GTUBE spam test signature and ensure that the email
        # headers reflect the fact that it's spam
        signature = (
            'XJS*C4JDBQADN1.NSBN3*2IDNEN*'
            'GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X'
        )
        text = uuid4().hex

        self.send_mail(
            sender='user@example.org',
            recipient='root@example.org',
            subject=text,
            body=signature
        )
        self.wait_until_processed()

        imbox = Imbox(
            self.server,
            username='root@example.org',
            password='test',
            ssl=True
        )

        # it should not be in the normal folder
        messages = [m for m in imbox.messages() if m[1].subject == text]
        self.assertEqual(len(messages), 0)

        # it will be in the spam folder
        messages = [
            m for m in imbox.messages(folder='Spam') if m[1].subject == text
        ]
        self.assertEqual(len(messages), 1)

        headers = {h['Name']: h['Value'] for h in messages[0][1].headers}
        self.assertTrue(headers['X-Spam-Status'].startswith('Yes'))

    def test_manage_sieve(self):
        client = managesieve.Client(self.server)

        # tls only
        result = client.connect('user@example.org', 'test', starttls=False)
        self.assertFalse(result)

        result = client.connect('user@example.org', 'test', starttls=True)
        self.assertTrue(result)

        token = uuid4().hex
        script = textwrap.dedent("""
            require "body";
            require "reject";
            if body :contains "%s" {
                reject "please do not send me anymore e-mails";
            }
        """ % token)

        try:
            # only activate the script once we know the upload worked
            script_put = client.putscript('test', script)
            self.assertTrue(script_put, "sieve script syntax error")
            client.setactive('test')

            self.send_mail(
                sender='root@example.org',
                user='root@example.org',
                recipient='user@example.org',
                subject=token,
                body=token
            )
            self.wait_until_processed()

            # rejecting the email means that it's sent back to the user
            # (it's not actually rejected during the sending, because
            # that would obviously be way too slow)
            imbox = Imbox(
                self.server,
                username='root@example.org',
                password='test',
                ssl=True
            )
            messages = [m for m in imbox.messages() if token in m[1].subject]
            self.assertEqual(len(messages), 1)

            # inspect the message that matched, not whatever the loop
            # variable of the comprehension above happened to end on
            rejection = messages[0]
            self.assertEqual(
                'Rejected: {}'.format(token), rejection[1].subject
            )
            self.assertEqual(
                rejection[1].sent_from[0]['email'], 'postmaster@seantis.ch'
            )

            # imbox doesn't handle multipart/report messages well so
            # we need to peek into the raw message
            raw = imbox.connection.uid(
                'fetch', rejection[0], '(BODY.PEEK[])'
            )[1][0][1]
            self.assertIn('please do not send me anymore e-mails', raw)
        finally:
            client.deletescript('test')
# run the whole test suite when this file is executed as a script
if __name__ == '__main__':
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment