Skip to content

Instantly share code, notes, and snippets.

@danielrichman
Last active February 22, 2020 19:43
Show Gist options
  • Save danielrichman/aa27c1e948e0c1212d4c to your computer and use it in GitHub Desktop.
Save danielrichman/aa27c1e948e0c1212d4c to your computer and use it in GitHub Desktop.
templated mail merging with postgres & jinja2
query: SELECT * FROM people WHERE condition
sender:
name: Daniel Richman
email: main@danielrichman.co.uk
to_email_keys: [homeemail, workemail]
to_name_template: |
{{ firstname }} {{ surname }}
subject_template: Hello {{ firstname }}
body_template: |
Hello, {% include "to_name" %}
I am an autogenerated email.
#!/usr/bin/python
from __future__ import unicode_literals, print_function, division
import sys
import os
import email.mime.text
import email.utils
import email.generator
import smtplib
import argparse
import textwrap
import yaml
import jinja2
import jinja2.ext
import psycopg2
import psycopg2.extras
import psycopg2.extensions
class RewrapExtension(jinja2.ext.Extension):
tags = set(['rewrap'])
def parse(self, parser):
# first token is 'rewrap'
lineno = parser.stream.next().lineno
if parser.stream.current.type != 'block_end':
width = parser.parse_expression()
else:
width = jinja2.nodes.Const(78)
body = parser.parse_statements(['name:endrewrap'], drop_needle=True)
call = self.call_method('_rewrap', [width])
return jinja2.nodes.CallBlock(call, [], [], body).set_lineno(lineno)
def _rewrap(self, width, caller):
contents = caller()
lines = [line.strip() for line in contents.splitlines()]
lines.append('')
paragraphs = []
start = 0
while start != len(lines):
end = lines.index('', start)
if start != end:
paragraph = ' '.join(lines[start:end])
paragraphs.append(paragraph)
start = end + 1
new_lines = []
for paragraph in paragraphs:
if new_lines:
new_lines.append('')
new_lines += textwrap.wrap(paragraph, width)
# under the assumption that there will be a newline immediately after
# the endrewrap block, don't put a newline on the end.
return '\n'.join(new_lines)
class EmailTemplate(object):
def __init__(self, filename):
with open(filename) as f:
d = yaml.safe_load(f)
if not isinstance(d, dict):
raise TypeError("{0} did not contain a dictionary".format(filename))
self.query = d["query"]
self.to_email_keys = d["to_email_keys"]
sender = d["sender"]
self.sender = (sender["name"], sender["email"])
loader = jinja2.DictLoader({
"to_name": d["to_name_template"].strip(),
"body": d["body_template"].strip(),
"subject": d["subject_template"].strip()
})
jinja_env = jinja2.Environment(
undefined=jinja2.StrictUndefined,
loader=loader,
extensions=['jinja2.ext.with_', RewrapExtension]
)
self.to_name_template = jinja_env.get_template("to_name")
self.body_template = jinja_env.get_template("body")
self.subject_template = jinja_env.get_template("subject")
def run_query(self):
conn = psycopg2.connect(database="dbname")
try:
conn.set_client_encoding('UTF8')
for type in (psycopg2.extensions.UNICODE, psycopg2.extensions.UNICODEARRAY):
psycopg2.extensions.register_type(type, conn)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute(self.query)
for row in cur:
yield row
finally:
conn.close()
def row_recipient_info(self, row):
to_name = self.to_name_template.render(**row)
to_emails = [row[k] for k in self.to_email_keys if row[k]]
if not to_emails:
raise Exception("All emails were empty?", row)
return (to_name, to_emails)
def row_render(self, row):
row = row.copy()
(to_name, to_emails) = self.row_recipient_info(row)
row["to_name"] = to_name
subject = self.subject_template.render(**row)
row["subject"] = subject
body = self.body_template.render(**row)
message = email.mime.text.MIMEText(unicode(body), _charset='utf-8')
message["From"] = email.utils.formataddr(self.sender)
message["To"] = ", ".join(email.utils.formataddr((to_name, e)) for e in to_emails)
message["Subject"] = subject
return (to_emails, message)
def row_send(self, row, override_recipients=None):
to_emails, message = self.row_render(row)
s = smtplib.SMTP('localhost')
if override_recipients:
to_emails = override_recipients
s.sendmail(self.sender[1], to_emails, message.as_string())
s.quit()
def argparser():
argparser = argparse.ArgumentParser(prog='mailmerge')
argparser.add_argument('template_name', help="template filename", metavar="mymail.yaml")
subparsers = argparser.add_subparsers(help='action')
send_all = subparsers.add_parser('send-all')
send_all.add_argument("--confirm", action="store_true")
send_all.set_defaults(action="send_all")
test_recipients = subparsers.add_parser('test-recipients')
test_recipients.set_defaults(action="test_recipients")
test_to_me = subparsers.add_parser('test-to-me')
test_to_me.add_argument("search_key", metavar="mid")
test_to_me.add_argument("search_value", metavar="123")
test_to_me.set_defaults(action="test_to_me")
test_display = subparsers.add_parser('test-display')
test_display.add_argument("search_key", metavar="mid")
test_display.add_argument("search_value", metavar="123")
test_display.set_defaults(action="test_display")
return argparser
def main(args):
template = EmailTemplate(args.template_name)
if args.action == "test_recipients":
for row in template.run_query():
to_name, to_emails = template.row_recipient_info(row)
print(to_name.ljust(30), *to_emails)
elif args.action == "test_display":
generator = email.generator.DecodedGenerator(sys.stdout, False)
for row in template.run_query():
if str(row[args.search_key]) == args.search_value:
to_emails, message = template.row_render(row)
print("RCPT TO:", *to_emails)
generator.flatten(message)
elif args.action == "test_to_me":
recipient = os.getlogin() + '@localhost'
count = 0
for row in template.run_query():
if str(row[args.search_key]) == args.search_value:
template.row_send(row, override_recipients=[recipient])
count += 1
print("Sent", count, "emails")
elif args.action == "send_all" and not args.confirm:
print("Are you really really sure? Provide --confirm to continue.")
elif args.action == "send_all" and args.confirm:
count = 0
try:
for row in template.run_query():
template.row_send(row)
count += 1
finally:
print("Sent", count, "emails")
else:
assert False
if __name__ == "__main__":
main(argparser().parse_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment