Skip to content

Instantly share code, notes, and snippets.

@denen99
Created June 1, 2012 13:17
Show Gist options
  • Save denen99/2852086 to your computer and use it in GitHub Desktop.
Save denen99/2852086 to your computer and use it in GitHub Desktop.
unicorn amqp
after_fork do |server, worker|
require "amqp"
amqp_opts = {:port => $RABBITMQ_PORT, :host => $RABBITMQ_HOST, :username => $RABBITMQ_USER, :password => $RABBITMQ_PASS, :vhost => $RABBITMQ_VHOST}
t = Thread.new { AMQP.start(amqp_opts) }
sleep(1.0)
EventMachine.add_periodic_timer(60) do
exchange = AMQP.channel.topic($TOPIC_EXCHANGE)
exchange.publish("Heartbeat message #{Time.now.to_s}", :routing_key => "websocket.heartbeat.0")
end
EventMachine.next_tick do
AMQP.channel ||= AMQP::Channel.new(AMQP.connection, 2, :auto_recovery => true)
AMQP.channel.on_error do |ch, channel_close|
raise channel_close.reply_text
end
if AMQP.channel.auto_recovering?
puts "Channel #{ch1.id} IS auto-recovering"
end
AMQP.connection.on_tcp_connection_loss do |conn, settings|
puts "[AMQP network failure] Trying to reconnect..."
conn.reconnect(false, 2)
end
exchange = AMQP.channel.topic($TOPIC_EXCHANGE)
3.times do |i|
puts "[after_fork/amqp] Publishing a warmup message ##{i}"
exchange.publish("A warmup message #{i} from #{Time.now.strftime('%H:%M:%S %m/%b/%Y')}", :routing_key => "websocket.heartbeat.0")
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment