# Scraped from a GitHub gist page ("Instantly share code, notes, and snippets").
# Gist: ehershey/- by @ehershey, last active Dec 13, 2016.
# The page navigation text that preceded the code has been reduced to this header.
import json
import logging

import boto3
from google.appengine.api import mail
from google.appengine.api import urlfetch
from google.appengine.ext import testbed
# Create and activate an App Engine testbed, then register the service stubs
# this script initializes (datastore, memcache, urlfetch).
# NOTE: the assignment below rebinds the name `testbed` from the imported
# module to the Testbed instance; the module is no longer reachable by that
# name after this line.
testbed = testbed.Testbed()
testbed.activate()
testbed.init_datastore_v3_stub()
testbed.init_memcache_stub()
# aws_execute() sends its request through urlfetch.fetch(), hence this stub.
testbed.init_urlfetch_stub()
def _encode_safely(s):
"""Helper to turn a unicode string into 8-bit bytes."""
if isinstance(s, unicode):
s = s.encode('utf-8')
return s
def aws_execute(client, method, par, expires_in=100, deadline=5):
    """Perform an AWS method call by wrapping it in ``urlfetch.fetch()``.

    Direct ``boto3`` client method calls time out because GAE blocks them, so
    the request is pre-signed with boto3 and then issued through urlfetch.

    Args:
        client: A boto3 client; only ``generate_presigned_url`` is used.
        method: Name of the client method to pre-sign (e.g. ``'send_email'``).
        par: Dict of parameters for that method.
        expires_in: Lifetime of the pre-signed URL, passed as ``ExpiresIn``.
        deadline: Deadline passed to ``urlfetch.fetch``.

    Returns:
        The JSON-decoded response body, or ``{}`` when the body is empty.

    Raises:
        RuntimeError: If the response status code is not 2xx.
    """
    url = client.generate_presigned_url(
        method, Params=par, ExpiresIn=expires_in, HttpMethod="GET")
    hdrs = {"Accept": "application/json"}  # request JSON results in body.
    result = urlfetch.fetch(url, deadline=deadline, headers=hdrs)
    if result.status_code // 100 != 2:  # no 2xx from AWS?
        # Log the status and body: str(result) would only give the response
        # object's repr, which says nothing about why the call failed.
        logging.info("AWS %s FAIL: %s %s",
                     method, result.status_code, result.content)
        raise RuntimeError(
            "AWS %s call failed with %d" % (method, result.status_code))
    if result.content:
        return json.loads(result.content)
    return {}
# Fixed test message consumed by main().
# Sender address; main() uses it as the initial 'sender' and as the SES 'Source'.
my_email = 'ernie@mongodb.com'
# Primary recipients (list of addresses).
to = ['ernie+rietveldrecip@mongodb.com']
subject = 'test subject'
# The short phrase repeated 1000 times to form the message body.
body = "test small body / " * 1000
# Reply-To addresses; note this is a list, not a single string.
reply_to = [ 'ernie@mongodb.com' ]
# Carbon-copy recipients; main() only adds a 'cc' entry when this is non-empty.
cc = ['ernie+rietveldrecip2@mongodb.com']
# Optional patch to attach; None means main() builds no 'attachments' entry.
patch = None
def _build_ses_params(send_args):
    """Translate the ``send_args`` dict into SES ``send_email`` parameters."""
    destination = {'ToAddresses': send_args["to"]}
    if send_args.get('cc'):
        destination["CcAddresses"] = send_args["cc"]
    return {
        # Read the sender from send_args (not the module constant) so that a
        # sender swapped in by the retry path is actually used.
        'Source': send_args['sender'],
        'Destination': destination,
        "ReplyToAddresses": send_args["reply_to"],
        'Message': {
            "Subject": {"Data": send_args["subject"]},
            "Body": {
                "Text": {
                    "Data": send_args["body"].decode("utf-8"),
                    "Charset": "UTF-8",
                },
            },
        },
    }


def main():
    """Send the module-level test message through SES via ``aws_execute``.

    Reads AWS credentials from ``secrets.json`` in the current directory
    (keys ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``), builds the
    message from the module-level constants and issues a ``send_email`` call.

    If ``mail.InvalidSenderError`` is raised and a fallback sender address is
    configured, the original sender is added to the recipients and the send
    is retried once with the fallback as sender.

    Raises:
        RuntimeError: Propagated from ``aws_execute`` on a non-2xx response.
        mail.InvalidSenderError: If no (further) fallback sender is available.
    """
    send_args = {
        'sender': my_email,
        'to': [_encode_safely(address) for address in to],
        'subject': _encode_safely(subject),
        'body': _encode_safely(body),
        # reply_to is a list: encode each address rather than passing the
        # list itself to _encode_safely (which would leave it untouched).
        'reply_to': [_encode_safely(address) for address in reply_to],
    }
    if cc:
        send_args['cc'] = [_encode_safely(address) for address in cc]
    if patch:
        # NOTE(review): `issue` is not defined anywhere in this file, and the
        # 'attachments' entry is never copied into the SES parameters, so this
        # branch needs attention before `patch` is ever set.
        send_args['attachments'] = [('issue_%s_patch.diff' % issue.key.id(),
                                     patch)]

    with open("secrets.json", "r") as f:
        secrets = json.load(f)

    # The client does not depend on the message, so build it once rather than
    # on every retry.
    client = boto3.client(
        'ses', 'us-east-1',
        aws_access_key_id=secrets['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=secrets['AWS_SECRET_ACCESS_KEY'])

    # `django_settings` is never imported in this file. Look it up defensively
    # so a missing settings object re-raises the original InvalidSenderError
    # instead of masking it with a NameError.
    fallback_sender = getattr(globals().get('django_settings'),
                              'RIETVELD_INCOMING_MAIL_ADDRESS', None)

    attempts = 0
    while True:
        try:
            aws_execute(client, 'send_email', _build_ses_params(send_args))
            break
        except mail.InvalidSenderError:
            previous_sender = send_args['sender']
            # Give up when there is no fallback, or when the fallback itself
            # was rejected; retrying again would loop forever.
            if not fallback_sender or previous_sender == fallback_sender:
                raise
            if previous_sender not in send_args['to']:
                send_args['to'].append(previous_sender)
            send_args['sender'] = fallback_sender
            attempts += 1
    if attempts:
        logging.warning("Retried sending email %s times", attempts)
# Script entry point: the message is sent as soon as this file is executed.
main()
# (GitHub page footer: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")