Skip to content

Instantly share code, notes, and snippets.

@akhenakh
Created June 26, 2012 15:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akhenakh/2996326 to your computer and use it in GitHub Desktop.
Save akhenakh/2996326 to your computer and use it in GitHub Desktop.
Async amazon ses with gevent and pool
# 2012 Fabrice Aneche. https://gist.github.com/2996326
# Gevent rewrite with requests
#
# Copyright 2011 The greplin-tornado-ses Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Usage ses = AmazonSes('SES_KEY', 'SES_SECRET')
# ses.start()
#
# ses.send_mail(subject='Change your password',
# body='Hello dear, change your pass',
# source='notification@ttewe.fr',
# to_addresses='akh@bidule.zp')
#
# Note that it should be used inside a loop like the gevent.wsgi server.
"""API for Amazon SES"""
from gevent import monkey; monkey.patch_all()
import hmac
import hashlib
import base64
import requests
from datetime import datetime
import gevent
from gevent.queue import Queue, Empty
from gevent.pool import Pool
import collections
SESMessage = collections.namedtuple('SESMessage', 'headers params')
class AmazonSes(gevent.Greenlet):
"""Amazon SES object"""
BASE_URL = 'https://email.us-east-1.amazonaws.com'
def __init__(self, access_key, secret_id, pool_size=5):
gevent.Greenlet.__init__(self)
self._access_key = access_key
self._secret_id = secret_id
self._queue = Queue()
self._pool = Pool(pool_size)
def _run(self):
while True:
try:
task = self._queue.get(timeout=1)
if self._pool.full():
self._queue.put(task)
gevent.sleep(1)
else:
self._pool.spawn(self._send, task)
except Empty:
pass
def on_error(self, msg, info):
print 'Failed with reason', info, msg.params
def _send(self, msg):
r = None
with gevent.Timeout(5, False):
r = requests.post(self.BASE_URL, headers=msg.headers, params=msg.params)
if r is None:
return self.on_error(msg, 'Timeout')
if r.status_code == 200:
return
self.on_error(msg, r.content)
def _sign(self, message):
"""Sign an AWS request"""
signed_hash = hmac.new(key=self._secret_id, msg=message, digestmod=hashlib.sha256)
return base64.b64encode(signed_hash.digest()).decode()
def send_mail(self, source, subject, body, to_addresses,
cc_addresses=None, bcc_addresses=None, email_format='text',
reply_addresses=None, return_path=None):
"""Composes an email and sends it"""
known_formats = {
'html':'Message.Body.Html.Data',
'text':'Message.Body.Text.Data'
}
singular = {
'Source': source,
'Message.Subject.Data':subject
}
if email_format not in known_formats:
raise ValueError("Format must be either 'text' or 'html'")
singular[known_formats[email_format]] = body
if return_path:
singular['ReturnPath'] = return_path
multiple = AwsMultipleParameterContainer()
multiple['Destination.ToAddresses.member'] = to_addresses
if cc_addresses:
multiple['Destination.CcAddresses.member'] = cc_addresses
if bcc_addresses:
multiple['Destination.BccAddresses.member'] = bcc_addresses
if reply_addresses:
multiple['ReplyToAddresses.member'] = reply_addresses
params = dict(singular, **multiple)
params['Action'] = 'SendEmail'
now = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
headers = {
'Content-Type':'application/x-www-form-urlencoded',
'Date':now,
'X-Amzn-Authorization':'AWS3-HTTPS AWSAccessKeyId=%s, Algorithm=HMACSHA256, Signature=%s' %
(self._access_key, self._sign(now))
}
msg = SESMessage(headers=headers, params=params)
self._queue.put(msg)
class AwsMultipleParameterContainer(dict):
"""Build a parameters list as required by Amazon"""
def __setitem__(self, key, value):
if isinstance(value, basestring):
value = [value]
for i in range(1, len(value) + 1):
dict.__setitem__(self, '%s.%d' % (key, i), value[i - 1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment