Created
October 4, 2011 07:11
-
-
Save totty/1261062 to your computer and use it in GitHub Desktop.
pub sub recovery & retry
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
require "rubygems" | |
require "amqp" # requires version >= 0.8.0.RC14 | |
# Print a blank line so this run's output is visually separated.
puts | |
# Opens the EventMachine reactor block; it is closed by the last `end` of the
# script, so all method definitions and driver code below run inside it.
EM.run do | |
# Start-up banner. The message mentions AMQP.start although the block above
# is opened with EM.run.
puts "#"*20 | |
puts "AMQP.start block runs" | |
# Returns the memoized AMQP session, creating it on first use.
#
# On the first call the session is opened with AMQP.connect (the optional
# block is forwarded to it) and the recovery/error handlers are installed via
# connection_setup. On every call, a session that reports `connected?` as
# false is asked to reconnect to the same broker settings.
#
# The handlers are installed once per session object. This accessor is called
# from many places (including from inside the after_recovery handler that
# connection_setup installs), so installing them on every call would
# re-register the same blocks over and over.
#
# @yield forwarded to AMQP.connect when the session is first created
# @return the memoized session object
def connection(&block)
  options = {
    :user    => 'guest',
    :pass    => 'guest',
    :vhost   => '/',
    :logging => false,
    :insist  => false,
    # :timeout => 10.3,
    :host    => 'localhost',
    :port    => 5672,
  }

  newly_created = @session.nil?
  @session ||= AMQP.connect(options, &block)

  # NOTE(review): this also fires right after AMQP.connect if `connected?` is
  # still false while the initial connection is being opened - confirm that
  # an immediate reconnect_to is intended in that case.
  @session.reconnect_to(options, 2) if @session && !@session.connected?

  # Same ordering as before (connect, optional reconnect, then handlers), but
  # only for a freshly created session.
  connection_setup(@session) if newly_created
  @session
end
# Installs the recovery, error and network-failure handlers on +session+.
#
# @param session the AMQP session the handlers are attached to
# @return the same +session+ object that was passed in
def connection_setup(session)
  # Blocks given to on_open / on_closed are invoked only the first time, no
  # matter how many times the connection is re-established, whereas the block
  # given to after_recovery (on_recovery) is invoked on every reconnection.
  # connection.on_open do |*args|
  #   puts "on_open: " << args.inspect
  # end
  # connection.on_closed do |*args|
  #   puts "on_closed: " << args.inspect
  # end

  session.after_recovery do |_recovered, recovery_settings|
    puts "after_recovery: #{recovery_settings.inspect}"
    # Force a fresh channel and a fresh exchange after the connection is back.
    channel(true)
    exchange(true)
    puts "#channel.open? = #{channel.open?}"
    puts "#connection.connected? = #{connection.connected?}"
  end

  # Connection-level errors are only logged; the stricter reactions tried
  # earlier are kept below as commented-out code.
  session.on_error do |_failed, close_frame|
    puts "connection.on_error channel_close.reply_text. channel_close: #{close_frame.inspect}"
    puts "[connection.close] Reply code = #{close_frame.reply_code}, reply text = #{close_frame.reply_text}"
    # if connection_close.reply_code == 320
    #   puts "[connection.close] Setting up a periodic reconnection timer..."
    #   # every 30 seconds
    #   conn.periodically_reconnect(30)
    # end
    # puts "raising channel_close.reply_text. channel_close: " << connection_close.inspect
    # raise connection_close.reply_text
  end

  session.on_tcp_connection_loss do |lost, _settings|
    puts "[network failure] Trying to reconnect..."
    # Arguments are passed through unchanged; presumably (force, period) -
    # confirm against the amqp gem.
    lost.reconnect(false, 1)
  end

  # Only logs: despite the message, no reconnect is triggered from here.
  session.on_tcp_connection_failure do |_conn, _settings|
    puts("[warn] connection.on_tcp_connection_failure: now reconnecting.")
  end

  session
end
# Returns the memoized AMQP channel, creating a new one when needed.
#
# A new channel is created when none is cached yet, when the cached one
# reports `open?` as false, or when +force+ is true. Exactly one channel is
# created per call: previously a forced call with no cached channel built two
# and discarded the first.
#
# The error handler is attached when a channel object is created rather than
# on every call, so repeated calls do not re-register the same block on the
# same channel.
#
# @param force [Boolean] when true, always replace the cached channel
# @return the current channel object
def channel(force = false)
  options = {
    :prefetch      => 1,
    :auto_recovery => true,
  }

  if force || @channel.nil? || !@channel.open?
    @channel = AMQP::Channel.new(connection, options)
    # Channel-level errors are only logged (nothing is raised, despite the
    # wording of the message).
    @channel.on_error do |_ch, channel_close|
      puts "raising channel_close.reply_text. channel_close: " << channel_close.inspect
    end
  end

  @channel
end
# Returns the memoized "exchange_totty" exchange, declaring it when needed.
#
# The exchange is constructed on the current channel when none is cached yet
# or when +force+ is true. Exactly one exchange object is constructed per
# call: previously a forced call with no cached exchange constructed it twice.
#
# @param force [Boolean] when true, always replace the cached exchange
# @return the current exchange object
def exchange(force = false)
  options = {
    :passive     => false,
    :durable     => true,
    :auto_delete => false,
    :internal    => false,
    :nowait      => true,
  }

  if force || @exchange.nil?
    @exchange = AMQP::Exchange.new(channel, "direct", "exchange_totty", options)
  end

  @exchange
end
# Returns the memoized "queue_totty" queue, bound to the exchange.
#
# The queue is constructed on the current channel when none is cached yet or
# when +force+ is true. Exactly one queue object is constructed per call:
# previously a forced call with no cached queue constructed it twice.
# The binding to the exchange is (re)issued on every call, as before.
#
# @param force [Boolean] when true, always replace the cached queue
# @return the current queue object
def queue(force = false)
  options = {
    :durable => true
  }

  if force || @queue.nil?
    @queue = AMQP::Queue.new(channel, "queue_totty", options)
  end

  @queue.bind(exchange)
  @queue
end
# In the visible source this counter is referenced only by the commented-out
# `connection.close if @count == 0` experiment inside fire.
@count = 0 | |
# Schedules one publish attempt of the message "hoge" on the reactor.
#
# The attempt runs from EM.next_tick. After a 5 second pause it publishes to
# the exchange; the block given to publish marks the attempt as successful
# and disconnects, stopping the reactor. Two seconds after publishing, a timer
# either closes the connection (success) or calls fire again (no success yet).
#
# If the attempt raises, the error is logged and another attempt is scheduled
# with a 2 second timer. The previous in-place `retry` looped inside this
# reactor callback without ever returning to the reactor, so handlers such as
# the reconnection ones could not run between attempts and a persistent
# failure became an endless blocking loop.
#
# @return whatever EM.next_tick returns
def fire
  EM.next_tick do
    @success = false
    puts
    puts "== fire =="
    puts " [[start!!]]"
    puts " [[connection.connected? = #{connection.connected?}], [channel.open? = #{channel.open?}]]"
    # connection.close if @count == 0
    # @count = 1
    begin
      puts
      # NOTE(review): this blocks the reactor thread for 5 seconds; it looks
      # like a deliberate window for stopping/restarting the broker by hand
      # while testing recovery - confirm before replacing it with a timer.
      sleep 5
      puts "** publish"
      options = {
        :mandatory  => true,
        :persistent => true,
      }
      exchange.publish("hoge", options) do
        puts "@ publish "*5
        @success = true
        connection.disconnect { puts "Disconnected. Exiting"; EM.stop }
      end
      # Check back later: shut down if the publish block ran, otherwise retry.
      EM.add_timer(2) do
        if @success
          connection.close { EM.stop }
        else
          fire
        end
      end
    rescue => e
      puts "@ raise "*5
      puts "raise => #{e.inspect}"
      # Hand control back to the reactor and try again from a timer instead
      # of retrying in place.
      EM.add_timer(2) { fire }
    end
    puts "fin.."
  end
end
# Subscribes to the queue with explicit acknowledgements and acks each
# delivery after a 5 second pause.
#
# @return the value of the final puts
def process
  puts "== process =="
  puts " [[start!!]]"

  queue.subscribe(:ack => true, :nowait => true) do |delivery, _body|
    puts "@ subscribe " * 5
    # stop_app and the reconnect are performed during this pause; after
    # start_app the message was received correctly.
    sleep 5
    puts "** subscribe"
    delivery.ack
  end

  puts "fin.."
end
#############
# publish
#############
# fire wraps its work in EM.next_tick, so "fire end" is printed before the
# publish attempt itself runs.
fire | |
puts "fire end" | |
#############
# subscribe
#############
# The consumer side is disabled; uncomment the two lines below to run it.
# process | |
# puts "process end" | |
# Shutdown handler: logs, disconnects from the broker and stops the reactor
# once the disconnect block runs.
show_stopper = proc do
  puts "# show_stopper"
  connection.disconnect do
    puts "Disconnected. Exiting"
    EventMachine.stop
  end
  puts "#" * 20
end

# Run the shutdown handler on TERM and INT.
%w[TERM INT].each { |signal_name| Signal.trap(signal_name, show_stopper) }
# EM.add_timer(20, show_stopper)
puts "EOF"
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment