Skip to content

Instantly share code, notes, and snippets.

@odoku
Created October 29, 2013 03:30
Show Gist options
  • Save odoku/7208794 to your computer and use it in GitHub Desktop.
Save odoku/7208794 to your computer and use it in GitHub Desktop.
デバッグ用のSMTPサーバー。 起動すると送信したメールの内容がコンソールに出力されるよ。 日本語対応済。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import smtpd
import asyncore
import email
from email.message import Message
from email.header import decode_header
from email.utils import parseaddr
from optparse import OptionParser
# Version string handed to the command-line parser (shown by --version).
__version__ = '0.0.1'
class DebugSMTPServer(smtpd.SMTPServer):
    """SMTP server that prints a summary of each received message to stdout.

    The message is only parsed and printed; this class does not forward or
    store it.
    """

    def __init__(self, localaddr, remoteaddr=None):
        """Bind the server to *localaddr*, a ``(host, port)`` tuple.

        *remoteaddr* is handed to the base class unchanged.
        """
        smtpd.SMTPServer.__init__(self, localaddr, remoteaddr)

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        """Print sender, recipients, size, subject, body and attachment names.

        peer     -- address of the connecting client
        mailfrom -- envelope sender
        rcpttos  -- list of envelope recipients
        data     -- raw message (text or bytes)
        kwargs   -- accepted and ignored; newer smtpd versions pass extra
                    keyword arguments (mail_options, rcpt_options) that
                    would otherwise raise TypeError

        Returns None, which smtpd treats as successful delivery.
        """
        message = self.get_message_object(data)
        subject = message.get_subject()
        body = message.get_body()
        # Distinct loop names: reusing ``data`` here would rebind the
        # parameter on Python 2, where comprehension variables leak.
        names = [name for name, _mime, _payload in message.get_attachments()]

        # Each line is formatted into a single string so the output is the
        # same whether print is a statement (Python 2) or a function.
        print('Receiving message from: %s' % (peer,))
        print('Message addressed from: %s' % (mailfrom,))
        print('Message addressed to : %s' % (rcpttos,))
        print('Message length : %d' % len(data))
        print('Subject : %s' % (subject,))
        print('Body :')
        # A multipart message without a text/plain part has no body; print
        # an empty line rather than the literal "None".
        print(body if body is not None else '')
        print('Attachments : %s' % ', '.join(names))

    def get_message_object(self, data):
        """Parse the raw message *data* into a SmartMessage.

        Text input goes through ``email.message_from_string``. Anything else
        (bytes on Python 3) goes through ``email.message_from_bytes`` when
        that function exists.
        """
        parse_bytes = getattr(email, 'message_from_bytes', None)
        if parse_bytes is not None and not isinstance(data, str):
            return parse_bytes(data, _class=SmartMessage)
        return email.message_from_string(data, _class=SmartMessage)
class SmartMessage(Message):
    """Message subclass with helpers for subject, text body and attachments.

    The multipart helpers call ``is_attachment()`` on every part yielded by
    ``walk()``, so the sub-parts must be SmartMessage instances as well
    (parse with ``_class=SmartMessage``).
    """

    @staticmethod
    def _decode_text(value, charset):
        """Return *value* as text, decoding byte strings with *charset*.

        Undecodable bytes are replaced instead of raising, and a charset
        Python does not know falls back to ASCII, so one malformed mail
        cannot abort the caller. Values that are already text are returned
        unchanged.
        """
        if not isinstance(value, bytes):
            return value
        try:
            return value.decode(charset, 'replace')
        except LookupError:
            return value.decode('ascii', 'replace')

    def _text_payload(self):
        """Return this part's transfer-decoded payload.

        The payload is decoded to text when the part declares a charset;
        otherwise it is returned exactly as ``get_payload(decode=True)``
        produced it.
        """
        charset = self.get_content_charset()
        payload = self.get_payload(decode=True)
        if payload is None or not charset:
            return payload
        return self._decode_text(payload, charset)

    def get_subject(self):
        """Return the fully decoded Subject header.

        Every chunk reported by ``decode_header`` is decoded and joined, so
        a subject mixing plain text with encoded words (for example
        ``Re: =?utf-8?b?...?=``) is not truncated to its first chunk.
        Returns an empty string when the message has no Subject header.
        """
        raw = self['Subject']
        if raw is None:
            return ''

        chunks = decode_header(raw)
        # A header without any encoded word is handed back untouched.
        if len(chunks) == 1 and chunks[0][1] is None:
            return chunks[0][0]

        pieces = []
        previous_encoded = None
        for value, charset in chunks:
            text = self._decode_text(value, charset or 'ascii')
            if not text:
                continue
            encoded = charset is not None
            # Some versions of decode_header strip the whitespace that
            # separated plain text from an encoded word; restore a single
            # space at such a boundary when none survived.
            if (pieces and encoded != previous_encoded
                    and not pieces[-1][-1:].isspace()
                    and not text[:1].isspace()):
                pieces.append(' ')
            pieces.append(text)
            previous_encoded = encoded
        return ''.join(pieces)

    def get_body(self, html=False):
        """Return the message body.

        For a multipart message this is the first non-attachment
        ``text/plain`` part, or ``text/html`` when *html* is true; None if
        there is no such part. For a single-part message the payload is
        returned regardless of *html*.
        """
        if self.is_multipart():
            return self.get_multipart_body(html)
        return self._text_payload()

    def get_multipart_body(self, html=False):
        """Return the first non-attachment text part of the wanted subtype.

        Looks for ``text/html`` when *html* is true, ``text/plain``
        otherwise. Returns None when no part matches.
        """
        wanted = 'text/html' if html else 'text/plain'
        for part in self.walk():
            if part.is_attachment():
                continue
            if part.get_content_type() == wanted:
                return part._text_payload()
        return None

    def is_attachment(self):
        """Return True when this part carries a filename."""
        return bool(self.get_filename())

    def get_attachments(self):
        """Return ``(filename, content_type, decoded_payload)`` tuples.

        One tuple is produced for every part for which ``is_attachment()``
        is true, in ``walk()`` order.
        """
        return [
            (
                part.get_filename(),
                part.get_content_type(),
                part.get_payload(decode=True),
            )
            for part in self.walk()
            if part.is_attachment()
        ]
if __name__ == '__main__':
    # Command-line entry point: parse the bind address, start the server and
    # run the asyncore event loop until interrupted.
    p = OptionParser(version="ver:%s" % __version__)
    p.add_option('-H', '--host', action='store', type='string',
                 default='localhost', help="Bind local address.")
    p.add_option('-p', '--port', action='store', type='int',
                 default=25, help="Bind local port.")
    options, args = p.parse_args()

    # Single-string print calls behave the same on Python 2 and Python 3.
    print('Starting SMTP Server (%s:%d)' % (options.host, options.port))
    server = DebugSMTPServer((options.host, options.port), None)
    try:
        asyncore.loop()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server.
        print("Quit SMTP Server.")
    finally:
        # Release the listening socket however the loop ended.
        server.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment