Skip to content

Instantly share code, notes, and snippets.

@daviddavis
Last active August 31, 2017 13:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daviddavis/f5bb18b3a9f14c48a57257cb05df6c55 to your computer and use it in GitHub Desktop.
Save daviddavis/f5bb18b3a9f14c48a57257cb05df6c55 to your computer and use it in GitHub Desktop.
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index 017d0c1a..7a9cca95 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -1691,17 +1691,27 @@ class Transport(base.Transport):
start_time = monotonic()
elapsed_time = -1
while elapsed_time < timeout:
- try:
- receiver = self.session.next_receiver(timeout=timeout)
- message = receiver.fetch()
- queue = receiver.source
- except QpidEmpty:
+ receivers = self.session.receivers[self.last_receiver:] + self.session.receivers[0:self.last_receiver]
+ for receiver in receivers:
+ try:
+ message = receiver.fetch(timeout)
+ except QpidEmpty:
+ continue
+ else:
+ break
+ if message is None:
raise socket.timeout()
- else:
- connection._callbacks[queue](message)
+
+ self.last_receiver += 1
+ if self.last_receiver >= len(self.session.receivers):
+ self.last_receiver = 0
+
+ queue = receiver.source
+ connection._callbacks[queue](message)
elapsed_time = monotonic() - start_time
raise socket.timeout()
+
def create_channel(self, connection):
"""Create and return a :class:`~kombu.transport.qpid.Channel`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment