Skip to content

Instantly share code, notes, and snippets.

@soupmatt
Created November 21, 2013 00:06
Show Gist options
  • Save soupmatt/7573582 to your computer and use it in GitHub Desktop.
Save soupmatt/7573582 to your computer and use it in GitHub Desktop.
class BaseDaemon
attr_reader :logger
def initialize
@stopping = false
@logger = ::Logger.new(STDOUT)
@ending_thread = Thread.new do
Thread.stop
logger.info "Received a signal in #{name}"
stop
Thread.main.wakeup
end
ending_proc = proc do
@ending_thread.wakeup
end
trap "TERM", &ending_proc
trap "INT", &ending_proc
trap "USR2" do
Thread.new do
threads = Thread.list.reject{ |t| t == Thread.current }
puts "here are the other #{threads.size} threads"
threads.each do |t|
name = case t
when Thread.main
"Main #{t.inspect}"
when @ending_thread
"Ending #{t.inspect}"
else
t.inspect
end
puts ([name] + t.backtrace).join("\n ")
end
end
end
end
def join
@ending_thread.join
end
def name
self.class.name
end
def stop
@stopping = true
end
def stopping?
@stopping
end
def run
raise "must be implemented in subclass"
end
def with_error_rescue
begin
yield
rescue => e
report_error(e, "Error in run loop for #{name}")
end
end
def report_error(e, message)
log_error(e, message)
end
def log_error(e, message)
logger.error exception_to_pretty_string(e, message)
end
def exception_to_pretty_string(error, message=nil)
lines = []
msg = ""
if message
msg = "#{message}\n"
end
lines << "#{msg}#{error.class.name}: #{error.message}"
lines += error.backtrace
lines.join(" \n")
end
end
require_relative 'base_daemon'
class MessageConsumerDaemon < BaseDaemon
attr_reader :config, :session
def initialize(config, use_close_with_cleanup=true)
super()
@config = config
@use_close_with_cleanup = use_close_with_cleanup
create_bunny_connection
end
def run
retry_delay = 1
begin
with_error_rescue do
begin
bunny_restart if retry_delay > 1
logger.info "bunny connected!"
create_subscriptions
retry_delay = 1
join
rescue => e
report_error(e, "Error for #{name}. Attempting to reconnect to RabbitMQ in #{retry_delay} seconds.")
with_error_rescue do
bunny_restart
end
sleep retry_delay
if retry_delay >= 30
retry_delay = 60
else
retry_delay *= 2
end
logger.info "attempting to reconnect..."
end
end
end until stopping?
end
def create_bunny_connection
@session = Bunny.new(config)
@session.start
end
def bunny_restart
if @use_close_with_cleanup
session.close_with_cleanup
else
session.close
end
create_bunny_connection
end
def stop
super
if @use_close_with_cleanup
session.close_with_cleanup
else
session.close
end
end
def create_subscriptions
3.times do |i|
ch = session.create_channel
ch.queue("queue1").subscribe do |_, _, payload|
puts "queue1 consumer #{i} got a message with payload #{payload}"
end
end
2.times do |i|
ch = session.create_channel
ch.queue("queue2").subscribe do |_, _, payload|
puts "queue2 consumer #{i} got a message with payload #{payload}"
end
end
end
end
require 'bunny'
require_relative 'session_patch'
require_relative 'message_consumer_daemon'
bunny_config_hash = {
vhost: "bunny_testbed",
automatically_recover: false
}
mcd = MessageConsumerDaemon.new(bunny_config_hash)
mcd.run
require 'bunny'
module Bunny
class Session
def close_with_cleanup
log_errors { self.close }
@channels.each do |_, channel|
log_errors { channel.maybe_kill_consumer_work_pool! }
end
log_errors { self.maybe_shutdown_heartbeat_sender }
log_errors { self.maybe_shutdown_reader_loop }
log_errors { self.close_transport }
end
def log_errors
begin
yield
rescue => e
@logger.info "#{e.class}: #{e.to_s}"
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment