Skip to content

Instantly share code, notes, and snippets.

@vsajip
Created April 30, 2011 15:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vsajip/949744 to your computer and use it in GitHub Desktop.
Save vsajip/949744 to your computer and use it in GitHub Desktop.
Test SMTP server for unit testing
import asynchat
import asyncore
import errno
import logging
import logging.handlers
import smtpd
import socket
import sys
import threading
TEST_SMTP_PORT = 9025
class TestSMTPChannel(smtpd.SMTPChannel):
def __init__(self, server, conn, addr, sockmap):
asynchat.async_chat.__init__(self, conn, sockmap)
self.smtp_server = server
self.conn = conn
self.addr = addr
self.received_lines = []
self.smtp_state = self.COMMAND
self.seen_greeting = ''
self.mailfrom = None
self.rcpttos = []
self.received_data = ''
self.fqdn = socket.getfqdn()
self.num_bytes = 0
try:
self.peer = conn.getpeername()
except socket.error as err:
# a race condition may occur if the other end is closing
# before we can get the peername
self.close()
if err.args[0] != errno.ENOTCONN:
raise
return
self.push('220 %s %s' % (self.fqdn, smtpd.__version__))
self.set_terminator(b'\r\n')
class TestSMTPServer(smtpd.SMTPServer):
channel_class = TestSMTPChannel
def __init__(self, addr, handler, poll_interval, sockmap):
self._localaddr = addr
self._remoteaddr = None
self.sockmap = sockmap
asyncore.dispatcher.__init__(self, map=sockmap)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
self.set_socket(sock, map=sockmap)
# try to re-use a server port if possible
self.set_reuse_addr()
self.bind(addr)
self.listen(5)
except:
self.close()
raise
self._handler = handler
self._thread = None
self.poll_interval = poll_interval
def handle_accepted(self, conn, addr):
print('Incoming connection from %s' % repr(addr), file=smtpd.DEBUGSTREAM)
channel = self.channel_class(self, conn, addr, self.sockmap)
def process_message(self, peer, mailfrom, rcpttos, data):
self._handler(peer, mailfrom, rcpttos, data)
def start(self):
self._thread = t = threading.Thread(target=self.serve_forever,
args=(self.poll_interval,))
t.setDaemon(True)
t.start()
def serve_forever(self, poll_interval):
asyncore.loop(poll_interval, map=self.sockmap)
def stop(self):
self.close()
self._thread.join()
self._thread = None
def process(peer, mailfrom, rcpttos, data):
print('%r' % ((peer, mailfrom, rcpttos, data),))
def main():
addr = ('localhost', TEST_SMTP_PORT)
#smtpd.DEBUGSTREAM = sys.stderr
h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log')
my_map = {}
server = TestSMTPServer(addr, process, 0.001, my_map)
server.start()
r = logging.makeLogRecord({'msg': 'Hello'})
h.handle(r)
server.stop()
h.close()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment