Skip to content

Instantly share code, notes, and snippets.

@aowongster
Forked from jriguera/email_notify.py
Last active January 24, 2017 02:31
Show Gist options
  • Save aowongster/34d07a7dce1f7e61d9ca6a856fad361e to your computer and use it in GitHub Desktop.
Save aowongster/34d07a7dce1f7e61d9ca6a856fad361e to your computer and use it in GitHub Desktop.
Email from Python with Jinja2
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Python 3 and compatibility with Python 2
from __future__ import unicode_literals, print_function
import os
import sys
import re
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from jinja2 import Environment, FileSystemLoader
class EmailNotification(object):
EMAIL_REGEX = re.compile('([\w\-\.\']+@(\w[\w\-]+\.)+[\w\-]+)')
HTML_REGEX = re.compile('(^<!DOCTYPE html.*?>)')
def __init__(self, smtp, fromuser, fromemail, login=None,
password=None, templatedir='templates', logger=None):
self.logger = logger
if not logger:
logging.basicConfig()
self.logger = logging.getLogger(__name__)
self.smtp = smtp
self.mfrom = "%s <%s>" % (fromuser, fromemail)
self.reply = fromemail
self.smtplogin = login
self.smtppass = password
if os.path.isdir(templatedir):
self.templatedir = templatedir
else:
self.templatedir = os.path.join(os.path.abspath(os.path.dirname(__file__)), templatedir)
self.env = Environment(loader=FileSystemLoader(self.templatedir))
def _mailrender(self, data, template):
template = template + ".tmpl"
self.logger.debug("Rendering template '%s'" % (template))
text = self.env.get_template(template)
msg = text.render(data)
return msg
def _smtpconnect(self):
try:
smtp = smtplib.SMTP(self.smtp)
except Exception as e:
self.logger.error("Cannot connect with '%s': %s" % (self.smtp, e))
raise
if self.smtplogin:
try:
smtp.login(self.smtplogin, self.smtppass)
except smtplib.SMTPException as e:
self.logger.error("Cannot auth with '%s' on %s: %s" % (self.smtplogin, self.smtp, e))
raise
finally:
smtp.quit()
return smtp
def _smtpsend(self, smtp, recipient, subject, content):
if self.HTML_REGEX.match(content) is None:
self.logger.debug("Sending text mail to '%s'" % (recipient))
msg = MIMEText(content)
else:
self.logger.debug("Sending html mail to '%s'" % (recipient))
msg = MIMEMultipart('alternative')
msg.attach(MIMEText(content, 'html', 'utf-8'))
msg['From'] = self.mfrom
msg['To'] = recipient
msg['Reply-to'] = self.reply
msg['Subject'] = subject
smtp.sendmail(self.mfrom, [recipient], msg.as_string())
def send_email(self, recipient, subject, msg):
smtp = self._smtpconnect()
try:
self._smtpsend(smtp, recipient, subject, msg)
except smtplib.SMTPException as e:
self.logger.error("Cannot send mail to '%s': %s" % (recipient, e))
raise
finally:
smtp.quit()
def send_bulk(self, msgs):
smtp = self._smtpconnect()
processed = 0
for (recipient, subject, msg) in msgs:
try:
self._smtpsend(smtp, recipient, subject, msg)
except smtplib.SMTPException as e:
self.logger.error("Cannot send mail to '%s': %s" % (recipient, e))
else:
processed += 1
smtp.quit()
return processed
def mailout(self, email, name, subject, data, template):
if email is None:
error = "Email is empty!"
self.logger.error(error)
raise ValueError(error)
elif self.EMAIL_REGEX.match(email) is None:
error = "Invalid email address!"
self.logger.error(error)
raise ValueError(error)
msg = self._mailrender(data, template)
self.send_email(email, subject, msg)
def mailbulk(self, email_data, template):
elist = []
for edata in email_data:
try:
email = edata["email"]
name = edata["name"]
subject = edata["subject"]
data = edata["data"]
except Exception as e:
continue
if email is None:
error = "Email is empty!"
self.logger.error(error)
continue
elif self.EMAIL_REGEX.match(email) is None:
error = "Invalid email address!"
self.logger.error(error)
continue
msg = self._mailrender(data, template)
elist.append((email, subject, msg))
return self.send_bulk(elist)
def main(argv):
e = EmailNotification("smtp.example.com", "Jose Riguera", "jriguera@gmail.com")
elist = [{
"email": "jriguera@gmail.com",
"name": "Simon",
"subject": "Hi",
"data" : {
"dear": "Simon",
"msg": "Hola mundo"
}
}]
e.mailbulk(elist, "email-html_notify")
if __name__ == "__main__":
main(sys.argv)
<!DOCTYPE html>
<html>
<head>
<title>OpenStack notification</title>
<style type="text/css">
span.bold {font-weight: bold;}
table.noborder {border: 0px; padding: 8px;}
th {text-align: left;}
</style>
</head>
<body>
<p>
Dear {{ dear }},
</p>
{% block content %}
{% endblock %}
<p>
Regards, <br />
<br />
The Platform Engineering OpenStack support team
</p>
<p>
-- <br />
This email has been sent to the email address associated with your OpenStack login.
They only relate to events that may affect your OpenStack resources. If you
receive multiple copies of this email, it may be due to the fact that you
are a member of multiple projects.
</p>
</body>
</html>
{% extends "email-html_base.tmpl" %}
{% block content %}
<p>
{{ msg }}
</p>
<p>
<b>AFFECTED INSTANCES:</b>
</p>
<table class='noborder'>
<tr>
<th>UUID</th><th>IP Address</th><th>Host</th>
</tr>
{% for instance in instances -%}
<tr>
<td>{{ instance.id }}</td><td>{{ instance.accessIPv4 }}</td><td>{{ instance.name }}</td>
</tr>
{% endfor %}
</table>
{% endblock %}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment