Skip to content

Instantly share code, notes, and snippets.

@etaque
Forked from anonymous/manager.rb
Created December 13, 2012 14:51
Show Gist options
  • Save etaque/4276853 to your computer and use it in GitHub Desktop.
Save etaque/4276853 to your computer and use it in GitHub Desktop.
An attempt to use the Stomp gem inside a Celluloid actor.
module Actors
  # Celluloid actor that registers one supervised StompListener per broker
  # found in CONF.brokers.voip and forwards incoming messages to a linked
  # pool of Processor actors.
  class Manager
    include Celluloid
    include Celluloid::Logger

    # Loads the broker table, registers a listener for every broker under
    # the name "<broker>_listener", then builds the processor pool.
    def initialize
      manager = current_actor
      @brokers = CONF.brokers.voip

      @brokers.each do |name, settings|
        # Each listener receives the broker's connection settings plus the
        # name of its main queue under the :queue key.
        listener_conf = settings.connection.merge(queue: settings.queues.main)
        StompListener.supervise_as(listener_name(name), manager, listener_conf)
      end

      # Use two workers when Celluloid.cores does not report a value.
      @cores = Celluloid.cores || 2
      @processor_pool = Processor.pool_link(size: @cores, args: [manager])
    end

    # Asks every registered listener to connect.
    def start
      stomp_actors.each { |listener| listener.connect }
    end

    # Returns the registry entries for all configured brokers, in the order
    # of @brokers.keys.
    def stomp_actors
      @brokers.keys.map { |name| Actor[listener_name(name)] }
    end

    # Hands a message to the processor pool without waiting for the result.
    #
    # msg           - the message received by a listener.
    # from_listener - the listener that received it; passed along to the
    #                 processor together with the message.
    def assign(msg, from_listener)
      debug("ASSIGN")
      @processor_pool.async.process(msg, from_listener)
    end

    private

    # Registry name under which the listener for +broker+ is stored.
    def listener_name(broker)
      :"#{broker}_listener"
    end
  end
end
module Actors
  # Celluloid actor that handles one message at a time and then asks the
  # listener it came from to acknowledge it.
  class Processor
    include Celluloid
    include Celluloid::Logger

    attr_accessor :cdr_log

    # boss - reference kept in @boss; none of the methods visible here
    #        read it back.
    def initialize(boss)
      @boss = boss
    end

    # Processes +msg+ and requests an asynchronous ack from +listener+.
    #
    # msg      - the message to handle.
    # listener - object whose async proxy responds to #ack.
    def process(msg, listener)
      # some work here...
      debug("PROCESS")
      ack_proxy = listener.async
      ack_proxy.ack(msg)
    end
  end
end
module Actors
  # Celluloid actor wrapping a Stomp client: it subscribes to one queue,
  # forwards every received message to its boss, and acknowledges messages
  # on request.
  class StompListener
    include Celluloid
    include Celluloid::Logger

    attr_accessor :conf, :queue, :client, :verbose

    # boss - object whose async proxy responds to #assign(msg, listener).
    # conf - settings object read in #connect; it must respond to #queue,
    #        #host, #port, #user and #password.
    def initialize(boss, conf)
      @boss = boss
      @conf = conf
    end

    # Opens the Stomp client using the stored settings and subscribes to
    # the configured queue. Returns whatever #subscribe returns.
    def connect
      host = conf.host
      port = conf.port

      @queue = conf.queue
      @queue_uri = "#{host}:#{port}/queue/#{@queue}"
      # NOTE(review): the trailing `false` is presumably the stomp gem's
      # positional `reliable` flag - confirm against the gem version in use.
      @client = Stomp::Client.open(conf.user, conf.password, host, port, false)

      subscribe
    end

    # Subscribes to "/queue/<queue>" with client-side acknowledgement and
    # passes each incoming message to #emit.
    def subscribe
      info("subscribing to #{@queue_uri}")

      # Grab the actor handle before entering the block so the callback
      # calls #emit through it.
      # NOTE(review): the callback presumably runs outside this actor's
      # own thread - confirm.
      listener = current_actor
      destination = "/queue/#{@queue}"

      @client.subscribe(destination, ack: 'client') do |msg|
        debug("MSG")
        listener.emit(msg)
        debug("WAIT")
      end
    end

    # Forwards +msg+ to the boss asynchronously, tagging it with this
    # listener so it can be acknowledged later.
    def emit(msg)
      boss_proxy = @boss.async
      boss_proxy.assign(msg, current_actor)
    end

    # Acknowledges +msg+ on the underlying Stomp client.
    def ack(msg)
      debug("ACK")
      @client.ack(msg)
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment