Last active
November 11, 2022 20:21
-
-
Save bswck/c06520f8acb307d995b89064f1f46c04 to your computer and use it in GitHub Desktop.
Minimal, persistent and multi-threaded e-mail delivery system in Flask
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Remember to set the SMTP_HOST environment variable | |
or pass it as a keyword argument to EmailWorker()! | |
Requirements: | |
pip install flask-mail persist-queue | |
Usage: | |
# To deliver an e-mail | |
from flask_mail import Message | |
from emailing import EmailWorker | |
john_doe_emails = EmailWorker( | |
"john.doe@example.com", | |
sender_name="John Doe", | |
default_subject="Message from John Doe!", | |
storage_path=".john_doe_emails", | |
password="changeit" | |
) | |
# Method 1: | |
message = Message( | |
recipients=["jack.kilby@ti.com"], | |
html="Hello, Jack Kilby! Check out my Flask website! https://mywebsite.com/" | |
) | |
# To note on the disk to deliver message to Jack | |
john_doe_emails.deliver(message) | |
# Method 2: | |
john_doe_emails.deliver_message( | |
recipients=["jack.kilby@ti.com"], | |
html="Hello, Jack Kilby! Check out my Flask website! https://mywebsite.com/" | |
) | |
# To start all EmailWorker instances, | |
# including john_doe_emails worker thread that will deliver the message | |
EmailWorker.start_all_workers(app) | |
# To check e-mail delivery status | |
email_id = john_doe_emails.deliver(message) | |
was_sent = john_doe_emails.was_sent(email_id) | |
was_error = john_doe_emails.was_failed(email_id) | |
# To stop all email workers | |
EmailWorker.stop_all_workers(app) | |
# Notes: | |
# You can subclass EmailWorker to customize behavior. | |
# To start all threads (instances) of the subclass, | |
# call .start_all_workers(app) of that subclass, not EmailWorker. | |
# Similarly when stopping. | |
""" | |
import contextlib | |
import os | |
import threading | |
import time | |
import traceback | |
import weakref | |
import flask | |
from flask_mail import Mail, Message | |
from persistqueue.sqlackqueue import SQLiteAckQueue, AckStatus | |
class EmailWorker(threading.Thread): | |
workers = weakref.WeakSet() | |
context = contextlib.nullcontext() | |
def __init__( | |
self, | |
email_address: str, | |
*, | |
sender_name: str | None = None, | |
default_subject: str = "Message", | |
storage_path: str = ".email_cache", | |
password: str | None = None, | |
smtp_host: str | None = None, | |
smtp_port: int = 25, | |
interval: int = 1, | |
): | |
super().__init__() | |
self.email_address = email_address | |
self.default_subject = default_subject | |
self.password = password | |
self.interval = interval | |
self.sender_name = sender_name or email_address | |
self.smtp_host = smtp_host or SMTP_HOST | |
self.smtp_port = smtp_port or SMTP_PORT | |
self.queue = SQLiteAckQueue(storage_path, multithreading=True) | |
self.client = Mail() | |
self._is_working = False | |
type(self).workers.add(self) | |
def __init_subclass__(cls): | |
cls.workers = weakref.WeakSet() | |
def stop(self): | |
self._is_working = False | |
def setup(self, app: flask.Flask): | |
app.config.update( | |
MAIL_SERVER=self.smtp_host, | |
MAIL_PORT=self.smtp_port, | |
MAIL_USERNAME=self.email_address, | |
MAIL_PASSWORD=self.password, | |
) | |
self.context = app.app_context() | |
self.client.init_app(app) | |
def error(self, item: dict): | |
traceback.print_exc() | |
self.queue.ack_failed(id=item["pqid"]) | |
def send(self, message: Message): | |
if not message.sender: | |
message.sender = f"{self.sender_name} <{self.email_address}>" | |
if not message.subject: | |
message.subject = self.default_subject | |
self.client.send(message) | |
def deliver(self, message: Message): | |
return self.queue.put(message) | |
def deliver_message(self, *args, message_cls: type = Message, **kwargs): | |
return self.deliver(message_cls(*args, **kwargs)) | |
def fetch_item(self, email_id: int): | |
rows = list(filter(lambda item: item["id"] == email_id, self.queue.queue())) | |
row = {} | |
if rows: | |
row = rows[0] | |
return row | |
def get_status(self, email_id): | |
return self.fetch_item(email_id).get("status", AckStatus.unack) | |
def was_sent(self, email_id): | |
return self.get_status(email_id) == AckStatus.acked | |
def was_error(self, email_id): | |
return self.get_status(email_id) == AckStatus.ack_failed | |
def run(self): | |
self._is_working = True | |
while self._is_working: | |
item = self.queue.get(raw=True) | |
if not item: | |
continue | |
item_id = item["pqid"] | |
message = item["data"] | |
try: | |
with self.context: | |
self.send(message) | |
except Exception: | |
self.error(item) | |
else: | |
self.queue.ack(id=item_id) | |
time.sleep(self.interval) | |
@classmethod | |
def start_all_workers(cls, app): | |
workers = set(cls.workers) | |
for worker in workers: | |
if not worker.is_alive(): | |
worker.setup(app) | |
worker.start() | |
return workers | |
@classmethod | |
def stop_all_workers(cls): | |
workers = set(cls.workers) | |
for worker in workers: | |
worker.stop() | |
worker.join() | |
SMTP_HOST = os.getenv("SMTP_HOST") | |
SMTP_PORT = os.getenv("SMTP_PORT", 25) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment