Skip to content

Instantly share code, notes, and snippets.

@mhorbul
Created June 26, 2009 20:12
Show Gist options
  • Save mhorbul/136705 to your computer and use it in GitHub Desktop.
Save mhorbul/136705 to your computer and use it in GitHub Desktop.
require 'mq'
# Demo AMQP consumer built on the `mq` library and EventMachine.
#
# A worker takes its queue name and topic-exchange name from its own
# downcased class name, subscribes with explicit acknowledgements, and
# periodically sends a Basic.Recover frame on its channel.
class Foo
  # recover_timeout - interval handed to EM.add_periodic_timer between
  #                   Basic.Recover frames (default: 2).
  # prefect_size    - value handed to the channel's #prefetch (default: 10).
  #                   The "prefect" spelling is part of the existing signature.
  def initialize(recover_timeout = 2, prefect_size = 10)
    identity = self.class.to_s.downcase

    @proc_id = Process.pid
    @recover_timeout = recover_timeout
    @prefect_size = prefect_size
    # Queue name, exchange name and key all share the same string.
    @key = identity
    @topic = identity
    @name = identity
  end

  # Wires the worker up: channel + exchange, the recover timer, then the
  # subscription. Returns whatever #setup_subscription returns.
  def run
    setup_queue
    setup_recovering(@recover_timeout)
    setup_subscription
  end

  # Opens a channel, applies the prefetch setting and declares the topic
  # exchange named after this worker.
  def setup_queue
    @channel = MQ.new
    @channel.prefetch(@prefect_size)
    @exchange = @channel.topic(@topic)
  end

  # Schedules a repeating timer that sends Basic.Recover on the channel.
  #
  # timeout - timer interval passed to EM.add_periodic_timer.
  # requeue - value of the frame's :requeue field (default: false).
  def setup_recovering(timeout, requeue = false)
    EM.add_periodic_timer(timeout) do
      # The frame is sent from inside the channel's callback hook.
      @channel.callback do
        recover_frame = AMQP::Protocol::Basic::Recover.new({ :requeue => requeue })
        @channel.send(recover_frame)
      end
    end
  end

  # Binds the worker's queue to its exchange and consumes with :ack => true.
  #
  # Only deliveries for which rand(10) > 5 are processed and acknowledged;
  # the rest are logged and left unacknowledged (presumably so the periodic
  # Basic.Recover has something to act on -- confirm against the broker).
  # A worker named "a" additionally publishes "r<msg>" to topic "b".
  def setup_subscription
    queue = @channel.queue(@name)
    queue.bind(@exchange).subscribe(:ack => true) do |headers, msg|
      log "(#{msg}) message received."
      next unless rand(10) > 5

      process!(msg)
      MQ.topic("b").publish("r#{msg}") if @name == "a"
      headers.ack
    end
  end

  # Simulates slow work on +msg+: logs, sleeps for 10 seconds, logs again.
  # Returns the value of the final #log call.
  def process!(msg)
    log "(#{msg}) process started."
    sleep 10
    log "(#{msg}) process finished."
  end

  # Prints (via Kernel#p) an array of the current time, the class name, the
  # process id captured at construction and any extra +args+.
  def log(*args)
    entry = [Time.now, self.class.to_s, @proc_id]
    entry.concat(args)
    p entry
  end
end
# Worker whose queue and topic exchange are named "a" (Foo derives them from
# the downcased class name). Foo#setup_subscription makes a worker named "a"
# republish each processed message as "r<msg>" to topic "b".
class A < Foo
end
# Worker whose queue and topic exchange are named "b" (Foo derives them from
# the downcased class name). Presumably the consumer of what worker A
# publishes to topic "b" -- confirm against the broker's bindings.
class B < Foo
end
__END__
# Save this file as 't.rb', then run the following commands from the same directory (on Ruby 1.9.2+ add `-I.` so that `require 't'` can find the file).
ruby -e "require 't'; EM.run { A.new(2,10).run }"
ruby -e "require 't'; EM.run { B.new(2,10).run }"
ruby -e "require 't'; EM.run { A.new(2,2).run }"
ruby -e "require 't'; EM.run { B.new(2,5).run }"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment