Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Ruby Redstorm (Apache Storm) spout with a non-blocking Bunny (RabbitMQ) connection
require "red_storm"
require "bunny"
require "thread"
# Storm spout with a non-blocking RabbitMQ connection
class YourSpout < RedStorm::DSL::Spout
output_fields :your_field_name
QUEUE_NAME = "your.queue.name"
on_init {@q, @bunny = Queue.new, detach_bunny}
on_send {@q.pop if @q.size > 0}
on_close {@bunny.kill} # :(
private
def detach_bunny
Thread.new do
Thread.current.abort_on_exception = true
q = Bunny.new.start.create_channel.queue(QUEUE_NAME)
q.subscribe(:block => true) {|a,b,c| @q.push(c) unless c.nil?}
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment