Created
February 24, 2014 10:32
-
-
Save href/9185284 to your computer and use it in GitHub Desktop.
Tests for Seantis Postfix
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import unittest | |
import poplib | |
import textwrap | |
from uuid import uuid4 | |
from email import Encoders | |
from email.base64mime import encode | |
from email.MIMEBase import MIMEBase | |
from email.mime.text import MIMEText | |
from email.MIMEMultipart import MIMEMultipart | |
from smtplib import SMTP, SMTPRecipientsRefused, SMTPException | |
from imbox import Imbox | |
from sievelib import managesieve | |
from imaplib2 import IMAP4 | |
# the timeout used for clamav and spamassassin processing | |
processing_timeout = 1.0 | |
class TestPostfixServer(unittest.TestCase): | |
server = 'mail.seantis.dev' | |
client = 'example.org' | |
def encode_auth(self, username, password): | |
return encode('\0{}\0{}'.format(username, password), eol="") | |
def smtp(self, server=None, port=25): | |
smtp = SMTP() | |
smtp.connect(server or self.server, port) | |
return smtp | |
def send_mail( | |
self, sender, recipient, subject, body, | |
user=None, pw=None, attachments=[] | |
): | |
# this is how a client should do it usually, so we do it as well | |
smtp = self.smtp(port=587) | |
smtp.starttls() | |
smtp.login(user or 'user@example.org', pw or 'test') | |
if attachments: | |
msg = MIMEMultipart(body) | |
else: | |
msg = MIMEText(body) | |
msg['From'] = sender | |
msg['To'] = recipient | |
msg['Subject'] = subject | |
for ix, attachment in enumerate(attachments): | |
part = MIMEBase('application', "octet-stream") | |
part.set_payload(attachment) | |
Encoders.encode_base64(part) | |
part.add_header( | |
'Content-Disposition', | |
'attachment; filename="file_{}"'.format(ix) | |
) | |
msg.attach(part) | |
return smtp.sendmail(sender, recipient, msg.as_string()) | |
def wait_until_processed(self): | |
time.sleep(processing_timeout) | |
def test_connect(self): | |
smtp = SMTP() | |
code, msg = smtp.connect(self.server, 25) | |
self.assertEqual(code, 220) | |
self.assertEqual(msg, 'mail.seantis.ch ESMTP Postfix') | |
def test_login_ehlo(self): | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.ehlo(self.client) | |
smtp.login('root@example.org', 'test') | |
def test_login_helo(self): | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.helo(self.client) | |
# login cannot be called here because there's a bug in python: | |
# http://bugs.python.org/issue6683 | |
smtp.docmd('AUTH PLAIN', self.encode_auth('user@example.org', 'test')) | |
def test_helo_required(self): | |
smtp = self.smtp() | |
code, msg = smtp.docmd("MAIL FROM: <user@example.org>") | |
self.assertIn("send HELO/EHLO first", msg) | |
self.assertEqual(code, 503) | |
def test_not_an_open_relay(self): | |
smtp = self.smtp() | |
smtp.ehlo('google.ch') | |
smtp.docmd('mail from: bill.gates@microsoft.com') | |
code, msg = smtp.docmd('rcpt to: steve.jobs@apple.com') | |
self.assertIn('Relay access denied', msg) | |
self.assertEqual(code, 554) | |
def test_not_send_to_self_unauthenticated(self): | |
smtp = self.smtp() | |
smtp.ehlo('example.org') | |
code, msg = smtp.docmd('mail from: user@example.org') | |
self.assertIn('Sender address rejected: not logged in', msg) | |
self.assertEqual(code, 553) | |
def test_authenticated_only_recipients(self): | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.ehlo('example.org') | |
smtp.docmd('mail from: admin@google.ch') | |
code, msg = smtp.docmd('rcpt to: root@iris.seantis.ch') | |
self.assertIn('Recipient address rejected: Access denied', msg) | |
self.assertEqual(code, 554) | |
def test_authenticated_only_recipients_logged_in(self): | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.ehlo(self.client) | |
smtp.login('root@example.org', 'test') | |
smtp.docmd('mail from: root@example.org') | |
code, msg = smtp.docmd('rcpt to: root@iris.seantis.ch') | |
self.assertIn('Ok', msg) | |
self.assertEqual(code, 250) | |
def test_invalid_client_hostname(self): | |
smtp = self.smtp() | |
smtp.ehlo('google.ch') | |
smtp.docmd('mail from: admin@google.ch') | |
code, msg = smtp.docmd('rcpt to: user@example.org') | |
self.assertIn('Client host rejected', msg) | |
self.assertEqual(code, 450) | |
def test_invalid_recipient(self): | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.ehlo(self.client) | |
smtp.login('root@example.org', 'test') | |
smtp.docmd('mail from: root@example.org') | |
code, msg = smtp.docmd('rcpt to: unknown@example.org') | |
self.assertIn('Recipient address rejected: User unknown', msg) | |
self.assertEqual(code, 550) | |
def test_non_fqdn_recipient(self): | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.ehlo(self.client) | |
smtp.login('root@example.org', 'test') | |
smtp.docmd('mail from: root@example.org') | |
code, msg = smtp.docmd('rcpt to: user') | |
self.assertIn('need fully-qualified address', msg) | |
self.assertEqual(code, 504) | |
def test_unknown_recipient_domain(self): | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.ehlo(self.client) | |
smtp.login('root@example.org', 'test') | |
smtp.docmd('mail from: root@example.org') | |
code, msg = smtp.docmd('rcpt to: user@1.ch') | |
self.assertIn('Domain not found', msg) | |
self.assertEqual(code, 450) | |
def test_smtp_without_tls(self): | |
smtp = self.smtp() | |
smtp.ehlo(self.client) | |
self.assertRaises( | |
SMTPException, smtp.login, 'root@example.org', 'test' | |
) | |
def test_valid_send_ssl(self): | |
smtp = self.smtp() | |
smtp.ehlo(self.client) | |
smtp.starttls() | |
smtp.login('root@example.org', 'test') | |
smtp.docmd('mail from: root@example.org') | |
code, msg = smtp.docmd('rcpt to: denis@href.ch') | |
self.assertIn('Ok', msg) | |
self.assertEqual(code, 250) | |
def test_disable_vrfy_command(self): | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.ehlo(self.client) | |
smtp.login('root@example.org', 'test') | |
code, msg = smtp.verify('root@example.org') | |
self.assertIn('VRFY command is disabled', msg) | |
self.assertEqual(code, 502) | |
def test_invalid_helos(self): | |
smtp = self.smtp() | |
code, msg = smtp.ehlo('mail.seantis.ch') | |
self.assertIn("Don't use my hostname", msg) | |
self.assertEqual(code, 554) | |
code, msg = smtp.ehlo('iris.seantis.ch') | |
self.assertIn("Don't use my hostname", msg) | |
self.assertEqual(code, 554) | |
code, msg = smtp.ehlo('[192.168.101.119]') | |
self.assertIn("Don't use my IP address", msg) | |
self.assertEqual(code, 554) | |
code, msg = smtp.ehlo('192.168.101.119') | |
self.assertIn('Your software is not RFC 2821 compliant', msg) | |
self.assertEqual(code, 554) | |
def test_sender_spoofing(self): | |
# users may not send emails with the address of another user | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.ehlo(self.client) | |
smtp.login('root@example.org', 'test') | |
code, msg = smtp.docmd('mail from: user@example.org') | |
self.assertIn('Sender address rejected: not owned by user', msg) | |
self.assertEqual(code, 553) | |
# users which receive aliased mails may send emails through them | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.ehlo(self.client) | |
smtp.login('root@example.org', 'test') | |
code, msg = smtp.docmd('mail from: alias@example.org') | |
self.assertIn('Ok', msg) | |
self.assertEqual(code, 250) | |
# any@example.org is allowed to send for the other users | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.ehlo(self.client) | |
smtp.login('any@example.org', 'test') | |
code, msg = smtp.docmd('mail from: root@example.org') | |
self.assertIn('Ok', msg) | |
self.assertEqual(code, 250) | |
# any@example.org should still be able to send for himself | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.ehlo(self.client) | |
smtp.login('any@example.org', 'test') | |
code, msg = smtp.docmd('mail from: any@example.org') | |
self.assertIn('Ok', msg) | |
self.assertEqual(code, 250) | |
def test_any_sender_does_not_receive(self): | |
# the any@example.org may send as any other adress, but he will | |
# never receive emails in these tests | |
imbox = Imbox( | |
self.server, | |
username='any@example.org', | |
password='test', | |
ssl=True | |
) | |
self.assertEqual(len(list(imbox.messages())), 0) | |
def test_tls(self): | |
smtp = self.smtp() | |
smtp.starttls() | |
smtp.ehlo(self.client) | |
code, msg = smtp.login('root@example.org', 'test') | |
self.assertIn('Authentication successful', msg) | |
self.assertEqual(code, 235) | |
def test_tls_forced_on_submission(self): | |
# using submission on port 587 requires TLS | |
smtp = self.smtp(port=587) | |
smtp.ehlo(self.client) | |
code, msg = smtp.docmd( | |
'AUTH PLAIN', self.encode_auth('user@example.org', 'test') | |
) | |
self.assertIn('Must issue a STARTTLS command first', msg) | |
self.assertEqual(code, 530) | |
smtp.starttls() | |
smtp.ehlo(self.client) | |
code, msg = smtp.docmd( | |
'AUTH PLAIN', self.encode_auth('user@example.org', 'test') | |
) | |
self.assertIn('Authentication successful', msg) | |
self.assertEqual(code, 235) | |
def test_imap_auth(self): | |
imbox = Imbox( | |
self.server, | |
username='user@example.org', | |
password='test', | |
ssl=True | |
) | |
self.assertTrue(imbox.connection) | |
def test_imap_auth_no_plain(self): | |
try: | |
Imbox( | |
self.server, | |
username='user@example.org', | |
password='test', | |
ssl=False | |
) | |
except Exception, e: | |
self.assertIn('Plaintext authentication disallowed', e.message) | |
else: | |
assert False, "Should have failed" | |
def test_imap_auth_starttls(self): | |
connection = IMAP4(self.server, port=143) | |
msg, result = connection.starttls() | |
self.assertEqual(msg, 'OK') | |
msg, result = connection.login('user@example.org', 'test') | |
self.assertEqual(msg, 'OK') | |
self.assertEqual(result, ['Logged in']) | |
msg, result = connection.list() | |
self.assertEqual(msg, 'OK') | |
def test_imap_receive_email(self): | |
# use a random text to find the right message again | |
text = uuid4().hex | |
self.send_mail( | |
sender='alias@example.org', | |
recipient='alias@example.org', | |
subject=text, | |
body=text | |
) | |
self.wait_until_processed() | |
# both user and root should get the aliased email | |
for user in ('user@example.org', 'root@example.org'): | |
imbox = Imbox( | |
self.server, | |
username=user, | |
password='test', | |
ssl=True | |
) | |
messages = [m for m in imbox.messages() if m[1].subject == text] | |
self.assertEqual(len(messages), 1) | |
def test_pop3_receive_email(self): | |
# use a random text to find the right message again | |
text = uuid4().hex | |
self.send_mail( | |
sender='user@example.org', | |
recipient='root@example.org', | |
subject=text, | |
body=text | |
) | |
self.wait_until_processed() | |
pop = poplib.POP3_SSL(self.server) | |
try: | |
pop.user('root@example.org') | |
pop.pass_('test') | |
found_emails = 0 | |
for i in range(len(pop.list()[1])): | |
for j in pop.retr(i+1)[1]: | |
if text in j and 'Subject' in j: | |
found_emails += 1 | |
self.assertEqual(found_emails, 1) | |
finally: | |
pop.quit() # unlock mailbox | |
def test_pop3_auth_no_plain(self): | |
# we can only really check if no plain text auth works with pop3, | |
# not if starttls works, because python's library does not support that | |
# but we do care more about plaintext not working anyway | |
pop = poplib.POP3(self.server) | |
try: | |
pop.user('user@example.org') | |
except Exception, e: | |
self.assertIn('Plaintext authentication disallowed', e.message) | |
else: | |
assert False, "Exception should have occured" | |
def test_outbound_policy(self): | |
# policyd's outbound policy restricts the email by counting upwards | |
# with each new mail, whilst counting downwards over time which is | |
# why the code below is able to send 4 messages even though the limit | |
# is 3 messages in 3 seconds. | |
def send(): | |
return self.send_mail( | |
sender='outbound@example.org', | |
user='outbound@example.org', | |
recipient='user@example.org', | |
subject='test', | |
body='test' | |
) | |
# the limit will kick in anywhere between 3 and 6 messages depending | |
# on how fast the test can be run / how many times they are re-run | |
# in a short period of time | |
try: | |
for i in range(0, 6): | |
send() | |
except SMTPRecipientsRefused, e: | |
code, msg = e.recipients['user@example.org'] | |
else: | |
code, msg = None, None | |
self.assertNotEqual(code, None) | |
self.assertNotEqual(msg, None) | |
self.assertIn('Policy rejection; Message count quota exceeded', msg) | |
self.assertEqual(code, 450) | |
def test_send_virus_signature(self): | |
# send the EICAR virus test signature to ensure that the | |
# email is found to have a virus and is in effect dropped silently | |
# see http://www.eicar.org/86-0-Intended-use.html | |
signature = ( | |
'X5O!P%@AP[4\PZX54(P^)7CC)7}$' | |
'EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*' | |
) | |
text = uuid4().hex | |
self.send_mail( | |
sender='user@example.org', | |
recipient='root@example.org', | |
subject=text, | |
body=text, | |
attachments=[signature] | |
) | |
self.wait_until_processed() | |
# the mail should have been dropped | |
imbox = Imbox( | |
self.server, | |
username='root@example.org', | |
password='test', | |
ssl=True | |
) | |
messages = [m for m in imbox.messages() if m[1].subject == text] | |
self.assertEqual(len(messages), 0) | |
def test_default_imap_folders(self): | |
# see 10-mail.conf.erb | |
default_folders = ['Drafts', 'Sent', 'Spam', 'Trash', 'Archive'] | |
imbox = Imbox( | |
self.server, | |
username='root@example.org', | |
password='test', | |
ssl=True | |
) | |
server = imbox.server.list_folders()[1] | |
for folder in default_folders: | |
self.assertEqual(len([f for f in server if folder in f]), 1) | |
def test_spam_blacklist(self): | |
text = uuid4().hex | |
self.send_mail( | |
sender='spammer@example.org', | |
recipient='root@example.org', | |
subject=text, | |
body='', | |
user='spammer@example.org', | |
pw='test' | |
) | |
self.wait_until_processed() | |
imbox = Imbox( | |
self.server, | |
username='root@example.org', | |
password='test', | |
ssl=True | |
) | |
# it should not be in the normal folder | |
messages = [m for m in imbox.messages() if m[1].subject == text] | |
self.assertEqual(len(messages), 0) | |
# it will be in the spam folder | |
messages = [ | |
m for m in imbox.messages(folder='Spam') if m[1].subject == text | |
] | |
self.assertEqual(len(messages), 1) | |
headers = messages[0][1].headers | |
headers = dict((h['Name'], h['Value']) for h in headers) | |
self.assertTrue(headers['X-Spam-Status'].startswith('Yes')) | |
def test_receive_spam(self): | |
# send the GTUBE spam test signature and ensure that the email | |
# headers reflect the fact that it's spam | |
signature = ( | |
'XJS*C4JDBQADN1.NSBN3*2IDNEN*' | |
'GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X' | |
) | |
text = uuid4().hex | |
self.send_mail( | |
sender='user@example.org', | |
recipient='root@example.org', | |
subject=text, | |
body=signature | |
) | |
self.wait_until_processed() | |
imbox = Imbox( | |
self.server, | |
username='root@example.org', | |
password='test', | |
ssl=True | |
) | |
# it should not be in the normal folder | |
messages = [m for m in imbox.messages() if m[1].subject == text] | |
self.assertEqual(len(messages), 0) | |
# it will be in the spam folder | |
messages = [ | |
m for m in imbox.messages(folder='Spam') if m[1].subject == text | |
] | |
self.assertEqual(len(messages), 1) | |
headers = messages[0][1].headers | |
headers = dict((h['Name'], h['Value']) for h in headers) | |
self.assertTrue(headers['X-Spam-Status'].startswith('Yes')) | |
def test_manage_sieve(self): | |
client = managesieve.Client(self.server) | |
# tls only | |
result = client.connect('user@example.org', 'test', starttls=False) | |
self.assertFalse(result) | |
result = client.connect('user@example.org', 'test', starttls=True) | |
self.assertTrue(result) | |
token = uuid4().hex | |
script = textwrap.dedent(""" | |
require "body"; | |
require "reject"; | |
if body :contains "%s" { | |
reject "please do not send me anymore e-mails"; | |
} | |
""" % token) | |
try: | |
script_put = client.putscript('test', script) | |
client.setactive('test') | |
self.assertTrue(script_put, "sieve script syntax error") | |
self.send_mail( | |
sender='root@example.org', | |
user='root@example.org', | |
recipient='user@example.org', | |
subject=token, | |
body=token | |
) | |
self.wait_until_processed() | |
# rejecting the email means that it's sent back to the user | |
# (it's not actually rejected during the sending, because | |
# that would obviously be way too slow) | |
imbox = Imbox( | |
self.server, | |
username='root@example.org', | |
password='test', | |
ssl=True | |
) | |
messages = [m for m in imbox.messages() if token in m[1].subject] | |
self.assertEqual(len(messages), 1) | |
self.assertEqual('Rejected: {}'.format(token), m[1].subject) | |
self.assertEqual( | |
m[1].sent_from[0]['email'], 'postmaster@seantis.ch' | |
) | |
# imbox doesn't handle multipart/report messages well so | |
# we need to peek into the raw message | |
raw = imbox.connection.uid('fetch', m[0], '(BODY.PEEK[])')[1][0][1] | |
self.assertIn('please do not send me anymore e-mails', raw) | |
finally: | |
client.deletescript('test') | |
if __name__ == '__main__': | |
unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment