Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
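Patch to py-amqplib adding experimental flow-control support: an optional per-channel flow_callback is invoked when a flow notification arrives, and basic_publish on an inactive channel either blocks until it is active again (block_on_flow_control=True) or raises AMQPContentTransmissionForbidden.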
diff -r aa7a2b480166 amqplib/client_0_8/channel.py
--- a/amqplib/client_0_8/channel.py Thu Dec 10 23:40:05 2009 -0600
+++ b/amqplib/client_0_8/channel.py Thu Jan 07 17:43:42 2010 +0000
@@ -74,6 +74,7 @@
self.default_ticket = 0
self.is_open = False
self.active = True # Flow control
+ self.flow_callback = None
self.alerts = Queue()
self.returned_messages = Queue()
self.callbacks = {}
@@ -394,7 +395,8 @@
"""
self.active = args.read_bit()
-
+ if self.flow_callback:
+ self.flow_callback(self, self.active)
self._x_flow_ok(self.active)
@@ -2129,7 +2131,7 @@
def basic_publish(self, msg, exchange='', routing_key='',
- mandatory=False, immediate=False, ticket=None):
+ mandatory=False, immediate=False, ticket=None, block_on_flow_control=False):
"""
publish a message
@@ -2210,6 +2212,12 @@
for the exchange.
"""
+ if not self.active:
+ if block_on_flow_control:
+ while not self.active:
+ self.wait()
+ else:
+ raise AMQPContentTransmissionForbidden()
args = AMQPWriter()
if ticket is not None:
args.write_short(ticket)
diff -r aa7a2b480166 amqplib/client_0_8/connection.py
--- a/amqplib/client_0_8/connection.py Thu Dec 10 23:40:05 2009 -0600
+++ b/amqplib/client_0_8/connection.py Thu Jan 07 17:43:42 2010 +0000
@@ -186,7 +186,8 @@
method_sig = queued_method[0]
if (allowed_methods is None) \
or (method_sig in allowed_methods) \
- or (method_sig == (20, 40)):
+ or (method_sig == (20, 40)) \
+ or (method_sig == (20, 20)):
method_queue.remove(queued_method)
return queued_method
@@ -200,7 +201,8 @@
if (channel == channel_id) \
and ((allowed_methods is None) \
or (method_sig in allowed_methods) \
- or (method_sig == (20, 40))):
+ or (method_sig == (20, 40)) \
+ or (method_sig == (20, 20))):
return method_sig, args, content
#
diff -r aa7a2b480166 amqplib/client_0_8/exceptions.py
--- a/amqplib/client_0_8/exceptions.py Thu Dec 10 23:40:05 2009 -0600
+++ b/amqplib/client_0_8/exceptions.py Thu Jan 07 17:43:42 2010 +0000
@@ -23,6 +23,7 @@
'AMQPException',
'AMQPConnectionException',
'AMQPChannelException',
+ 'AMQPContentTransmissionForbidden'
]
@@ -48,6 +49,10 @@
pass
+class AMQPContentTransmissionForbidden(Exception):
+ pass
+
+
METHOD_NAME_MAP = {
(10, 10): 'Connection.start',
(10, 11): 'Connection.start_ok',
diff -r aa7a2b480166 demo/demo_send.py
--- a/demo/demo_send.py Thu Dec 10 23:40:05 2009 -0600
+++ b/demo/demo_send.py Thu Jan 07 17:43:42 2010 +0000
@@ -41,13 +41,18 @@
conn = amqp.Connection(options.host, userid=options.userid, password=options.password, ssl=options.ssl)
ch = conn.channel()
+
+ def my_flow_callback(ch, active):
+ print 'We are now %spermitted to send content to the broker' % ('' if active else 'NOT ',)
+ ch.flow_callback = my_flow_callback
+
ch.access_request('/data', active=True, write=True)
ch.exchange_declare('myfan', 'fanout', auto_delete=True)
msg = amqp.Message(msg_body, content_type='text/plain', application_headers={'foo': 7, 'bar': 'baz'})
- ch.basic_publish(msg, 'myfan')
+ ch.basic_publish(msg, 'myfan', block_on_flow_control = True)
ch.close()
conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment