@alextaujenis
Last active August 29, 2015 14:08
Ruby RedStorm (Apache Storm) spout with a non-blocking Bunny (RabbitMQ) connection
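require "red_storm"
require "bunny"
require "thread"

# Storm spout with a non-blocking RabbitMQ connection
class YourSpout < RedStorm::DSL::Spout
  output_fields :your_field_name

  QUEUE_NAME = "your.queue.name"

  # Create the in-memory buffer before starting the consumer thread,
  # so the subscriber can never see an unassigned @q.
  on_init do
    @q = Queue.new
    @bunny = detach_bunny
  end

  # Return the next buffered payload, or nil when the buffer is empty (never blocks).
  on_send { @q.pop if @q.size > 0 }

  # Kills the consumer thread without closing the RabbitMQ connection. :(
  on_close { @bunny.kill }

  private

  # Consume from RabbitMQ on a background thread and buffer payloads in @q.
  def detach_bunny
    Thread.new do
      Thread.current.abort_on_exception = true
      q = Bunny.new.start.create_channel.queue(QUEUE_NAME)
      # :block => true blocks only this background thread, not the spout.
      q.subscribe(:block => true) do |_delivery_info, _properties, payload|
        @q.push(payload) unless payload.nil?
      end
    end
  end
end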
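
The hand-off at the heart of the spout above can be tried without Storm or RabbitMQ. The following is a minimal sketch of that pattern only, using just the Ruby standard library: a background thread fills a thread-safe Queue and the consumer polls it without blocking. The names start_producer and poll and the sample payloads are hypothetical stand-ins for the Bunny subscriber and the on_send block; they are not part of RedStorm or Bunny.

```ruby
require "thread"

# Hypothetical stand-in for the Bunny subscriber thread:
# pushes every non-nil payload into the shared buffer.
def start_producer(buffer, payloads)
  Thread.new do
    payloads.each { |payload| buffer.push(payload) unless payload.nil? }
  end
end

# Hypothetical stand-in for on_send: a non-blocking poll.
# Queue#pop(true) raises ThreadError on an empty queue instead of waiting.
def poll(buffer)
  buffer.pop(true)
rescue ThreadError
  nil
end

buffer = Queue.new
producer = start_producer(buffer, ["a", nil, "b"])
producer.join # wait here only so the sketch is deterministic

# Drain whatever has been buffered; poll returns nil once the buffer is empty.
drained = []
while (message = poll(buffer))
  drained << message
end
```

Using pop(true) with a rescue is one alternative to checking size before popping. With a single consumer, as in the spout, both approaches avoid blocking.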