Last active
June 27, 2023 16:16
-
-
Save duhaime/cd2e2f1ad50d81890318abe518112c71 to your computer and use it in GitHub Desktop.
smtp python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask, render_template | |
from smtp_client import send_email | |
from smtp_server import SMTPServer | |
# Application object; the route decorators below register their handlers on it.
app = Flask(__name__)
@app.route('/send_email')
def email():
    """Send the test e-mail through a temporary SMTP server.

    An SMTPServer is started for the duration of the request,
    send_email() is called, and the server is stopped again.

    Returns:
        The string 'OK' once send_email() has returned and the server has
        been stopped. An exception raised by send_email() propagates
        after the server has been stopped.
    """
    smtp_server = SMTPServer()
    smtp_server.start()
    try:
        send_email()
    finally:
        # Stop the server whether or not sending succeeded.
        smtp_server.stop()
    return 'OK'
@app.route('/')
def index():
    """Return a fixed greeting for the site root."""
    greeting = 'Woohoo'
    return greeting
if __name__ == '__main__':
    import os

    # The server listens on every interface (0.0.0.0). Flask's debug mode
    # enables an interactive debugger that can execute arbitrary Python, so
    # it must not be switched on unconditionally for a network-reachable
    # server. Opt in explicitly with FLASK_DEBUG=1 when developing.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host='0.0.0.0')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import smtplib | |
import email.utils | |
from email.mime.text import MIMEText | |
def send_email(sender='author@example.com',
               recipient='6142546977@tmomail.net',
               host='127.0.0.1',
               port=1025):
    """Send a fixed plain-text test message through an SMTP server.

    Args:
        sender: Address used both as the envelope sender and in the
            ``From`` header.
        recipient: Address used both as the envelope recipient and in the
            ``To`` header.
        host: Host name or IP address of the SMTP server to connect to.
        port: TCP port of the SMTP server.

    Raises:
        Whatever ``smtplib.SMTP`` raises when connecting or sending fails.

    Called with no arguments, the function sends the same message to the
    same server as before these parameters were introduced.
    """
    msg = MIMEText('This is the body of the message.')
    msg['To'] = email.utils.formataddr(('Recipient', recipient))
    # Use the same sender for the header and the envelope so they cannot
    # drift apart.
    msg['From'] = email.utils.formataddr(('Author', sender))
    msg['Subject'] = 'Simple test message'

    client = smtplib.SMTP(host, port)
    client.set_debuglevel(True)  # show communication with the server
    try:
        client.sendmail(sender, [recipient], msg.as_string())
    finally:
        client.quit()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import smtpd | |
import asyncore | |
import threading | |
class CustomSMTPServer(smtpd.SMTPServer):
    '''SMTP server that prints a summary of each message and records it.

    Attributes:
        emails: list with one dict per message received so far; each dict
            has the keys 'peer', 'mailfrom', 'rcpttos' and 'data'.
    '''

    def __init__(self, *args, **kwargs):
        # Create the record list before the base initialiser runs so that
        # it exists by the time the server can accept a message.
        self.emails = []
        # Explicit base-class call (rather than super()) keeps this working
        # where the base class is an old-style class.
        smtpd.SMTPServer.__init__(self, *args, **kwargs)

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        '''Print a summary of an incoming message and store it in self.emails.

        Args:
            peer: address of the connecting client.
            mailfrom: envelope sender.
            rcpttos: list of envelope recipients.
            data: message contents.
            **kwargs: extra keyword arguments supplied by the smtpd
                framework (Python 3.5+ passes mail_options and
                rcpt_options); accepted so the call does not raise
                TypeError, otherwise ignored.

        Returns:
            None, which smtpd treats as successful delivery.
        '''
        print('Receiving message from:', peer)
        print('Message addressed from:', mailfrom)
        print('Message addressed to:', rcpttos)
        print('Message length:', len(data))
        self.emails.append({
            'peer': peer,
            'mailfrom': mailfrom,
            'rcpttos': list(rcpttos),
            'data': data,
        })
        return
class SMTPServer():
    '''Run a CustomSMTPServer inside a background asyncore thread.

    Attributes:
        port: TCP port the server listens on.
        smtp: the CustomSMTPServer created by start(), or None before the
            first start().
        thread: the thread running asyncore.loop while the server is
            started, otherwise None.
    '''

    def __init__(self, port=1025):
        self.port = port
        # Initialised here so stop() and get() are safe before start().
        self.smtp = None
        self.thread = None

    def start(self):
        '''Start listening on self.port'''
        # create an instance of the SMTP server, derived from asyncore.dispatcher
        self.smtp = CustomSMTPServer(('0.0.0.0', self.port), None)
        # start the asyncore loop, listening for SMTP connection, within a thread
        # timeout parameter is important, otherwise code will block 30 seconds
        # after the smtp channel has been closed
        kwargs = {'timeout': 1, 'use_poll': True}
        self.thread = threading.Thread(target=asyncore.loop, kwargs=kwargs)
        self.thread.start()

    def stop(self):
        '''Stop listening to self.port; does nothing if not started'''
        if self.thread is None:
            return
        # close the SMTPserver to ensure no channels connect to asyncore
        self.smtp.close()
        # now it is safe to wait for asyncore.loop() to exit
        self.thread.join()
        self.thread = None

    # check for emails in a non-blocking way
    def get(self):
        '''Return all emails received so far

        Returns an empty list when the server has never been started or
        when the underlying server object keeps no `emails` record.
        '''
        if self.smtp is None:
            return []
        return getattr(self.smtp, 'emails', [])
if __name__ == '__main__':
    # Standalone mode: listen on port 1025 on all interfaces and run the
    # asyncore loop in the foreground.
    listen_address = ('0.0.0.0', 1025)
    server = CustomSMTPServer(listen_address, None)
    asyncore.loop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment