Skip to content

Instantly share code, notes, and snippets.

@bswck
Last active November 11, 2022 20:21
Show Gist options
  • Save bswck/c06520f8acb307d995b89064f1f46c04 to your computer and use it in GitHub Desktop.
Save bswck/c06520f8acb307d995b89064f1f46c04 to your computer and use it in GitHub Desktop.
Minimal, persistent and multi-threaded e-mail delivery system in Flask
"""
Remember to set the SMTP_HOST environment variable
or pass it as a keyword argument to EmailWorker()!
Requirements:
pip install flask-mail persist-queue
Usage:
# To deliver an e-mail
from flask_mail import Message
from emailing import EmailWorker
john_doe_emails = EmailWorker(
"john.doe@example.com",
sender_name="John Doe",
default_subject="Message from John Doe!",
storage_path=".john_doe_emails",
password="changeit"
)
# Method 1:
message = Message(
recipients=["jack.kilby@ti.com"],
html="Hello, Jack Kilby! Check out my Flask website! https://mywebsite.com/"
)
# To note on the disk to deliver message to Jack
john_doe_emails.deliver(message)
# Method 2:
john_doe_emails.deliver_message(
recipients=["jack.kilby@ti.com"],
html="Hello, Jack Kilby! Check out my Flask website! https://mywebsite.com/"
)
# To start all EmailWorker instances,
# including john_doe_emails worker thread that will deliver the message
EmailWorker.start_all_workers(app)
# To check e-mail delivery status
email_id = john_doe_emails.deliver(message)
was_sent = john_doe_emails.was_sent(email_id)
was_error = john_doe_emails.was_failed(email_id)
# To stop all email workers
EmailWorker.stop_all_workers(app)
# Notes:
# You can subclass EmailWorker to customize behavior.
# To start all threads (instances) of the subclass,
# call .start_all_workers(app) of that subclass, not EmailWorker.
# Similarly when stopping.
"""
import contextlib
import os
import threading
import time
import traceback
import weakref
import flask
from flask_mail import Mail, Message
from persistqueue.sqlackqueue import SQLiteAckQueue, AckStatus
class EmailWorker(threading.Thread):
workers = weakref.WeakSet()
context = contextlib.nullcontext()
def __init__(
self,
email_address: str,
*,
sender_name: str | None = None,
default_subject: str = "Message",
storage_path: str = ".email_cache",
password: str | None = None,
smtp_host: str | None = None,
smtp_port: int = 25,
interval: int = 1,
):
super().__init__()
self.email_address = email_address
self.default_subject = default_subject
self.password = password
self.interval = interval
self.sender_name = sender_name or email_address
self.smtp_host = smtp_host or SMTP_HOST
self.smtp_port = smtp_port or SMTP_PORT
self.queue = SQLiteAckQueue(storage_path, multithreading=True)
self.client = Mail()
self._is_working = False
type(self).workers.add(self)
def __init_subclass__(cls):
cls.workers = weakref.WeakSet()
def stop(self):
self._is_working = False
def setup(self, app: flask.Flask):
app.config.update(
MAIL_SERVER=self.smtp_host,
MAIL_PORT=self.smtp_port,
MAIL_USERNAME=self.email_address,
MAIL_PASSWORD=self.password,
)
self.context = app.app_context()
self.client.init_app(app)
def error(self, item: dict):
traceback.print_exc()
self.queue.ack_failed(id=item["pqid"])
def send(self, message: Message):
if not message.sender:
message.sender = f"{self.sender_name} <{self.email_address}>"
if not message.subject:
message.subject = self.default_subject
self.client.send(message)
def deliver(self, message: Message):
return self.queue.put(message)
def deliver_message(self, *args, message_cls: type = Message, **kwargs):
return self.deliver(message_cls(*args, **kwargs))
def fetch_item(self, email_id: int):
rows = list(filter(lambda item: item["id"] == email_id, self.queue.queue()))
row = {}
if rows:
row = rows[0]
return row
def get_status(self, email_id):
return self.fetch_item(email_id).get("status", AckStatus.unack)
def was_sent(self, email_id):
return self.get_status(email_id) == AckStatus.acked
def was_error(self, email_id):
return self.get_status(email_id) == AckStatus.ack_failed
def run(self):
self._is_working = True
while self._is_working:
item = self.queue.get(raw=True)
if not item:
continue
item_id = item["pqid"]
message = item["data"]
try:
with self.context:
self.send(message)
except Exception:
self.error(item)
else:
self.queue.ack(id=item_id)
time.sleep(self.interval)
@classmethod
def start_all_workers(cls, app):
workers = set(cls.workers)
for worker in workers:
if not worker.is_alive():
worker.setup(app)
worker.start()
return workers
@classmethod
def stop_all_workers(cls):
workers = set(cls.workers)
for worker in workers:
worker.stop()
worker.join()
SMTP_HOST = os.getenv("SMTP_HOST")
SMTP_PORT = os.getenv("SMTP_PORT", 25)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment