Skip to content

Instantly share code, notes, and snippets.

@totty
Created October 4, 2011 07:11
Show Gist options
  • Save totty/1261062 to your computer and use it in GitHub Desktop.
Save totty/1261062 to your computer and use it in GitHub Desktop.
pub sub recovery & retry
# -*- coding: utf-8 -*-
require "rubygems"
require "amqp" # requires version >= 0.8.0.RC14
puts
EM.run do
puts "#"*20
puts "AMQP.start block runs"
def connection(&block)
options = {
:user => 'guest',
:pass => 'guest',
:vhost => '/',
:logging => false,
:insist => false,
# :timeout => 10.3,
:host => 'localhost',
:port => 5672,
}
@session ||= AMQP.connect(options, &block)
if @session && !@session.connected?
@session.reconnect_to(options, 2)
end
connection_setup(@session)
end
def connection_setup(session)
# on_open, on_closedに渡されたブロックは、何度再接続をしても最初の一度だけしか呼び出されないが、
# after_recovery(on_recovery)に渡されたブロックは、再接続の度に呼び出されます。
# connection.on_open do |*args|
# puts "on_open: " << args.inspect
# end
# connection.on_closed do |*args|
# puts "on_closed: " << args.inspect
# end
session.after_recovery do |conn, settings|
puts "after_recovery: " << settings.inspect
channel(true)
exchange(true)
puts "#channel.open? = #{channel.open?}"
puts "#connection.connected? = #{connection.connected?}"
end
session.on_error do |conn, connection_close|
puts "connection.on_error channel_close.reply_text. channel_close: " << connection_close.inspect
puts "[connection.close] Reply code = #{connection_close.reply_code}, reply text = #{connection_close.reply_text}"
# if connection_close.reply_code == 320
# puts "[connection.close] Setting up a periodic reconnection timer..."
# # every 30 seconds
# conn.periodically_reconnect(30)
# end
# puts "raising channel_close.reply_text. channel_close: " << connection_close.inspect
# raise connection_close.reply_text
end
session.on_tcp_connection_loss do |conn, settings|
puts "[network failure] Trying to reconnect..."
conn.reconnect(false, 1)
end
session.on_tcp_connection_failure do |conn, settings|
puts("[warn] connection.on_tcp_connection_failure: now reconnecting.")
end
session
end
def channel(force = false)
options = {
:prefetch => 1,
:auto_recovery => true,
}
@channel ||= AMQP::Channel.new(connection, options)
@channel = AMQP::Channel.new(connection, options) if force || !@channel.open?
@channel.on_error do |ch, channel_close|
puts "raising channel_close.reply_text. channel_close: " << channel_close.inspect
end
@channel
end
def exchange(force = false)
options = {
:passive => false,
:durable => true,
:auto_delete => false,
:internal => false,
:nowait => true,
}
@exchange ||= AMQP::Exchange.new(channel, "direct", "exchange_totty", options)
@exchange = AMQP::Exchange.new(channel, "direct", "exchange_totty", options) if force
@exchange
end
def queue(force = false)
options = {
:durable => true
}
@queue ||= AMQP::Queue.new(channel, "queue_totty", options)
@queue = AMQP::Queue.new(channel, "queue_totty", options) if force
@queue.bind(exchange)
@queue
end
@count = 0
def fire
EM.next_tick do
@success = false
puts
puts "== fire =="
puts " [[start!!]]"
puts " [[connection.connected? = #{connection.connected?}], [channel.open? = #{channel.open?}]]"
# connection.close if @count == 0
# @count = 1
begin
puts
sleep 5
puts "** publish"
options = {
:mandatory => true,
:persistent => true,
}
exchange.publish("hoge", options) do
puts "@ publish "*5
@success = true
connection.disconnect{ puts "Disconnected. Exiting"; EM.stop }
end
EM.add_timer(2) {
if @success
connection.close { EM.stop }
else
fire
end
}
rescue => e
puts "@ raise "*5
puts "raise => #{e.inspect}"
retry
end
puts "fin.."
end
end
def process
puts "== process =="
puts " [[start!!]]"
queue.subscribe(:ack => true, :nowait => true) do |metadata, payload|
puts "@ subscribe "*5
sleep 5 # このタイミングで stop_app , reconnect が行われる、start_app したら、ちゃんと受け取れた
puts "** subscribe"
metadata.ack
end
puts "fin.."
end
#############
# publish
#############
fire
puts "fire end"
#############
# subscribe
#############
# process
# puts "process end"
show_stopper = Proc.new {
puts "# show_stopper"
connection.disconnect{ puts "Disconnected. Exiting"; EventMachine.stop }
puts "#"*20
}
Signal.trap "TERM", show_stopper
Signal.trap "INT", show_stopper
# EM.add_timer(20, show_stopper)
puts "EOF"
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment