Created
October 29, 2013 03:30
-
-
Save odoku/7208794 to your computer and use it in GitHub Desktop.
デバッグ用のSMTPサーバー。
起動すると送信したメールの内容がコンソールに出力されるよ。
日本語対応済。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import smtpd | |
import asyncore | |
import email | |
from email.message import Message | |
from email.header import decode_header | |
from email.utils import parseaddr | |
from optparse import OptionParser | |
__version__ = '0.0.1' | |
class DebugSMTPServer(smtpd.SMTPServer):
    """SMTP server that prints a summary of each received message to stdout.

    Messages are parsed and printed only; this class does not relay or
    store them.
    """

    def __init__(self, localaddr, remoteaddr=None):
        """Pass both address arguments through to ``smtpd.SMTPServer``.

        Args:
            localaddr: ``(host, port)`` pair handed to the base class.
            remoteaddr: Second address argument of the base class; defaults
                to ``None``.
        """
        smtpd.SMTPServer.__init__(self, localaddr, remoteaddr)

    def process_message(self, peer, mailfrom, rcpttos, data):
        """Print envelope data, subject, body and attachment names.

        Args:
            peer: Address of the connecting client, printed as-is.
            mailfrom: Envelope sender, printed as-is.
            rcpttos: Envelope recipients, printed as-is.
            data: Raw message text; parsed with ``get_message_object``.

        Returns:
            None.
        """
        message = self.get_message_object(data)
        subject = message.get_subject()
        body = message.get_body()
        # Each attachment is a (filename, mime type, payload) tuple; only the
        # filename is shown. Using a distinct loop name keeps the ``data``
        # parameter from being rebound by the comprehension on Python 2.
        filenames = [attachment[0] for attachment in message.get_attachments()]

        # Single-argument, pre-formatted prints produce the same text as the
        # comma-separated print statement and parse on Python 2 and 3 alike.
        print('Receiving message from: %s' % (peer,))
        print('Message addressed from: %s' % (mailfrom,))
        print('Message addressed to : %s' % (rcpttos,))
        print('Message length : %s' % (len(data),))
        print('Subject : %s' % (subject,))
        print('Body :')
        print(body)
        print('Attachments : %s' % (', '.join(filenames),))

    def get_message_object(self, data):
        """Parse raw message text into a ``SmartMessage`` instance."""
        return email.message_from_string(data, _class=SmartMessage)
class SmartMessage(Message):
    """``email.message.Message`` with helpers for subject, body and attachments."""

    def get_subject(self):
        """Return the decoded Subject header.

        All RFC 2047 fragments of the header are decoded and combined, so a
        subject that mixes plain and encoded text (or several charsets) is
        returned in full rather than cut off after its first fragment.

        Returns:
            The subject text. An empty string when the message has no
            Subject header. A header without any encoded-word is returned
            exactly as ``decode_header`` produced it.
        """
        # Local import: keeps this method independent of the module's
        # top-level import list.
        from email.header import make_header

        raw = self['Subject']
        if raw is None:
            return ''

        fragments = decode_header(raw)
        if len(fragments) == 1 and not fragments[0][1]:
            # No charset information at all: hand the value back untouched.
            return fragments[0][0]

        # ``u'%s' % header`` renders the Header as text on both Python 2
        # (via __unicode__) and Python 3 (via __str__).
        return u'%s' % make_header(fragments)

    @staticmethod
    def _decode_payload(part):
        """Return the transfer-decoded payload of *part*, as text if possible.

        The payload is decoded with the part's declared charset, replacing
        undecodable bytes. When no charset is declared, or the declared
        charset is unknown to Python, the raw payload is returned instead.
        """
        payload = part.get_payload(decode=True)
        charset = part.get_content_charset()
        if payload is None or not charset:
            return payload
        try:
            return payload.decode(charset, 'replace')
        except LookupError:
            # Unknown charset label: showing raw bytes beats failing outright.
            return payload

    def get_body(self, html=False):
        """Return the message body.

        Args:
            html: For multipart messages, select the ``text/html`` part
                instead of the ``text/plain`` part.

        Returns:
            The decoded body, or ``None`` when a multipart message has no
            matching non-attachment part.
        """
        if self.is_multipart():
            return self.get_multipart_body(html)
        return self._decode_payload(self)

    def get_multipart_body(self, html=False):
        """Return the first non-attachment text part of the requested kind.

        Args:
            html: Look for ``text/html`` when true, ``text/plain`` otherwise.

        Returns:
            The decoded payload of the first matching part, or ``None`` if
            no part matches.
        """
        wanted = 'text/html' if html else 'text/plain'
        for part in self.walk():
            if part.is_attachment():
                continue
            if part.get_content_type() == wanted:
                return self._decode_payload(part)
        return None

    def is_attachment(self):
        """Return True when this part carries a filename."""
        return bool(self.get_filename())

    def get_attachments(self):
        """Return a list of ``(filename, content type, payload)`` tuples.

        One tuple is produced for every part in ``walk()`` for which
        ``is_attachment()`` is true; the payload is transfer-decoded.
        """
        return [
            (
                part.get_filename(),
                part.get_content_type(),
                part.get_payload(decode=True),
            )
            for part in self.walk()
            if part.is_attachment()
        ]
if __name__ == '__main__':
    # Command-line entry point: parse the bind address, start the debugging
    # server and run the asyncore loop until interrupted with Ctrl-C.
    parser = OptionParser(version="ver:%s" % __version__)
    parser.add_option('-H', '--host', action='store', type='string',
                      default='localhost', help="Bind local address.")
    parser.add_option('-p', '--port', action='store', type='int',
                      default=25, help="BInd local port..")
    options, _ = parser.parse_args()

    print('Starting SMTP Server (%s:%d)' % (options.host, options.port))
    # Constructing the server is what starts it; the name is kept only so
    # the object stays referenced while the loop runs.
    server = DebugSMTPServer((options.host, options.port), None)
    try:
        asyncore.loop()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server; exit quietly.
        print("Quit SMTP Server.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment