-
-
Save tomtaylor/76bbeb6688f27ef2e47e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
NoMethodError | |
undefined method `handle_method' for nil:NilClass | |
/usr/local/opt/rbenv/versions/1.9.3-p194/lib/ruby/gems/1.9.1/gems/bunny-0.9.0.pre6/lib/bunny/session.rb:283:in `handle_frame' | |
/usr/local/opt/rbenv/versions/1.9.3-p194/lib/ruby/gems/1.9.1/gems/bunny-0.9.0.pre6/lib/bunny/main_loop.rb:51:in `block in run_loop' | |
/usr/local/opt/rbenv/versions/1.9.3-p194/lib/ruby/gems/1.9.1/gems/bunny-0.9.0.pre6/lib/bunny/main_loop.rb:27:in `loop' | |
/usr/local/opt/rbenv/versions/1.9.3-p194/lib/ruby/gems/1.9.1/gems/bunny-0.9.0.pre6/lib/bunny/main_loop.rb:27:in `run_loop' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bunny' | |
require 'celluloid' | |
module Gertrude | |
class Subscriber | |
include Celluloid | |
def initialize(worker_klass) | |
@connection = Bunny.new(heartbeat_interval: 8) | |
@pool = worker_klass.pool(size: 4) | |
end | |
def start | |
@connection.start | |
@channel = @connection.channel | |
topic = @channel.topic("gertrude.testing", durable: false) | |
queue = @channel.queue("gertrude.testing", auto_delete: true) | |
queue.bind(topic) | |
# Get a handle on this Actor so when the subscribe block executes in its | |
# own thread we can pass to the payload to the Actor's thread for | |
# processing. | |
current_actor = Actor.current | |
queue.subscribe(:manual_ack => true) do |delivery_info, metadata, payload| | |
current_actor.process(delivery_info, metadata, payload) | |
end | |
end | |
def process(delivery_info, metadata, payload) | |
@pool.async.handle_message(Actor.current, delivery_info, metadata, payload) | |
end | |
def send_ack(delivery_tag) | |
@channel.ack(delivery_tag, false) | |
end | |
def stop | |
@connection.close | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Gertrude | |
class Worker | |
include Celluloid | |
def handle_message(subscriber, delivery_info, metadata, payload) | |
perform(delivery_info, metadata, payload) | |
subscriber.async.send_ack(delivery_info.delivery_tag) | |
end | |
def perform(delivery_info, metadata, payload) | |
puts "#{payload.inspect}" | |
raise "foo" if rand > 0.8 | |
sleep rand(5).to_i | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment