Skip to content

Instantly share code, notes, and snippets.

@combatpoodle
Last active November 17, 2020 16:36
Show Gist options
  • Save combatpoodle/2828641573a957c9c398 to your computer and use it in GitHub Desktop.
Save combatpoodle/2828641573a957c9c398 to your computer and use it in GitHub Desktop.
Boto SES + Python Twisted
# Overrides parts of Boto to provide async call/response. Derived from
# SESConnection and boto/connection.
# Copyright (c) 2010 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2011 Harry Marr http://hmarr.com/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import re
import urllib
import base64
from boto.connection import AWSAuthConnection, HTTPRequest, HTTPResponse
from boto.exception import BotoServerError
from boto.regioninfo import RegionInfo
import boto
import boto.jsonresponse
from boto.ses import exceptions as ses_exceptions
from boto.ses.connection import *
from urllib import urlencode
from pprint import pprint
from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, succeed
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
from zope.interface import implements
class StringProducer(object):
    """Body producer that hands a single in-memory string to the consumer."""

    implements(IBodyProducer)

    def __init__(self, body):
        # ``length`` is part of the IBodyProducer interface.
        self.length = len(body)
        self.body = body

    def startProducing(self, consumer):
        """Write the whole body in one call; return an already-fired Deferred."""
        payload = self.body
        consumer.write(payload)
        done = succeed(None)
        return done

    def pauseProducing(self):
        """Nothing to pause: the body is written in a single call."""
        return None

    def stopProducing(self):
        """Nothing to stop: the body is written in a single call."""
        return None
class ResponseReceiver(Protocol):
    """Buffer a response body and pass it to a callback once it is complete.

    :param finished: Deferred fired with ``None`` after ``callback`` has run.
    :param callback: Callable invoked as ``callback(response, body, data)``.
    :param response: The response object whose body is being delivered.
    :param data: Opaque value handed through to ``callback`` unchanged.
    """

    def __init__(self, finished, callback, response, data):
        self.finished = finished
        self.body = ""
        self.callback = callback
        self.response = response
        self.data = data
        # Chunks are collected in a list and joined once at the end;
        # ``self.body += chunk`` re-copies the accumulated body on each chunk.
        self._chunks = []

    def dataReceived(self, bytes):
        """Store one chunk of the body."""
        self._chunks.append(bytes)

    def connectionLost(self, reason):
        """Assemble the body, run the callback, then fire ``finished``.

        ``reason`` is not inspected; the callback runs however the
        connection ended.
        """
        self.body = "".join(self._chunks)
        try:
            self.callback(self.response, self.body, self.data)
        finally:
            # Fire even if the callback raised, so whoever is waiting on
            # ``finished`` is not left pending forever. The exception itself
            # still propagates to the caller of connectionLost().
            self.finished.callback(None)
class ASESConnection(SESConnection):
    """SESConnection variant that sends its requests through a Twisted Agent.

    Requests are asynchronous. Outcomes are reported through the two
    callables given to the constructor:

    * ``callback(element, data)`` -- the response had HTTP code 200 and its
      body parsed; ``element`` is the ``boto.jsonresponse.Element`` and
      ``data`` is the request body passed to :meth:`make_request`.
    * ``errback(error, data)`` -- anything else; ``error`` is an exception
      instance built by :meth:`_handle_error` (returned, not raised) or the
      Twisted failure from the HTTP request.
    """

    ResponseError = BotoServerError
    DefaultRegionName = 'us-east-1'
    DefaultRegionEndpoint = 'email.us-east-1.amazonaws.com'
    APIVersion = '2010-12-01'
    # When not None, used as the request host instead of the region endpoint.
    host_override = None

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 is_secure=True, port=None, proxy=None, proxy_port=None,
                 proxy_user=None, proxy_pass=None, debug=0,
                 https_connection_factory=None, region=None, path='/',
                 security_token=None, validate_certs=True, agent=None,
                 callback=None, errback=None, reactor=None):
        """Set up the boto connection and remember the async collaborators.

        :param agent: Object whose ``request(method, url, headers,
            bodyProducer=...)`` returns a Deferred (a Twisted ``Agent``).
        :param callback: Called as ``callback(element, data)`` on success.
        :param errback: Called as ``errback(error, data)`` on failure.
        :param reactor: Stored on the instance; not used by this class.

        The remaining parameters are forwarded to ``SESConnection``, except
        ``https_connection_factory`` which is always forwarded as ``None``.
        """
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        self.region = region
        SESConnection.__init__(
            self,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            is_secure=is_secure, port=port, proxy=proxy,
            proxy_port=proxy_port, proxy_user=proxy_user,
            proxy_pass=proxy_pass, debug=debug,
            https_connection_factory=None,
            # Forward the region explicitly; otherwise the parent constructor
            # falls back to its default and a caller-supplied region is lost.
            region=region,
            path=path,
            security_token=security_token,
            validate_certs=validate_certs)
        self.agent = agent
        self.callback = callback
        self.errback = errback
        self.reactor = reactor

    def _make_request(self, action, params=None):
        """Make a call to the SES API.

        :type action: string
        :param action: The API method to use (e.g. SendRawEmail)

        :type params: dict
        :param params: Parameters that will be sent as POST data with the API
            call. The dict is copied, not modified.

        :return: The Deferred returned by :meth:`make_request`.
        """
        ct = 'application/x-www-form-urlencoded; charset=UTF-8'
        headers = {'Content-Type': ct}
        request_params = dict(params or {})
        request_params['Action'] = action
        for key, value in list(request_params.items()):
            if isinstance(value, unicode):  # UTF-8 encode only if it's Unicode
                request_params[key] = value.encode('utf-8')
        return self.make_request(
            'POST',
            '/',
            headers=headers,
            data=urllib.urlencode(request_params)
        )

    def make_request(self, method, path, headers=None, data='', host=None,
                     auth_path=None, sender=None, override_num_retries=None,
                     params=None):
        """Sign a request and send it through ``self.agent``.

        ``sender`` and ``override_num_retries`` are accepted for signature
        compatibility and ignored; no retry is performed here.

        :return: The Deferred for the request. The response is reported via
            ``self.callback`` / ``self.errback``, not via this Deferred's
            result.
        """
        if params is None:
            params = {}
        if self.host_override is not None:
            host = self.host_override
        self.http_request = self.build_base_http_request(
            method, path, auth_path, params, headers, data, host)
        self.http_request.authorize(connection=self)
        request = self.http_request
        bodyProducer = StringProducer(request.body)
        url = request.protocol + '://' + request.host + request.path
        # Twisted sets this again - doing it twice results in a 400 Bad Request...
        request.headers.pop('Content-Length', None)
        # Headers() is given a list of values per header name.
        for title, value in list(request.headers.items()):
            if isinstance(value, (str, unicode)):
                request.headers[title] = [value]
        d = self.agent.request(
            request.method,
            url,
            Headers(request.headers),
            bodyProducer=bodyProducer
        )
        d.addCallback(self.http_request_callback, data)
        d.addErrback(self.http_request_errback, data)
        return d

    def http_request_callback(self, response, data):
        """Start receiving the response body; return a Deferred that fires when done."""
        finished = Deferred()
        receiver = ResponseReceiver(
            finished, self.http_body_received_callback, response, data)
        response.deliverBody(receiver)
        return finished

    def http_body_received_callback(self, response, body, data):
        """Parse a complete response body and report it to callback/errback."""
        if response.code != 200:
            # HTTP codes other than 200 are considered errors. Go through
            # some error handling to determine which exception is reported.
            self.errback(self._handle_error(response, body), data)
            return
        list_markers = ('VerifiedEmailAddresses', 'Identities',
                        'VerificationAttributes', 'SendDataPoints')
        item_markers = ('member', 'item', 'entry')
        try:
            element = boto.jsonresponse.Element(list_marker=list_markers,
                                                item_marker=item_markers)
            handler = boto.jsonresponse.XmlHandler(element, None)
            handler.parse(body)
        except Exception:
            self.errback(self._handle_error(response, body), data)
            return
        # Called outside the try block so that an exception raised by the
        # user's callback is not reported as a failed request.
        self.callback(element, data)

    def http_request_errback(self, failure, data):
        """Report a failure of the HTTP request itself."""
        print("%s %s" % (failure, data))
        # Same (error, data) argument order as the other errback calls.
        self.errback(failure, data)

    def _handle_error(self, response, body):
        """
        Build the correct exception, depending on the error. Many
        errors share the same HTTP response code, meaning we have to get really
        kludgey and do string searches to figure out what went wrong.

        The exception instance is returned, not raised.
        """
        if "Address blacklisted." in body:
            # Delivery failures happened frequently enough with the recipient's
            # email address for Amazon to blacklist it. After a day or three,
            # they'll be automatically removed, and delivery can be attempted
            # again (if you write the code to do so in your application).
            ExceptionToRaise = ses_exceptions.SESAddressBlacklistedError
            exc_reason = "Address blacklisted."
        elif "Email address is not verified." in body:
            # This error happens when the "Reply-To" value passed to
            # send_email() hasn't been verified yet.
            ExceptionToRaise = ses_exceptions.SESAddressNotVerifiedError
            exc_reason = "Email address is not verified."
        elif "Daily message quota exceeded." in body:
            # Encountered when your account exceeds the maximum total number
            # of emails per 24 hours.
            ExceptionToRaise = ses_exceptions.SESDailyQuotaExceededError
            exc_reason = "Daily message quota exceeded."
        elif "Maximum sending rate exceeded." in body:
            # Your account has sent above its allowed requests a second rate.
            ExceptionToRaise = ses_exceptions.SESMaxSendingRateExceededError
            exc_reason = "Maximum sending rate exceeded."
        elif "Domain ends with dot." in body:
            # Recipient address ends with a dot/period. This is invalid.
            ExceptionToRaise = ses_exceptions.SESDomainEndsWithDotError
            exc_reason = "Domain ends with dot."
        elif "Local address contains control or whitespace" in body:
            # I think this pertains to the recipient address.
            ExceptionToRaise = ses_exceptions.SESLocalAddressCharacterError
            exc_reason = "Local address contains control or whitespace."
        elif "Illegal address" in body:
            # A clearly mal-formed address.
            ExceptionToRaise = ses_exceptions.SESIllegalAddressError
            exc_reason = "Illegal address"
        # The re.search is to distinguish from the
        # SESAddressNotVerifiedError error above.
        elif re.search('Identity.*is not verified', body):
            ExceptionToRaise = ses_exceptions.SESIdentityNotVerifiedError
            exc_reason = "Identity is not verified."
        elif "ownership not confirmed" in body:
            ExceptionToRaise = ses_exceptions.SESDomainNotConfirmedError
            exc_reason = "Domain ownership is not confirmed."
        else:
            # This is either a common AWS error, or one that we don't devote
            # its own exception to.
            ExceptionToRaise = self.ResponseError
            exc_reason = "unknown"
        return ExceptionToRaise(response.code, exc_reason, body)
{
"database_host": "127.0.0.1",
"database_user": "sadlfkjasdlfkj",
"database_passwd": "asldkfjasdlkfj",
"database_db": "laskdjflaskdjf",
"rabbitmq_host": "127.0.0.1",
"rabbitmq_port": 5672,
"rabbitmq_vhost": "/",
"rabbitmq_username": "guest",
"rabbitmq_password": "guest",
"rabbitmq_queue": "queue",
"rabbitmq_exchange": "exchange",
"rabbitmq_routing_key": "routing_key",
"rabbitmq_rate_limit": 3
}
from aconnection import ASESConnection
from pprint import pprint
from twisted.enterprise import adbapi
from twisted.internet import reactor, defer
from twisted.internet.defer import Deferred
from twisted.internet.ssl import ClientContextFactory
from twisted.web.client import Agent, HTTPConnectionPool
from twisted_queue import *
import functools
import json
import MySQLdb, MySQLdb.cursors
import re
import sys
import time
import traceback
import twisted.internet.task
# This class is used to enable SSL... Seems to just extend getContext to
# accept extra arguments that HTTP might require.
class WebClientContextFactory(ClientContextFactory):
    """Context factory whose getContext() accepts a (hostname, port) pair.

    Both arguments are ignored; the context comes from the base class.
    """

    def getContext(self, hostname, port):
        base_context = ClientContextFactory.getContext(self)
        return base_context
class EmailSender:
    """Send one templated email per queued recipient through Amazon SES.

    Recipients are read as JSON messages from a RabbitMQ queue. The template
    is read from the ``email_content`` table using ``email_id``. Settings
    come from ``config.json`` layered over built-in defaults.
    """

    def __init__(self, reactor, email_id):
        """Build the HTTP agent, DB pool and SES connection, then schedule go().

        :param reactor: Twisted reactor used for the connection pool, the
            agent and for scheduling :meth:`go`.
        :param email_id: Value matched against ``email_content.id``.
        """
        contextFactory = WebClientContextFactory()
        self.sent_count = 0
        self.last_sent_count = 0
        self.failed_count = 0
        self.last_failed_count = 0
        self.email_id = email_id
        self.reactor = reactor

        pool = HTTPConnectionPool(reactor, persistent=True)
        pool.maxPersistentPerHost = 50
        pool.cachedConnectionTimeout = 10
        https_agent = Agent(reactor, contextFactory, pool=pool)

        self.set_up_configuration()

        self.db_pool = adbapi.ConnectionPool(
            "MySQLdb",
            host=self.configuration['database_host'],
            user=self.configuration['database_user'],
            passwd=self.configuration['database_passwd'],
            db=self.configuration['database_db'],
            cursorclass=MySQLdb.cursors.DictCursor,
        )

        # This probably should just get activated with a --live hook
        self.ses_connection = ASESConnection(
            callback=self.sending_email_succeeded,
            errback=self.boto_errback,
            reactor=reactor,
            agent=https_agent
        )
        if self.configuration.get('production') == "I really really know what I'm doing here":
            print("WARNING: RUNNING PRODUCTION SEND. HIDE YO KIDS, HIDE YO WIFE.")
            # Grace period to abort; runs before the reactor is started.
            time.sleep(10)
        else:
            # Not explicitly in production: send to this host instead of SES.
            self.ses_connection.host_override = 'ses.v'

        self.reactor.callWhenRunning(self.go)

    def set_up_configuration(self):
        """Populate ``self.configuration`` from defaults plus ``config.json``.

        A missing, unreadable or invalid ``config.json`` is not fatal; the
        defaults are used on their own.
        """
        try:
            with open('config.json') as config_file:
                configuration = json.loads(config_file.read())
        except (IOError, ValueError):
            print("Failed to load manual configuration; using default")
            configuration = {}

        default_configuration = {
            "database_host": "127.0.0.1",
            "database_user": "asldkfjasldkfj",
            "database_passwd": "asldkfjasldkfj",
            "database_db": "asldkfjasldkfjasdlfkj",
            "rabbitmq_host": "127.0.0.1",
            "rabbitmq_port": 5672,
            "rabbitmq_vhost": "/",
            "rabbitmq_username": "guest",
            "rabbitmq_password": "guest",
            "rabbitmq_queue": "huge_email_address_queue",
            "rabbitmq_exchange": "huge_email_address_queue",
            "rabbitmq_routing_key": "huge_email_address_queue",
            "rabbitmq_rate_limit": 1,
            "source_address": '"MOJO Themes" <noreply@mojo-themes.com>',
            "reporting_interval": 1
        }
        self.configuration = default_configuration
        for (k, v) in configuration.items():
            self.configuration[k] = v

        pprint({'Configuration': self.configuration})

    @defer.inlineCallbacks
    def go(self):
        """Load the template, then start consuming the queue.

        Waiting for the template query matters: if consumption starts first,
        early messages find no template and are acked as failures.
        If loading fails, ``send_emails`` is not called.
        """
        yield self.load_email_content()
        self.send_emails()

    @defer.inlineCallbacks
    def load_email_content(self):
        """Query the template rows for ``self.email_id`` and store them.

        Stops the reactor and calls ``sys.exit(1)`` when no row is found or a
        row cannot be loaded.
        """
        templates = yield self.db_pool.runQuery(
            "SELECT subject, body FROM email_content WHERE id=(%s)",
            (self.email_id,)
        )
        self.email_templates = []

        if not templates:
            self.reactor.stop()
            print("No emails loaded")
            sys.exit(1)

        try:
            for template in templates:
                self.load_template(template)
        except Exception as e:
            print(e)
            self.reactor.stop()
            sys.exit(1)

        if not self.email_templates:
            print("No templates loaded!")
            self.reactor.stop()
            sys.exit(1)

    def load_template(self, template):
        """Make ``template`` the single active template.

        The list is replaced, not appended to, so the last row loaded wins.
        """
        self.email_templates = [{
            'subject': template['subject'],
            'body': template['body']
        }]

    def get_template(self, **argv):
        """Return the active template; keyword arguments are ignored."""
        return self.email_templates[0]

    def apply_template(self, template, receiver_info):
        """Fill the template placeholders for one recipient.

        ``{{host_account_id}}`` and ``{{user_id}}`` are substituted in the
        body only. ``host_account_id`` defaults to ``"null"``; ``user_id``
        is required.

        :return: dict with ``subject``, ``content`` and ``source`` keys.
        """
        subject = template['subject']
        body = template['body']

        if 'host_account_id' in receiver_info:
            host_account_id = receiver_info['host_account_id']
        else:
            host_account_id = "null"
        user_id = receiver_info['user_id']

        try:
            body = body.replace('{{host_account_id}}', str(host_account_id))
            body = body.replace('{{user_id}}', str(user_id))
        except Exception:
            print("failed %s" % (sys.exc_info()[2],))
            traceback.print_tb(sys.exc_info()[2])
            # Bare raise keeps the original traceback.
            raise

        return {
            'subject': subject,
            'content': body,
            'source': self.configuration['source_address']
        }

    def send_emails(self):
        """Start the periodic stats report and the queue consumer."""
        stats_loop = twisted.internet.task.LoopingCall(self.print_stats)
        stats_loop.start(self.configuration['reporting_interval'])

        self.message_queue = twisted_queue_receiver(
            callback=self.send_email,
            errback=self.queue_errback,
            host=self.configuration['rabbitmq_host'],
            port=self.configuration['rabbitmq_port'],
            vhost=self.configuration['rabbitmq_vhost'],
            username=self.configuration['rabbitmq_username'],
            password=self.configuration['rabbitmq_password'],
            queue=self.configuration['rabbitmq_queue'],
            exchange=self.configuration['rabbitmq_exchange'],
            routing_key=self.configuration['rabbitmq_routing_key'],
            rate_limit=self.configuration['rabbitmq_rate_limit'],
            no_ack=False,
            durable=True
        )

    def queue_errback(self, err):
        """Report a queue failure, stop the reactor and exit."""
        print(err)
        print("I should probably be killed because I am a bad kitty.")
        self.reactor.stop()
        sys.exit(1)

    def _drop_message(self, channel, raw_message, error, note):
        """Ack a message that cannot be processed and count it as failed."""
        channel.basic_ack(delivery_tag=raw_message.delivery_tag)
        print(note)
        self.sending_email_failed(error, raw_message.content.body)

    @defer.inlineCallbacks
    def send_email(self, channel, raw_message):
        """Handle one queue message: parse, template, send, ack.

        A message that fails at any step is acked (so it is not redelivered)
        and counted through :meth:`sending_email_failed`. The ack on the
        success path happens once the send has been started, not once SES
        has answered.
        """
        if channel is None or raw_message is None:
            defer.returnValue(None)

        try:
            receiver_info = json.loads(raw_message.content.body)
        except ValueError as e:
            self._drop_message(
                channel, raw_message, e,
                "failed parsing %s" % (raw_message.content.body,))
            defer.returnValue(None)

        try:
            template = self.get_template(receiver_info=receiver_info)
        except Exception as e:
            self._drop_message(
                channel, raw_message, e,
                "Missing email template: %s %s" % (receiver_info, e))
            defer.returnValue(None)

        try:
            templated_email = self.apply_template(template, receiver_info)
        except Exception as e:
            # Any templating error (for instance a missing 'user_id') must
            # drop the message; otherwise it would stay unacked forever.
            self._drop_message(
                channel, raw_message, e,
                "Couldn't template email: %s" % (receiver_info,))
            defer.returnValue(None)

        email_subject = templated_email['subject']
        email_content = templated_email['content']
        email_source = templated_email['source']

        try:
            self.ses_connection.send_email(
                source=email_source, subject=email_subject, body=email_content,
                format='html', to_addresses=[receiver_info['email']])
        except Exception as e:
            # .get(): the failure may be the missing 'email' key itself.
            self._drop_message(
                channel, raw_message, e,
                "Send_email exception: %s\n%s" % (e, receiver_info.get('email')))
            defer.returnValue(None)

        try:
            yield channel.basic_ack(delivery_tag=raw_message.delivery_tag)
        except Exception as e:
            print("Couldn't ack message because ??? %s" % (e,))

        defer.returnValue(None)

    def print_stats(self):
        """Print running totals and the per-interval send/failure rates."""
        interval = self.configuration['reporting_interval']
        diff_sent_count = self.sent_count - self.last_sent_count
        diff_failed_count = self.failed_count - self.last_failed_count
        self.last_sent_count = self.sent_count
        self.last_failed_count = self.failed_count

        print("")
        print("sent_count: %s period rate: %s/s"
              % (self.sent_count, diff_sent_count / interval))
        print("failed_count: %s period rate: %s/s"
              % (self.failed_count, diff_failed_count / interval))
        sys.stdout.flush()

    def sending_email_succeeded(self, e, data):
        """SES success callback: count the send."""
        # Could stuff into a database here if desired
        self.sent_count += 1

    def boto_errback(self, data, error):
        """SES failure callback: report and count the failure.

        Called positionally; both arguments are only printed.
        """
        print("failed sending: %s %s" % (error, data))
        self.failed_count += 1

    def sending_email_failed(self, e, data):
        """Report and count a message that failed before reaching SES."""
        # Could stuff into a database or something else if desired
        print("failed sending to %s because %s" % (data, e))
        self.failed_count += 1
if __name__ == '__main__':
    # The email_content id to send is the only command-line argument.
    if len(sys.argv) < 2:
        sys.stderr.write("usage: %s EMAIL_ID\n" % sys.argv[0])
        sys.exit(2)
    sender = EmailSender(reactor, sys.argv[1])
    reactor.run()
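# NOTE(review): the header below looks like it was copied from an unrelated
# script -- nothing in the code that follows touches CloudFront or Memcache.
# The code below defines txAMQP-based RabbitMQ receiver and sender helpers.
# This adds the cloudfront servers' ip addresses into Memcache.
# Memcache Key: cloudfront_server_addresses
# Format:
# {
# 'subnet': ['server', 'server', 'server', 'server', ...],
# ...
# }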
import sys
from pprint import pprint
import time
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from twisted.python import log
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
import txamqp.spec
class twisted_queue_receiver:
    """Consume messages from an AMQP queue, at most ``rate_limit`` per second.

    Each message is passed to ``callback(channel, message)``. Connection
    failures and failures inside the polling loop are passed to ``errback``.
    """

    def __init__(self, callback, errback, host, port, vhost, username, password, queue, exchange, routing_key, durable=False, rate_limit=5, prefetch_count=500, no_ack=False):
        """Store the settings and start connecting to the broker.

        :param callback: Called as ``callback(channel, message)`` per message.
        :param errback: Called with the failure when connecting, setting up
            or polling fails.
        :param rate_limit: Maximum messages per second; must be non-zero.
        :param prefetch_count: Passed to ``basic_qos``.
        :param no_ack: Passed to ``basic_consume``.
        """
        self.queue = queue
        self.exchange = exchange
        self.routing_key = routing_key
        self.callback = callback
        self.errback = errback
        self.durable = durable
        self.rate_limit = rate_limit
        self.min_interval = 1.0 / float(self.rate_limit)
        self.last_time_called = time.time()
        self.prefetch_count = prefetch_count
        self.no_ack = no_ack

        spec = txamqp.spec.load("amqp0-8.stripped.rabbitmq.xml")
        delegate = TwistedDelegate()

        d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost, spec=spec).connectTCP(host, port)
        d.addCallback(self.gotConnection, username, password)
        d.addErrback(errback)

    @inlineCallbacks
    def gotConnection(self, conn, username, password):
        """Authenticate, declare and bind the queue, then start polling."""
        print("Connected to broker.")
        yield conn.authenticate(username, password)

        print("Authenticated. Ready to receive messages")
        self.channel = yield conn.channel(1)
        yield self.channel.channel_open()
        yield self.channel.basic_qos(prefetch_count=self.prefetch_count)
        yield self.channel.queue_declare(queue=self.queue, durable=self.durable, exclusive=False, auto_delete=False)
        yield self.channel.exchange_declare(exchange=self.exchange, type="direct", durable=self.durable, auto_delete=False)
        yield self.channel.queue_bind(queue=self.queue, exchange=self.exchange, routing_key=self.routing_key)
        yield self.channel.basic_consume(queue=self.queue, no_ack=self.no_ack, consumer_tag=self.routing_key)

        # From here on self.queue is the consumer's message queue object,
        # no longer the queue name.
        self.queue = yield conn.queue(self.routing_key)
        self._poll()

    def _poll(self):
        """Run one getMessages() pass, sending any failure to the errback."""
        d = self.getMessages()
        d.addErrback(self.errback)
        return d

    @inlineCallbacks
    def getMessages(self):
        """Deliver at most one message, then schedule the next pass.

        If less than ``min_interval`` has passed since the last delivery,
        nothing is fetched and the pass is retried after the remaining time.
        """
        elapsed = time.time() - self.last_time_called
        left_to_wait = self.min_interval - elapsed

        if left_to_wait > 0:
            # callLater() does not return a Deferred, so there is nothing to
            # wait on here; the rescheduled pass does the work.
            reactor.callLater(left_to_wait, self._poll)
        else:
            self.last_time_called = time.time()

            message = yield self.queue.get()
            self.callback(self.channel, message)

            elapsed = time.time() - self.last_time_called
            left_to_wait = max(self.min_interval - elapsed, 0)
            # The extra 1% keeps the next pass from landing just before the
            # interval has elapsed.
            reactor.callLater(left_to_wait * 1.01, self._poll)
class twisted_queue_sender:
    """Open an AMQP channel and hand it to a callback for publishing.

    After connecting, the queue and exchange are declared and bound, then
    ``callback(channel)`` is run and the channel is closed once the
    callback's result is available.
    """

    def __init__(self, callback, host, port, vhost, username, password, queue, exchange, routing_key, durable=False):
        """Store the settings and start connecting to the broker.

        :param callback: Called as ``callback(channel)`` once the queue and
            exchange are declared and bound; may return a Deferred.
        """
        self.exchange = exchange
        self.routing_key = routing_key
        self.durable = durable
        self.callback = callback
        self.queue = queue

        spec = txamqp.spec.load("amqp0-8.stripped.rabbitmq.xml")
        delegate = TwistedDelegate()

        d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost, spec=spec).connectTCP(host, port)
        d.addCallback(self.gotConnection, username, password)

        def whoops(err):
            # Log the failure and shut down, but only while the reactor runs.
            if reactor.running:
                log.err(err)
                reactor.stop()

        d.addErrback(whoops)

    @inlineCallbacks
    def gotConnection(self, conn, username, password):
        """Authenticate, declare and bind, run the callback, close the channel."""
        print("Connected to broker.")
        yield conn.authenticate(username, password)

        print("Authenticated. Ready to send messages")
        self.channel = yield conn.channel(1)
        yield self.channel.channel_open()
        yield self.channel.queue_declare(queue=self.queue, durable=self.durable, exclusive=False, auto_delete=False)
        yield self.channel.exchange_declare(exchange=self.exchange, type="direct", durable=self.durable, auto_delete=False)
        yield self.channel.queue_bind(queue=self.queue, exchange=self.exchange, routing_key=self.routing_key)

        yield self.callback(self.channel)

        yield self.channel.channel_close()

    @inlineCallbacks
    def put(self, msg):
        """Publish ``msg`` to the configured exchange and routing key.

        Only usable while the channel is open, i.e. from within the
        ``callback`` given to the constructor.

        NOTE(review): the previous body called ``self.queue.put(msg)``, but
        ``self.queue`` holds the queue name, so it could only raise
        AttributeError. Publishing through the channel is the assumed
        intent -- confirm.
        """
        from txamqp.content import Content
        yield self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=self.routing_key,
            content=Content(msg))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment