Skip to content

Instantly share code, notes, and snippets.

@vsajip vsajip/sstest.py
Created Apr 30, 2011

Embed
What would you like to do?
Test SMTP server for unit testing
import asynchat
import asyncore
import errno
import logging
import logging.handlers
import smtpd
import socket
import sys
import threading
TEST_SMTP_PORT = 9025
class TestSMTPChannel(smtpd.SMTPChannel):
def __init__(self, server, conn, addr, sockmap):
asynchat.async_chat.__init__(self, conn, sockmap)
self.smtp_server = server
self.conn = conn
self.addr = addr
self.received_lines = []
self.smtp_state = self.COMMAND
self.seen_greeting = ''
self.mailfrom = None
self.rcpttos = []
self.received_data = ''
self.fqdn = socket.getfqdn()
self.num_bytes = 0
try:
self.peer = conn.getpeername()
except socket.error as err:
# a race condition may occur if the other end is closing
# before we can get the peername
self.close()
if err.args[0] != errno.ENOTCONN:
raise
return
self.push('220 %s %s' % (self.fqdn, smtpd.__version__))
self.set_terminator(b'\r\n')
class TestSMTPServer(smtpd.SMTPServer):
channel_class = TestSMTPChannel
def __init__(self, addr, handler, poll_interval, sockmap):
self._localaddr = addr
self._remoteaddr = None
self.sockmap = sockmap
asyncore.dispatcher.__init__(self, map=sockmap)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
self.set_socket(sock, map=sockmap)
# try to re-use a server port if possible
self.set_reuse_addr()
self.bind(addr)
self.listen(5)
except:
self.close()
raise
self._handler = handler
self._thread = None
self.poll_interval = poll_interval
def handle_accepted(self, conn, addr):
print('Incoming connection from %s' % repr(addr), file=smtpd.DEBUGSTREAM)
channel = self.channel_class(self, conn, addr, self.sockmap)
def process_message(self, peer, mailfrom, rcpttos, data):
self._handler(peer, mailfrom, rcpttos, data)
def start(self):
self._thread = t = threading.Thread(target=self.serve_forever,
args=(self.poll_interval,))
t.setDaemon(True)
t.start()
def serve_forever(self, poll_interval):
asyncore.loop(poll_interval, map=self.sockmap)
def stop(self):
self.close()
self._thread.join()
self._thread = None
def process(peer, mailfrom, rcpttos, data):
print('%r' % ((peer, mailfrom, rcpttos, data),))
def main():
addr = ('localhost', TEST_SMTP_PORT)
#smtpd.DEBUGSTREAM = sys.stderr
h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log')
my_map = {}
server = TestSMTPServer(addr, process, 0.001, my_map)
server.start()
r = logging.makeLogRecord({'msg': 'Hello'})
h.handle(r)
server.stop()
h.close()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.