Created
June 26, 2012 15:06
-
-
Save akhenakh/2996326 to your computer and use it in GitHub Desktop.
Async amazon ses with gevent and pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# 2012 Fabrice Aneche. https://gist.github.com/2996326 | |
# Gevent rewrite with requests | |
# | |
# Copyright 2011 The greplin-tornado-ses Authors. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# Usage ses = AmazonSes('SES_KEY', 'SES_SECRET') | |
# ses.start() | |
# | |
# ses.send_mail(subject='Change your password', | |
# body='Hello dear, change your pass', | |
# source='notification@ttewe.fr', | |
# to_addresses='akh@bidule.zp') | |
# | |
# Note that it should be used inside a loop like the gevent.wsgi server. | |
"""API for Amazon SES""" | |
from gevent import monkey; monkey.patch_all() | |
import hmac | |
import hashlib | |
import base64 | |
import requests | |
from datetime import datetime | |
import gevent | |
from gevent.queue import Queue, Empty | |
from gevent.pool import Pool | |
import collections | |
SESMessage = collections.namedtuple('SESMessage', 'headers params') | |
class AmazonSes(gevent.Greenlet): | |
"""Amazon SES object""" | |
BASE_URL = 'https://email.us-east-1.amazonaws.com' | |
def __init__(self, access_key, secret_id, pool_size=5): | |
gevent.Greenlet.__init__(self) | |
self._access_key = access_key | |
self._secret_id = secret_id | |
self._queue = Queue() | |
self._pool = Pool(pool_size) | |
def _run(self): | |
while True: | |
try: | |
task = self._queue.get(timeout=1) | |
if self._pool.full(): | |
self._queue.put(task) | |
gevent.sleep(1) | |
else: | |
self._pool.spawn(self._send, task) | |
except Empty: | |
pass | |
def on_error(self, msg, info): | |
print 'Failed with reason', info, msg.params | |
def _send(self, msg): | |
r = None | |
with gevent.Timeout(5, False): | |
r = requests.post(self.BASE_URL, headers=msg.headers, params=msg.params) | |
if r is None: | |
return self.on_error(msg, 'Timeout') | |
if r.status_code == 200: | |
return | |
self.on_error(msg, r.content) | |
def _sign(self, message): | |
"""Sign an AWS request""" | |
signed_hash = hmac.new(key=self._secret_id, msg=message, digestmod=hashlib.sha256) | |
return base64.b64encode(signed_hash.digest()).decode() | |
def send_mail(self, source, subject, body, to_addresses, | |
cc_addresses=None, bcc_addresses=None, email_format='text', | |
reply_addresses=None, return_path=None): | |
"""Composes an email and sends it""" | |
known_formats = { | |
'html':'Message.Body.Html.Data', | |
'text':'Message.Body.Text.Data' | |
} | |
singular = { | |
'Source': source, | |
'Message.Subject.Data':subject | |
} | |
if email_format not in known_formats: | |
raise ValueError("Format must be either 'text' or 'html'") | |
singular[known_formats[email_format]] = body | |
if return_path: | |
singular['ReturnPath'] = return_path | |
multiple = AwsMultipleParameterContainer() | |
multiple['Destination.ToAddresses.member'] = to_addresses | |
if cc_addresses: | |
multiple['Destination.CcAddresses.member'] = cc_addresses | |
if bcc_addresses: | |
multiple['Destination.BccAddresses.member'] = bcc_addresses | |
if reply_addresses: | |
multiple['ReplyToAddresses.member'] = reply_addresses | |
params = dict(singular, **multiple) | |
params['Action'] = 'SendEmail' | |
now = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') | |
headers = { | |
'Content-Type':'application/x-www-form-urlencoded', | |
'Date':now, | |
'X-Amzn-Authorization':'AWS3-HTTPS AWSAccessKeyId=%s, Algorithm=HMACSHA256, Signature=%s' % | |
(self._access_key, self._sign(now)) | |
} | |
msg = SESMessage(headers=headers, params=params) | |
self._queue.put(msg) | |
class AwsMultipleParameterContainer(dict): | |
"""Build a parameters list as required by Amazon""" | |
def __setitem__(self, key, value): | |
if isinstance(value, basestring): | |
value = [value] | |
for i in range(1, len(value) + 1): | |
dict.__setitem__(self, '%s.%d' % (key, i), value[i - 1]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment