Skip to content

Instantly share code, notes, and snippets.

@UNIcodeX
Last active December 1, 2019 22:08
Show Gist options
  • Save UNIcodeX/a2b804cda9500d1f3bdddba6921dbd3c to your computer and use it in GitHub Desktop.
Save UNIcodeX/a2b804cda9500d1f3bdddba6921dbd3c to your computer and use it in GitHub Desktop.
An example of how to perform asynchronous work within a synchronous program, in this case a Flask application. Code adapted from "Example #1" at https://hackernoon.com/threaded-asynchronous-magic-and-how-to-wield-it-bba9ed602c32
import asyncio
import smtplib
from threading import Thread
from flask import Flask
from email.mime.text import MIMEText
app = Flask(__name__)
def send_notification(sender, recipient, subject, body, server):
    """Build an HTML email and send it through an SMTP server.

    Args:
        sender: Address used for the ``From`` header and as the envelope
            sender passed to ``sendmail``.
        recipient: Address used for the ``To`` header and as the envelope
            recipient passed to ``sendmail``.
        subject: Value of the ``Subject`` header.
        body: Message body; attached with the ``html`` MIME subtype.
        server: Host handed to ``smtplib.SMTP`` to open the connection.

    Exceptions raised while connecting or sending are not caught here and
    propagate to the caller.
    """
    message = MIMEText(body, 'html')
    message['Subject'] = subject
    message['From'] = sender
    message['To'] = recipient

    # The context manager ends the SMTP session and closes the socket even
    # when sendmail() raises, so a failed delivery no longer leaks the
    # connection. A separate name also avoids rebinding the `server`
    # parameter to the connection object.
    with smtplib.SMTP(server) as smtp:
        smtp.sendmail(sender, recipient, message.as_string())
def start_email_worker(loop):
    """Make *loop* the current event loop of this thread and run it.

    Blocks the calling thread until the loop is stopped, which makes it
    suitable as a ``Thread`` target. Once ``run_forever()`` returns, the
    loop is closed so its resources are released.

    Args:
        loop: The asyncio event loop to run in the calling thread.
    """
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        # run_forever() only returns after loop.stop() or an error; in both
        # cases the loop is finished here, so close it rather than leak it.
        loop.close()
# Create a dedicated event loop and the background thread that will run it.
worker_loop = asyncio.new_event_loop()
# daemon=True: the worker blocks in run_forever(), so a non-daemon thread
# would keep the interpreter from exiting (for example on Ctrl-C or when the
# debug reloader restarts the process).
# Trade-off: a callback still running at interpreter shutdown is cut off.
worker = Thread(
    target=start_email_worker,
    args=(worker_loop,),
    name='email-worker',
    daemon=True,
)
# Start the thread; from here on callbacks can be scheduled on worker_loop.
worker.start()
# Assume a Flask restful interface endpoint
@app.route("/notify/<email>")
def notify(email):
    """Queue a notification email on the worker loop and acknowledge it.

    The send is only scheduled here; ``send_notification`` runs later on the
    worker loop's thread, so the response is returned without waiting for
    the SMTP exchange.

    Args:
        email: The ``<email>`` segment of the request URL.

    Returns:
        A confirmation string containing the HTML-escaped ``email`` value.
    """
    # Local import keeps this block self-contained.
    from html import escape

    # call_soon_threadsafe is used because this view runs on a different
    # thread than the one running worker_loop.
    # NOTE(review): the arguments below are literal placeholders meant to be
    # filled in; in particular `email` is not used as the recipient -- confirm
    # the intended values before deploying.
    worker_loop.call_soon_threadsafe(
        send_notification,
        '{address_from}',
        '{address_rcpt}',
        '{message_subj}',
        '{message_body}',
        '{email_server}',
    )
    # `email` comes straight from the URL and a string returned from a Flask
    # view is served as HTML, so escape it to avoid reflected XSS.
    return 'Sent notification to ' + escape(str(email))
if __name__ == '__main__':
    # Executed as a script: start the Flask server on localhost:5000 with
    # debug mode enabled.
    app.run(host='localhost', port=5000, debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment