Last active
November 17, 2020 16:36
-
-
Save combatpoodle/2828641573a957c9c398 to your computer and use it in GitHub Desktop.
Boto SES + Python Twisted
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Overrides parts of Boto to provide async call/response. Derived from | |
# SESConnection and boto/connection. | |
# Copyright (c) 2010 Mitch Garnaat http://garnaat.org/ | |
# Copyright (c) 2011 Harry Marr http://hmarr.com/ | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a | |
# copy of this software and associated documentation files (the | |
# "Software"), to deal in the Software without restriction, including | |
# without limitation the rights to use, copy, modify, merge, publish, dis- | |
# tribute, sublicense, and/or sell copies of the Software, and to permit | |
# persons to whom the Software is furnished to do so, subject to the fol- | |
# lowing conditions: | |
# | |
# The above copyright notice and this permission notice shall be included | |
# in all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
# IN THE SOFTWARE. | |
import re | |
import urllib | |
import base64 | |
from boto.connection import AWSAuthConnection, HTTPRequest, HTTPResponse | |
from boto.exception import BotoServerError | |
from boto.regioninfo import RegionInfo | |
import boto | |
import boto.jsonresponse | |
from boto.ses import exceptions as ses_exceptions | |
from boto.ses.connection import * | |
from urllib import urlencode | |
from pprint import pprint | |
from twisted.internet import reactor | |
from twisted.internet.defer import Deferred, DeferredList, succeed | |
from twisted.internet.protocol import Protocol | |
from twisted.web.client import Agent, HTTPConnectionPool | |
from twisted.web.http_headers import Headers | |
from twisted.web.iweb import IBodyProducer | |
from zope.interface import implements | |
class StringProducer(object): | |
implements(IBodyProducer) | |
def __init__(self, body): | |
self.body = body | |
self.length = len(body) | |
def startProducing(self, consumer): | |
consumer.write(self.body) | |
return succeed(None) | |
def pauseProducing(self): | |
pass | |
def stopProducing(self): | |
pass | |
class ResponseReceiver(Protocol): | |
def __init__(self, finished, callback, response, data): | |
self.finished = finished | |
self.body = "" | |
self.callback = callback | |
self.response = response | |
self.data = data | |
def dataReceived(self, bytes): | |
self.body += bytes | |
def connectionLost(self, reason): | |
# print 'Finished receiving body:', reason.getErrorMessage() | |
self.callback(self.response, self.body, self.data) | |
self.finished.callback(None) | |
class ASESConnection(SESConnection): | |
ResponseError = BotoServerError | |
DefaultRegionName = 'us-east-1' | |
DefaultRegionEndpoint = 'email.us-east-1.amazonaws.com' | |
# DefaultRegionEndpoint = 'coast.dev' | |
APIVersion = '2010-12-01' | |
host_override = None | |
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None, | |
is_secure=True, port=None, proxy=None, proxy_port=None, | |
proxy_user=None, proxy_pass=None, debug=0, | |
https_connection_factory=None, region=None, path='/', | |
security_token=None, validate_certs=True, agent=None, callback=None, errback=None, reactor=None): | |
if not region: | |
region = RegionInfo(self, self.DefaultRegionName, | |
self.DefaultRegionEndpoint) | |
self.region = region | |
SESConnection.__init__(self, | |
aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, | |
is_secure=is_secure, port=port, proxy=proxy, proxy_port=proxy_port, | |
proxy_user=proxy_user, proxy_pass=proxy_pass, debug=debug, | |
https_connection_factory=None, path=path, | |
security_token=security_token, | |
validate_certs=validate_certs) | |
self.agent = agent | |
self.callback = callback | |
self.errback = errback | |
self.reactor = reactor | |
def _make_request(self, action, params=None): | |
"""Make a call to the SES API. | |
:type action: string | |
:param action: The API method to use (e.g. SendRawEmail) | |
:type params: dict | |
:param params: Parameters that will be sent as POST data with the API | |
call. | |
""" | |
ct = 'application/x-www-form-urlencoded; charset=UTF-8' | |
headers = {'Content-Type': ct} | |
params = params or {} | |
params['Action'] = action | |
for k, v in params.items(): | |
if isinstance(v, unicode): # UTF-8 encode only if it's Unicode | |
params[k] = v.encode('utf-8') | |
self.make_request( | |
'POST', | |
'/', | |
headers=headers, | |
data=urllib.urlencode(params) | |
) | |
def make_request(self, method, path, headers=None, data='', host=None, | |
auth_path=None, sender=None, override_num_retries=None, | |
params=None): | |
"""Makes a request to the server, with stock multiple-retry logic.""" | |
if params is None: | |
params = {} | |
if self.host_override != None: | |
host = self.host_override | |
self.http_request = self.build_base_http_request(method, path, auth_path, | |
params, headers, data, host) | |
self.http_request.authorize(connection=self) | |
bodyProducer = StringProducer(self.http_request.body) | |
url = self.http_request.protocol + '://' + self.http_request.host | |
url += self.http_request.path | |
# Twisted sets this again - doing it twice results in a 400 Bad Request... | |
del(self.http_request.headers['Content-Length']) | |
for (title, value) in self.http_request.headers.iteritems(): | |
if type(value) == type("string"): | |
value = [ value ] | |
self.http_request.headers[title] = value | |
# print self.http_request.method, url, Headers(self.http_request.headers), self.http_request.body | |
d = self.agent.request( | |
self.http_request.method, | |
url, | |
Headers(self.http_request.headers), | |
bodyProducer=bodyProducer | |
) | |
d.addCallback(self.http_request_callback, data) | |
d.addErrback(self.http_request_errback, data) | |
def http_request_callback(self, response, data): | |
finished = Deferred() | |
response.deliverBody(ResponseReceiver(finished, self.http_body_received_callback, response, data)) | |
return finished | |
def http_body_received_callback(self, response, body, data): | |
if response.code == 200: | |
list_markers = ('VerifiedEmailAddresses', 'Identities', | |
'VerificationAttributes', 'SendDataPoints') | |
item_markers = ('member', 'item', 'entry') | |
try: | |
e = boto.jsonresponse.Element(list_marker=list_markers, | |
item_marker=item_markers) | |
h = boto.jsonresponse.XmlHandler(e, None) | |
h.parse(body) | |
self.callback( e, data ) | |
return | |
except Exception, e: | |
# print e | |
self.errback( self._handle_error(response, body), data ) | |
return | |
else: | |
# HTTP codes other than 200 are considered errors. Go through | |
# some error handling to determine which exception gets raised, | |
self.errback( self._handle_error(response, body), data ) | |
return | |
def http_request_errback(self, failure, data): | |
print failure, data | |
self.errback( "HTTP Error", failure ) | |
def _handle_error(self, response, body): | |
""" | |
Handle raising the correct exception, depending on the error. Many | |
errors share the same HTTP response code, meaning we have to get really | |
kludgey and do string searches to figure out what went wrong. | |
""" | |
if "Address blacklisted." in body: | |
# Delivery failures happened frequently enough with the recipient's | |
# email address for Amazon to blacklist it. After a day or three, | |
# they'll be automatically removed, and delivery can be attempted | |
# again (if you write the code to do so in your application). | |
ExceptionToRaise = ses_exceptions.SESAddressBlacklistedError | |
exc_reason = "Address blacklisted." | |
elif "Email address is not verified." in body: | |
# This error happens when the "Reply-To" value passed to | |
# send_email() hasn't been verified yet. | |
ExceptionToRaise = ses_exceptions.SESAddressNotVerifiedError | |
exc_reason = "Email address is not verified." | |
elif "Daily message quota exceeded." in body: | |
# Encountered when your account exceeds the maximum total number | |
# of emails per 24 hours. | |
ExceptionToRaise = ses_exceptions.SESDailyQuotaExceededError | |
exc_reason = "Daily message quota exceeded." | |
elif "Maximum sending rate exceeded." in body: | |
# Your account has sent above its allowed requests a second rate. | |
ExceptionToRaise = ses_exceptions.SESMaxSendingRateExceededError | |
exc_reason = "Maximum sending rate exceeded." | |
elif "Domain ends with dot." in body: | |
# Recipient address ends with a dot/period. This is invalid. | |
ExceptionToRaise = ses_exceptions.SESDomainEndsWithDotError | |
exc_reason = "Domain ends with dot." | |
elif "Local address contains control or whitespace" in body: | |
# I think this pertains to the recipient address. | |
ExceptionToRaise = ses_exceptions.SESLocalAddressCharacterError | |
exc_reason = "Local address contains control or whitespace." | |
elif "Illegal address" in body: | |
# A clearly mal-formed address. | |
ExceptionToRaise = ses_exceptions.SESIllegalAddressError | |
exc_reason = "Illegal address" | |
# The re.search is to distinguish from the | |
# SESAddressNotVerifiedError error above. | |
elif re.search('Identity.*is not verified', body): | |
ExceptionToRaise = ses_exceptions.SESIdentityNotVerifiedError | |
exc_reason = "Identity is not verified." | |
elif "ownership not confirmed" in body: | |
ExceptionToRaise = ses_exceptions.SESDomainNotConfirmedError | |
exc_reason = "Domain ownership is not confirmed." | |
else: | |
# This is either a common AWS error, or one that we don't devote | |
# its own exception to. | |
ExceptionToRaise = self.ResponseError | |
exc_reason = "unknown" | |
return ExceptionToRaise(response.code, exc_reason, body) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"database_host": "127.0.0.1", | |
"database_user": "sadlfkjasdlfkj", | |
"database_passwd": "asldkfjasdlkfj", | |
"database_db": "laskdjflaskdjf", | |
"rabbitmq_host": "127.0.0.1", | |
"rabbitmq_port": 5672, | |
"rabbitmq_vhost": "/", | |
"rabbitmq_username": "guest", | |
"rabbitmq_password": "guest", | |
"rabbitmq_queue": "queue", | |
"rabbitmq_exchange": "exchange", | |
"rabbitmq_routing_key": "routing_key", | |
"rabbitmq_rate_limit": 3 | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from aconnection import ASESConnection | |
from pprint import pprint | |
from twisted.enterprise import adbapi | |
from twisted.internet import reactor, defer | |
from twisted.internet.defer import Deferred | |
from twisted.internet.ssl import ClientContextFactory | |
from twisted.web.client import Agent, HTTPConnectionPool | |
from twisted_queue import * | |
import functools | |
import json | |
import MySQLdb, MySQLdb.cursors | |
import re | |
import sys | |
import time | |
import traceback | |
import twisted.internet.task | |
# This class is used to enable SSL... Seems to just extend getContext to | |
# accept extra arguments that HTTP might require. | |
class WebClientContextFactory(ClientContextFactory): | |
def getContext(self, hostname, port): | |
return ClientContextFactory.getContext(self) | |
class EmailSender: | |
def __init__(self, reactor, email_id): | |
contextFactory = WebClientContextFactory() | |
self.sent_count = 0 | |
self.last_sent_count = 0 | |
self.failed_count = 0 | |
self.last_failed_count = 0 | |
self.email_id = email_id | |
self.reactor = reactor | |
pool = HTTPConnectionPool(reactor, persistent=True) | |
pool.maxPersistentPerHost = 50 | |
pool.cachedConnectionTimeout = 10 | |
https_agent = Agent(reactor, contextFactory, pool=pool) | |
self.set_up_configuration() | |
self.db_pool = adbapi.ConnectionPool( | |
"MySQLdb", | |
host=self.configuration['database_host'], | |
user=self.configuration['database_user'], | |
passwd=self.configuration['database_passwd'], | |
db=self.configuration['database_db'], | |
cursorclass=MySQLdb.cursors.DictCursor, | |
) | |
# This probably should just get activated with a --live hook | |
self.ses_connection = ASESConnection( | |
callback=self.sending_email_succeeded, | |
errback=self.boto_errback, | |
reactor=reactor, | |
agent=https_agent | |
) | |
if 'production' in self.configuration and self.configuration['production'] == "I really really know what I'm doing here": | |
print "WARNING: RUNNING PRODUCTION SEND. HIDE YO KIDS, HIDE YO WIFE." | |
time.sleep(10) | |
else: | |
self.ses_connection.host_override = 'ses.v' | |
self.reactor.callWhenRunning(self.go) | |
def set_up_configuration(self): | |
try: | |
configuration = json.loads(open('config.json').read()) | |
except: | |
print "Failed to load manual configuration; using default" | |
configuration = {} | |
default_configuration = { | |
"database_host": "127.0.0.1", | |
"database_user": "asldkfjasldkfj", | |
"database_passwd": "asldkfjasldkfj", | |
"database_db": "asldkfjasldkfjasdlfkj", | |
"rabbitmq_host": "127.0.0.1", | |
"rabbitmq_port": 5672, | |
"rabbitmq_vhost": "/", | |
"rabbitmq_username": "guest", | |
"rabbitmq_password": "guest", | |
"rabbitmq_queue": "huge_email_address_queue", | |
"rabbitmq_exchange": "huge_email_address_queue", | |
"rabbitmq_routing_key": "huge_email_address_queue", | |
"rabbitmq_rate_limit": 1, | |
"source_address": '"MOJO Themes" <noreply@mojo-themes.com>', | |
"reporting_interval": 1 | |
} | |
self.configuration = default_configuration | |
for (k, v) in configuration.items(): | |
self.configuration[k] = v | |
pprint({'Configuration': self.configuration}) | |
def go(self): | |
self.load_email_content() | |
self.send_emails() | |
@defer.inlineCallbacks | |
def load_email_content(self): | |
templates = yield self.db_pool.runQuery( | |
"SELECT subject, body FROM email_content WHERE id=(%s)", | |
(self.email_id,) | |
) | |
self.email_templates = [] | |
if not templates: | |
self.reactor.stop() | |
print "No emails loaded" | |
sys.exit(1) | |
try: | |
for template in templates: | |
self.load_template(template) | |
except Exception, e: | |
print e | |
self.reactor.stop() | |
sys.exit(1) | |
if not self.email_templates: | |
print "No templates loaded!" | |
self.reactor.stop() | |
sys.exit(1) | |
def load_template(self, template): | |
self.email_templates = [{ | |
'subject': template['subject'], | |
'body': template['body'] | |
}] | |
def get_template(self, **argv): | |
return self.email_templates[0] | |
def apply_template(self, template, receiver_info): | |
subject = template['subject'] | |
body = template['body'] | |
if 'host_account_id' in receiver_info: | |
host_account_id = receiver_info['host_account_id'] | |
else: | |
host_account_id = "null" | |
user_id = receiver_info['user_id'] | |
try: | |
# import pdb; pdb.set_trace() | |
body = body.replace('{{host_account_id}}', str(host_account_id)) | |
body = body.replace('{{user_id}}', str(user_id)) | |
except Exception, e: | |
print "failed", sys.exc_info()[2] | |
traceback.print_tb(sys.exc_info()[2]) | |
raise e | |
return { | |
'subject': subject, | |
'content': body, | |
'source': self.configuration['source_address'] | |
} | |
def send_emails(self): | |
stats_loop = twisted.internet.task.LoopingCall(self.print_stats) | |
stats_loop.start(self.configuration['reporting_interval']) | |
self.message_queue = twisted_queue_receiver( | |
callback=self.send_email, | |
errback=self.queue_errback, | |
host=self.configuration['rabbitmq_host'], | |
port=self.configuration['rabbitmq_port'], | |
vhost=self.configuration['rabbitmq_vhost'], | |
username=self.configuration['rabbitmq_username'], | |
password=self.configuration['rabbitmq_password'], | |
queue=self.configuration['rabbitmq_queue'], | |
exchange=self.configuration['rabbitmq_exchange'], | |
routing_key=self.configuration['rabbitmq_routing_key'], | |
rate_limit=self.configuration['rabbitmq_rate_limit'], | |
no_ack=False, | |
durable=True | |
) | |
def queue_errback(self, err): | |
print err | |
print "I should probably be killed because I am a bad kitty." | |
reactor.stop() | |
sys.exit(1) | |
@defer.inlineCallbacks | |
def send_email(self, channel, raw_message): | |
if not (channel or raw_message): | |
defer.returnValue(None) | |
try: | |
receiver_info = json.loads(raw_message.content.body) | |
except ValueError, e: | |
channel.basic_ack(delivery_tag=raw_message.delivery_tag) | |
print "failed parsing", raw_message.content.body | |
self.sending_email_failed(e, raw_message.content.body) | |
defer.returnValue(None) | |
try: | |
template = self.get_template(receiver_info=receiver_info) | |
except Exception, e: | |
channel.basic_ack(delivery_tag=raw_message.delivery_tag) | |
print "Missing email template:", receiver_info, e | |
self.sending_email_failed(e, raw_message.content.body) | |
defer.returnValue(None) | |
try: | |
templated_email = self.apply_template(template, receiver_info) | |
except ValueError, e: | |
channel.basic_ack(delivery_tag=raw_message.delivery_tag) | |
print "Couldn't template email:", receiver_info | |
self.sending_email_failed(e, raw_message.content.body) | |
defer.returnValue(None) | |
email_subject = templated_email['subject'] | |
email_content = templated_email['content'] | |
email_source = templated_email['source'] | |
try: | |
self.ses_connection.send_email( | |
source=email_source, subject=email_subject, body=email_content, | |
format='html', to_addresses=[receiver_info['email']]) | |
except Exception, e: | |
channel.basic_ack(delivery_tag=raw_message.delivery_tag) | |
print "Send_email exception:", e | |
print receiver_info['email'] | |
self.sending_email_failed(e, raw_message.content.body) | |
defer.returnValue(None) | |
try: | |
yield channel.basic_ack(delivery_tag=raw_message.delivery_tag) | |
except Exception, e: | |
print "Couldn't ack message because ???", e | |
defer.returnValue(None) | |
def print_stats(self): | |
diff_sent_count = self.sent_count - self.last_sent_count | |
diff_failed_count = self.failed_count - self.last_failed_count | |
self.last_sent_count = self.sent_count | |
self.last_failed_count = self.failed_count | |
print "sent_count:", self.sent_count, "period rate:", str(diff_sent_count / self.configuration['reporting_interval']) + "/s" | |
print "failed_count:", self.failed_count, "period rate:", str(diff_failed_count / self.configuration['reporting_interval']) + "/s" | |
sys.stdout.flush() | |
def sending_email_succeeded(self, e, data): | |
# sys.stdout.write('.') | |
# Could stuff into a database here if desired | |
self.sent_count += 1 | |
def boto_errback(self, data, error): | |
print "failed sending:", error, data | |
self.failed_count += 1 | |
def sending_email_failed(self, e, data): | |
# Could stuff into a database or something else if desired | |
print "failed sending to", data, "because", e | |
self.failed_count += 1 | |
if __name__ == '__main__': | |
sender = EmailSender(reactor, sys.argv[1]) | |
reactor.run() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This adds the cloudfront servers' ip addresses into Memcache. | |
# Memcache Key: cloudfront_server_addresses | |
# Format: | |
# { | |
# 'subnet': ['server', 'server', 'server', 'server', ...], | |
# ... | |
# } | |
import sys | |
from pprint import pprint | |
import time | |
from twisted.internet.defer import inlineCallbacks | |
from twisted.internet import reactor | |
from twisted.internet.protocol import ClientCreator | |
from twisted.python import log | |
from txamqp.protocol import AMQClient | |
from txamqp.client import TwistedDelegate | |
import txamqp.spec | |
class twisted_queue_receiver: | |
def __init__(self, callback, errback, host, port, vhost, username, password, queue, exchange, routing_key, durable=False, rate_limit=5, prefetch_count=500, no_ack=False): | |
import sys | |
spec = "amqp0-8.stripped.rabbitmq.xml" | |
self.queue = queue | |
self.exchange = exchange | |
self.routing_key = routing_key | |
self.callback = callback | |
self.durable = durable | |
self.rate_limit = rate_limit | |
self.min_interval = 1.0 / float(self.rate_limit) | |
self.last_time_called = time.time() | |
self.prefetch_count = prefetch_count | |
self.no_ack = no_ack | |
spec = txamqp.spec.load(spec) | |
delegate = TwistedDelegate() | |
d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost, spec=spec).connectTCP(host, port) | |
d.addCallback(self.gotConnection, username, password) | |
d.addErrback(errback) | |
@inlineCallbacks | |
def gotConnection(self, conn, username, password): | |
print "Connected to broker." | |
yield conn.authenticate(username, password) | |
print "Authenticated. Ready to receive messages" | |
self.channel = yield conn.channel(1) | |
yield self.channel.channel_open() | |
yield self.channel.basic_qos(prefetch_count=self.prefetch_count) | |
yield self.channel.queue_declare(queue=self.queue, durable=self.durable, exclusive=False, auto_delete=False) | |
yield self.channel.exchange_declare(exchange=self.exchange, type="direct", durable=self.durable, auto_delete=False) | |
yield self.channel.queue_bind(queue=self.queue, exchange=self.exchange, routing_key=self.routing_key) | |
yield self.channel.basic_consume(queue=self.queue, no_ack=self.no_ack, consumer_tag=self.routing_key) | |
self.queue = yield conn.queue(self.routing_key) | |
self.getMessages() | |
@inlineCallbacks | |
def getMessages(self): | |
""" | |
def RateLimited(maxPerSecond): | |
minInterval = 1.0 / float(maxPerSecond) | |
def decorate(func): | |
lastTimeCalled = [0.0] | |
def rateLimitedFunction(*args,**kargs): | |
elapsed = time.time() - lastTimeCalled[0] | |
leftToWait = minInterval - elapsed | |
if leftToWait>0: | |
time.sleep(leftToWait) | |
ret = func(*args,**kargs) | |
lastTimeCalled[0] = time.time() | |
return ret | |
return rateLimitedFunction | |
return decorate | |
""" | |
elapsed = time.time() - self.last_time_called | |
left_to_wait = self.min_interval - elapsed | |
if left_to_wait > 0: | |
yield reactor.callLater(left_to_wait, self.getMessages) | |
else: | |
self.last_time_called = time.time() | |
message = yield self.queue.get() | |
self.callback(self.channel, message) | |
elapsed = time.time() - self.last_time_called | |
left_to_wait = self.min_interval - elapsed | |
if left_to_wait < 0: | |
left_to_wait = 0 | |
#print "left_to_wait: ", left_to_wait | |
yield reactor.callLater(left_to_wait*1.01, self.getMessages) | |
class twisted_queue_sender: | |
@inlineCallbacks | |
def gotConnection(self, conn, username, password): | |
print "Connected to broker." | |
yield conn.authenticate(username, password) | |
print "Authenticated. Ready to send messages" | |
self.channel = yield conn.channel(1) | |
yield self.channel.channel_open() | |
yield self.channel.queue_declare(queue=self.queue, durable=self.durable, exclusive=False, auto_delete=False) | |
yield self.channel.exchange_declare(exchange=self.exchange, type="direct", durable=self.durable, auto_delete=False) | |
yield self.channel.queue_bind(queue=self.queue, exchange=self.exchange, routing_key=self.routing_key) | |
yield self.callback(self.channel) | |
yield self.channel.channel_close() | |
#channel0 = yield conn.channel(0) | |
#yield channel0.channel_close() | |
#reactor.stop() | |
@inlineCallbacks | |
def put(self, msg): | |
yield self.queue.put(msg) | |
def __init__(self, callback, host, port, vhost, username, password, queue, exchange, routing_key, durable=False): | |
import sys | |
spec = "amqp0-8.stripped.rabbitmq.xml" | |
self.exchange = exchange | |
self.routing_key = routing_key | |
self.durable = durable | |
self.callback = callback | |
self.queue = queue | |
spec = txamqp.spec.load(spec) | |
delegate = TwistedDelegate() | |
d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost=vhost, spec=spec).connectTCP(host, port) | |
d.addCallback(self.gotConnection, username, password) | |
def whoops(err): | |
if reactor.running: | |
log.err(err) | |
reactor.stop() | |
d.addErrback(whoops) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment