Created May 12, 2010 23:41
-
-
Save tonyg/399282 to your computer and use it in GitHub Desktop.
Patch to py-amqplib adding experimental support for blocking publications while the broker's flow control is in effect
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff -r aa7a2b480166 amqplib/client_0_8/channel.py | |
--- a/amqplib/client_0_8/channel.py Thu Dec 10 23:40:05 2009 -0600 | |
+++ b/amqplib/client_0_8/channel.py Thu Jan 07 17:43:42 2010 +0000 | |
@@ -74,6 +74,7 @@ | |
self.default_ticket = 0 | |
self.is_open = False | |
self.active = True # Flow control | |
+ self.flow_callback = None | |
self.alerts = Queue() | |
self.returned_messages = Queue() | |
self.callbacks = {} | |
@@ -394,7 +395,8 @@ | |
""" | |
self.active = args.read_bit() | |
- | |
+ if self.flow_callback: | |
+ self.flow_callback(self, self.active) | |
self._x_flow_ok(self.active) | |
@@ -2129,7 +2131,7 @@ | |
def basic_publish(self, msg, exchange='', routing_key='', | |
- mandatory=False, immediate=False, ticket=None): | |
+ mandatory=False, immediate=False, ticket=None, block_on_flow_control=False): | |
""" | |
publish a message | |
@@ -2210,6 +2212,12 @@ | |
for the exchange. | |
""" | |
+ if not self.active: | |
+ if block_on_flow_control: | |
+ while not self.active: | |
+ self.wait() | |
+ else: | |
+ raise AMQPContentTransmissionForbidden() | |
args = AMQPWriter() | |
if ticket is not None: | |
args.write_short(ticket) | |
diff -r aa7a2b480166 amqplib/client_0_8/connection.py | |
--- a/amqplib/client_0_8/connection.py Thu Dec 10 23:40:05 2009 -0600 | |
+++ b/amqplib/client_0_8/connection.py Thu Jan 07 17:43:42 2010 +0000 | |
@@ -186,7 +186,8 @@ | |
method_sig = queued_method[0] | |
if (allowed_methods is None) \ | |
or (method_sig in allowed_methods) \ | |
- or (method_sig == (20, 40)): | |
+ or (method_sig == (20, 40)) \ | |
+ or (method_sig == (20, 20)): | |
method_queue.remove(queued_method) | |
return queued_method | |
@@ -200,7 +201,8 @@ | |
if (channel == channel_id) \ | |
and ((allowed_methods is None) \ | |
or (method_sig in allowed_methods) \ | |
- or (method_sig == (20, 40))): | |
+ or (method_sig == (20, 40)) \ | |
+ or (method_sig == (20, 20))): | |
return method_sig, args, content | |
# | |
diff -r aa7a2b480166 amqplib/client_0_8/exceptions.py | |
--- a/amqplib/client_0_8/exceptions.py Thu Dec 10 23:40:05 2009 -0600 | |
+++ b/amqplib/client_0_8/exceptions.py Thu Jan 07 17:43:42 2010 +0000 | |
@@ -23,6 +23,7 @@ | |
'AMQPException', | |
'AMQPConnectionException', | |
'AMQPChannelException', | |
+ 'AMQPContentTransmissionForbidden' | |
] | |
@@ -48,6 +49,10 @@ | |
pass | |
+class AMQPContentTransmissionForbidden(Exception): | |
+ pass | |
+ | |
+ | |
METHOD_NAME_MAP = { | |
(10, 10): 'Connection.start', | |
(10, 11): 'Connection.start_ok', | |
diff -r aa7a2b480166 demo/demo_send.py | |
--- a/demo/demo_send.py Thu Dec 10 23:40:05 2009 -0600 | |
+++ b/demo/demo_send.py Thu Jan 07 17:43:42 2010 +0000 | |
@@ -41,13 +41,18 @@ | |
conn = amqp.Connection(options.host, userid=options.userid, password=options.password, ssl=options.ssl) | |
ch = conn.channel() | |
+ | |
+ def my_flow_callback(ch, active): | |
+ print 'We are now %spermitted to send content to the broker' % ('' if active else 'NOT ',) | |
+ ch.flow_callback = my_flow_callback | |
+ | |
ch.access_request('/data', active=True, write=True) | |
ch.exchange_declare('myfan', 'fanout', auto_delete=True) | |
msg = amqp.Message(msg_body, content_type='text/plain', application_headers={'foo': 7, 'bar': 'baz'}) | |
- ch.basic_publish(msg, 'myfan') | |
+ ch.basic_publish(msg, 'myfan', block_on_flow_control = True) | |
ch.close() | |
conn.close() |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment