Skip to content

Instantly share code, notes, and snippets.

@duhaime
Last active June 27, 2023 16:16
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save duhaime/cd2e2f1ad50d81890318abe518112c71 to your computer and use it in GitHub Desktop.
Save duhaime/cd2e2f1ad50d81890318abe518112c71 to your computer and use it in GitHub Desktop.
smtp python
from flask import Flask, render_template
from smtp_client import send_email
from smtp_server import SMTPServer
# WSGI application object; the @app.route decorators below register the
# '/send_email' and '/' handlers on it.
app = Flask(__name__)
@app.route('/send_email')
def email():
    '''Bring up the local SMTP server, send one message through it, shut it down.

    Returns the literal string 'OK' once the message has been sent and the
    server has been stopped. Any exception raised by send_email() propagates
    to the caller after the server has been stopped.
    '''
    smtp_server = SMTPServer()
    smtp_server.start()
    try:
        send_email()
    finally:
        # Tear the listener down whether or not the send succeeded, so the
        # port is released before the next request.
        smtp_server.stop()
    return 'OK'
@app.route('/')
def index():
    '''Root route: respond with a fixed greeting string.'''
    greeting = 'Woohoo'
    return greeting
if __name__ == '__main__':
    # Script entry point: serve the app on every interface.
    # NOTE(review): debug=True together with host='0.0.0.0' makes the
    # interactive debugger reachable from other machines -- confirm this is
    # only ever run on a trusted network / inside a container.
    app.run(host='0.0.0.0', debug=True)
import smtplib
import email.utils
from email.mime.text import MIMEText
def send_email(sender='author@example.com',
               recipient='6142546977@tmomail.net',
               host='127.0.0.1', port=1025):
    '''Send a fixed plain-text test message through an SMTP server.

    All arguments are optional; the defaults reproduce the original
    hard-coded behaviour.

    sender    -- address used both as the envelope sender and in the
                 "From" header (display name "Author").
    recipient -- address used both as the envelope recipient and in the
                 "To" header (display name "Recipient").
    host      -- SMTP server host to connect to.
    port      -- SMTP server port to connect to.

    Raises whatever smtplib raises when connecting or sending fails
    (for example smtplib.SMTPException subclasses or socket errors).
    '''
    msg = MIMEText('This is the body of the message.')
    msg['To'] = email.utils.formataddr(('Recipient', recipient))
    # Use the same sender for the header and the envelope so they cannot
    # drift apart.
    msg['From'] = email.utils.formataddr(('Author', sender))
    msg['Subject'] = 'Simple test message'

    client = smtplib.SMTP(host, port)
    client.set_debuglevel(True)  # show communication with the server
    try:
        client.sendmail(sender, [recipient], msg.as_string())
    finally:
        try:
            client.quit()
        except smtplib.SMTPServerDisconnected:
            # The connection is already gone (typically because sendmail
            # failed); just release the socket so the original error, if
            # any, is the one the caller sees.
            client.close()
import smtpd
import asyncore
import threading
class CustomSMTPServer(smtpd.SMTPServer):
    '''SMTP server that prints a summary of each message and keeps it in memory.

    Received messages are appended to self.emails (oldest first) as dicts
    with the keys 'peer', 'mailfrom', 'rcpttos' and 'data'. The list is
    never trimmed, so this is meant for short-lived test servers only.

    NOTE(review): the smtpd and asyncore modules were removed from the
    standard library in Python 3.12; this class needs an older interpreter.
    '''

    def __init__(self, *args, **kwargs):
        '''Accept the same arguments as smtpd.SMTPServer.'''
        # Explicit base-class call keeps this working on interpreters where
        # the asyncore classes do not support zero-argument super().
        smtpd.SMTPServer.__init__(self, *args, **kwargs)
        self.emails = []

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        '''Log and record one received message.

        peer     -- address of the connecting client.
        mailfrom -- envelope sender.
        rcpttos  -- list of envelope recipients.
        data     -- message contents as delivered by smtpd.
        kwargs   -- extra keyword arguments (mail_options, rcpt_options)
                    that newer smtpd versions pass; accepted so delivery
                    does not fail with TypeError, otherwise ignored.

        Returns None, which smtpd treats as successful delivery.
        '''
        print('Receiving message from:', peer)
        print('Message addressed from:', mailfrom)
        print('Message addressed to:', rcpttos)
        print('Message length:', len(data))
        self.emails.append({
            'peer': peer,
            'mailfrom': mailfrom,
            'rcpttos': rcpttos,
            'data': data,
        })
        return None
class SMTPServer():
    '''Run a CustomSMTPServer on a background thread driving asyncore.loop().

    Typical use: start(), let clients deliver mail, then stop(). get() can
    be called at any time to look at what has been received.
    '''

    def __init__(self, port=1025):
        '''Remember the port to listen on; nothing is opened until start().'''
        self.port = port
        # Populated by start(); kept as None until then so stop() and get()
        # are safe to call on a server that was never started.
        self.smtp = None
        self.thread = None

    def start(self):
        '''Start listening on self.port'''
        # create an instance of the SMTP server, derived from asyncore.dispatcher
        self.smtp = CustomSMTPServer(('0.0.0.0', self.port), None)
        # start the asyncore loop, listening for SMTP connection, within a thread
        # timeout parameter is important, otherwise code will block 30 seconds
        # after the smtp channel has been closed
        kwargs = {'timeout': 1, 'use_poll': True}
        self.thread = threading.Thread(target=asyncore.loop, kwargs=kwargs)
        self.thread.start()

    def stop(self):
        '''Stop listening to self.port; a no-op if start() was never called.'''
        # close the SMTPserver to ensure no channels connect to asyncore
        if self.smtp is not None:
            self.smtp.close()
        # now it is safe to wait for asyncore.loop() to exit
        if self.thread is not None:
            self.thread.join()
            self.thread = None

    # check for emails in a non-blocking way
    def get(self):
        '''Return all emails received so far.

        Returns an empty list when the server has not been started or when
        the underlying server object does not record messages.
        '''
        if self.smtp is None:
            return []
        return getattr(self.smtp, 'emails', [])
if __name__ == '__main__':
    # Standalone mode: listen on port 1025 of every interface and run the
    # asyncore loop in the foreground until it exits.
    listen_address = ('0.0.0.0', 1025)
    server = CustomSMTPServer(listen_address, None)
    asyncore.loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment