Created
December 29, 2009 22:23
From 55054011d2db9eb99affb96f3ffb89cbdab49dc9 Mon Sep 17 00:00:00 2001
From: Jake Douglas <jakecdouglas@gmail.com>
Date: Tue, 29 Dec 2009 14:22:28 -0800
Subject: [PATCH] add :confirm => some_proc for Queue#subscribe to receive ConsumeOk
---
 lib/mq.rb       |    7 +++++++
 lib/mq/queue.rb |   12 ++++++++++++
 2 files changed, 19 insertions(+), 0 deletions(-)
diff --git a/lib/mq.rb b/lib/mq.rb
index bfce736..789e3c8 100644
--- a/lib/mq.rb
+++ b/lib/mq.rb
@@ -230,6 +230,13 @@ class MQ
           c.channels.delete @channel
           c.close if c.channels.empty?
         }
+
+      when Protocol::Basic::ConsumeOk
+        if @consumer = consumers[ method.consumer_tag ]
+          @consumer.confirm_subscribe
+        else
+          MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}"
+        end
       end
     end
   end
diff --git a/lib/mq/queue.rb b/lib/mq/queue.rb
index 837c1f7..1bdb63c 100644
--- a/lib/mq/queue.rb
+++ b/lib/mq/queue.rb
@@ -306,6 +306,12 @@ class MQ
     # not wait for a reply method. If the server could not complete the
     # method it will raise a channel or connection exception.
     #
+    # * :confirm => proc (default nil)
+    # If set, this proc will be called when the server confirms subscription
+    # to the queue with a ConsumeOk message. Setting this option will
+    # automatically set :nowait => false. This is required for the server
+    # to send a confirmation.
+    #
     def subscribe opts = {}, &blk
       @consumer_tag = "#{name}-#{Kernel.rand(999_999_999_999)}"
       @mq.consumers[@consumer_tag] = self
@@ -314,6 +320,7 @@ class MQ
       @on_msg = blk
       @on_msg_opts = opts
+      opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm])
       @mq.callback{
         @mq.send Protocol::Basic::Consume.new({ :queue => name,
@@ -408,6 +415,11 @@ class MQ
       end
     end
+    def confirm_subscribe
+      @on_confirm_subscribe.call if @on_confirm_subscribe
+      @on_confirm_subscribe = nil
+    end
+
     def cancelled
       @on_cancel.call if @on_cancel
       @on_cancel = @on_msg = nil
--
1.6.4.2
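
The flow this patch adds can be tried without a broker. The following is a minimal sketch, not the library's code: `SketchChannel`, `SketchQueue`, `handle_consume_ok`, `consume_opts`, and `errors` are hypothetical stand-ins invented for illustration, and only `subscribe`, `confirm_subscribe`, the `:confirm` and `:nowait` options, and the error text come from the patch. It shows the proc passed as `:confirm` firing once when a simulated ConsumeOk arrives for a known consumer tag, and an unknown tag being reported instead.

```ruby
# Hypothetical stand-ins for MQ and MQ::Queue; no broker or amqp gem is needed.
class SketchChannel
  attr_reader :consumers, :errors

  def initialize
    @consumers = {}
    @errors = []
  end

  # Mirrors the new `when Protocol::Basic::ConsumeOk` branch: look up the
  # consumer by tag and notify it, or record an error for an unknown tag.
  def handle_consume_ok(consumer_tag)
    if (consumer = @consumers[consumer_tag])
      consumer.confirm_subscribe
    else
      @errors << "Basic.ConsumeOk for invalid consumer tag: #{consumer_tag}"
    end
  end
end

class SketchQueue
  attr_reader :consumer_tag, :consume_opts

  def initialize(channel, name)
    @channel = channel
    @name = name
  end

  def subscribe(opts = {}, &blk)
    @consumer_tag = "#{@name}-#{Kernel.rand(999_999_999_999)}"
    @channel.consumers[@consumer_tag] = self
    @on_msg = blk
    # As in the patch, a :confirm proc forces :nowait => false.
    opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm])
    # Stand-in for the Basic.Consume frame that the real code would send.
    @consume_opts = { :queue => @name }.merge(opts)
    self
  end

  # Same body as the method added by the patch: call once, then clear.
  def confirm_subscribe
    @on_confirm_subscribe.call if @on_confirm_subscribe
    @on_confirm_subscribe = nil
  end
end

channel = SketchChannel.new
queue = SketchQueue.new(channel, "jobs")
confirmations = 0
queue.subscribe(:confirm => proc { confirmations += 1 }) { |msg| puts msg }

channel.handle_consume_ok(queue.consumer_tag) # simulated server reply
channel.handle_consume_ok(queue.consumer_tag) # repeat: proc was already cleared
channel.handle_consume_ok("unknown-tag")      # recorded as an error
```

Because `confirm_subscribe` clears the stored proc after calling it, a duplicate confirmation for the same tag is a no-op in this sketch.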