From 55054011d2db9eb99affb96f3ffb89cbdab49dc9 Mon Sep 17 00:00:00 2001
From: Jake Douglas <jakecdouglas@gmail.com>
Date: Tue, 29 Dec 2009 14:22:28 -0800
Subject: [PATCH] add :confirm => some_proc for Queue#subscribe to receive ConsumeOk
---
lib/mq.rb | 7 +++++++
lib/mq/queue.rb | 12 ++++++++++++
2 files changed, 19 insertions(+), 0 deletions(-)
diff --git a/lib/mq.rb b/lib/mq.rb
index bfce736..789e3c8 100644
--- a/lib/mq.rb
+++ b/lib/mq.rb
@@ -230,6 +230,13 @@ class MQ
c.channels.delete @channel
c.close if c.channels.empty?
}
+
+ when Protocol::Basic::ConsumeOk
+ if @consumer = consumers[ method.consumer_tag ]
+ @consumer.confirm_subscribe
+ else
+ MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}"
+ end
end
end
end
diff --git a/lib/mq/queue.rb b/lib/mq/queue.rb
index 837c1f7..1bdb63c 100644
--- a/lib/mq/queue.rb
+++ b/lib/mq/queue.rb
@@ -306,6 +306,12 @@ class MQ
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
+ # * :confirm => proc (default nil)
+ # If set, this proc is called once when the server confirms the
+ # subscription with a ConsumeOk message. Setting this option forces
+ # :nowait => false, because the server only sends ConsumeOk when
+ # nowait is false.
+ #
def subscribe opts = {}, &blk
@consumer_tag = "#{name}-#{Kernel.rand(999_999_999_999)}"
@mq.consumers[@consumer_tag] = self
@@ -314,6 +320,7 @@ class MQ
@on_msg = blk
@on_msg_opts = opts
+ opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm])
@mq.callback{
@mq.send Protocol::Basic::Consume.new({ :queue => name,
@@ -408,6 +415,11 @@ class MQ
end
end
+ def confirm_subscribe
+ @on_confirm_subscribe.call if @on_confirm_subscribe
+ @on_confirm_subscribe = nil
+ end
+
def cancelled
@on_cancel.call if @on_cancel
@on_cancel = @on_msg = nil
--
1.6.4.2
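
The flow this patch adds can be sketched without a broker. The snippet below is only an illustration: `SketchChannel`, `SketchQueue`, `handle_consume_ok`, and `sent_opts` are hypothetical stand-ins rather than classes or methods from the library, and the `:nowait => true` default is an assumption. It shows the three pieces working together: `subscribe` storing the `:confirm` proc and forcing `:nowait => false`, the ConsumeOk branch looking up the consumer by tag, and `confirm_subscribe` firing the proc only once.

```ruby
# Hypothetical stand-ins for MQ and MQ::Queue; not the library's own classes.
class SketchChannel
  attr_reader :consumers, :errors

  def initialize
    @consumers = {}
    @errors = []
  end

  # Mirrors the new ConsumeOk branch: look up the consumer by tag and
  # notify it, or record an error for an unknown tag.
  def handle_consume_ok(consumer_tag)
    if (consumer = consumers[consumer_tag])
      consumer.confirm_subscribe
    else
      @errors << "Basic.ConsumeOk for invalid consumer tag: #{consumer_tag}"
    end
  end
end

class SketchQueue
  attr_reader :consumer_tag, :sent_opts

  def initialize(channel, name)
    @channel = channel
    @name = name
  end

  def subscribe(opts = {}, &blk)
    @consumer_tag = "#{@name}-#{Kernel.rand(999_999_999_999)}"
    @channel.consumers[@consumer_tag] = self
    @on_msg = blk
    # As in the patch: asking for confirmation forces :nowait => false.
    opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm])
    # Stand-in for the options sent with Basic.Consume
    # (the :nowait => true default is assumed here).
    @sent_opts = { :nowait => true }.merge(opts)
    self
  end

  # One-shot: the proc is cleared after the first call.
  def confirm_subscribe
    @on_confirm_subscribe.call if @on_confirm_subscribe
    @on_confirm_subscribe = nil
  end
end

channel = SketchChannel.new
queue = SketchQueue.new(channel, "jobs")
confirmations = 0
queue.subscribe(:confirm => proc { confirmations += 1 }) { |msg| puts msg }

channel.handle_consume_ok(queue.consumer_tag) # server confirms the subscription
channel.handle_consume_ok(queue.consumer_tag) # repeat: proc was already cleared
channel.handle_consume_ok("unknown-tag")      # no consumer registered for this tag
```

In this sketch the proc runs on the first ConsumeOk only, and an unknown tag is recorded as an error instead of raising, which matches the `MQ.error` call in the patch only in spirit.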