@kmussel
Last active August 29, 2015 14:10
rabbitmq publisher with celluloid
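class Rabbitmq
  include Celluloid

  ACTOR_NAME = :rabbitmq_publisher
  RETRY_COUNT = 2 # attempts per publish: the first try plus one retry

  attr_reader :retry_count

  class << self
    # Forwards to the supervised actor. The optional block is called only
    # when publishing has failed on every attempt.
    def publish(message, routing_key, headers = {}, &block)
      actor.publish(message, routing_key, headers, block)
    end

    private

    def actor
      Celluloid::Actor[ACTOR_NAME]
    end
  end

  def initialize
    @retry_count = RETRY_COUNT
  end

  # Publishes to the durable "events" topic exchange using publisher confirms.
  # Returns true once the broker confirms the message. After the last failed
  # attempt it logs the error, calls `block` (if given), and returns false.
  def publish(message, route, headers = {}, block = nil)
    params = { routing_key: route, persistent: true, content_type: 'application/json', app_id: 'play' }.merge(headers)
    channel = connection.create_channel
    channel.confirm_select
    x = channel.topic("events", durable: true)
    x.publish(message, params)
    confirmed = channel.wait_for_confirms
    fail PublisherError unless confirmed
    confirmed
  rescue => e
    @retry_count -= 1 if @retry_count > 0
    if @retry_count != 0
      # `retry` re-runs the method body without running `ensure` first, so the
      # channel opened by this attempt is not closed before the next one.
      sleep(1)
      retry
    else
      # Out of attempts: reset the counter for the next publish call.
      @retry_count = RETRY_COUNT
    end
    Rails.logger.info("PUBLISH EXCEPTION: #{e.inspect} --- message=#{message} ---params=#{params} ")
    block.call if block
    false
  ensure
    channel.close if channel
  end

  private

  # Lazily opens the shared connection and reopens it if it has been closed.
  def connection
    if !$rabbitmq_conn || $rabbitmq_conn.closed?
      $rabbitmq_conn = MarchHare.connect(uri: Settings.rabbitmq_url, hosts: Settings.rabbitmq_hosts)
    end
    $rabbitmq_conn
  end
end

class PublisherError < StandardError; end

Rabbitmq.supervise_as Rabbitmq::ACTOR_NAME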
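
The retry handling in `publish` can be looked at in isolation. Below is a minimal sketch in plain Ruby, with no RabbitMQ, Celluloid, or Rails dependency, of a bounded retry in which every attempt cleans up after itself. `with_retries`, `FakeChannel`, and `publish_once` are hypothetical names introduced only for this illustration; they are not part of the gist or of March Hare.

```ruby
# Hypothetical helper (not from the gist): runs the block up to `attempts`
# times and returns its value, or false once every attempt has raised.
def with_retries(attempts: 2, delay: 0)
  tries = 0
  begin
    tries += 1
    yield tries
  rescue StandardError
    if tries < attempts
      sleep(delay) if delay > 0
      retry
    end
    false
  end
end

# Hypothetical stand-in for a channel; it only counts how many are open.
class FakeChannel
  @open_count = 0

  class << self
    attr_accessor :open_count
  end

  def initialize
    self.class.open_count += 1
  end

  def close
    self.class.open_count -= 1
  end
end

# One publish attempt. The channel is closed in this method's own `ensure`,
# so it is released whether the attempt succeeds or raises.
def publish_once(fail_first:, attempt:)
  channel = FakeChannel.new
  raise "simulated broker failure" if fail_first && attempt == 1
  true
ensure
  channel.close if channel
end

# The first attempt fails, the second succeeds, and no channel stays open.
result = with_retries(attempts: 2) { |n| publish_once(fail_first: true, attempt: n) }
```

Keeping the `ensure` inside the per-attempt method is the design choice being illustrated: cleanup then runs once per attempt rather than once per call. Applying the same split to `Rabbitmq#publish` would be one possible way to release each attempt's channel before retrying.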