Skip to content

Instantly share code, notes, and snippets.

@joemiller
Created March 21, 2012 15:05
Show Gist options
  • Save joemiller/2148100 to your computer and use it in GitHub Desktop.
Save joemiller/2148100 to your computer and use it in GitHub Desktop.
Modified version of @lusis' Celluloid pool test, using the new Celluloid::Pool class in 0.9.0.
# NOTE: original gist that this is based on is available here: https://gist.github.com/1143369
require 'digest/sha1'
require 'logger'

require 'celluloid'
require 'sinatra/base'
require 'uuid'
# This is just a simple demo of a possible Pool implementation for Celluloid.
# The Sinatra interface exists only to exercise things such as crashing workers.
# TODO:
# - Create a busy worker registry of some kind
# - Implement a small stats page
# Shared logger that writes to standard output under the "noah-agent"
# program name; Celluloid is handed the very same instance.
LOGGER = Logger.new(STDOUT).tap { |log| log.progname = "noah-agent" }
Celluloid.logger = LOGGER
# Raised by a worker to simulate a failed job.
#
# Inherits from StandardError rather than Exception so that a plain `rescue`
# (which only matches StandardError and its subclasses) can catch it. Code
# that rescues WorkerError or Exception explicitly keeps working as before.
class WorkerError < StandardError; end
# class Pool
# include Celluloid
# #trap_exit :worker_exception_handler
#
# attr_reader :workers, :busy_workers
#
# def initialize(name, opts = {:num_workers => 10, :worker_class => Worker})
# @name = name
# @workers = []
# @busy_workers = []
# LOGGER.info("Pool #{name} starting up")
# opts[:num_workers].times do |worker|
# start_worker(opts[:worker_class])
# end
# end
#
# def start_worker(klass)
# worker_id = gen_worker_id
# LOGGER.info("Pool #{@name} is starting a #{klass.to_s} worker")
# wkr = klass.supervise_as "#{@name}_worker_#{worker_id}".to_sym, "#{@name}_worker_#{worker_id}"
# @workers << wkr
# end
#
# def notify_worker(msg)
# worker = self.get_worker
# @busy_workers << worker.name
# worker.work msg
# @busy_workers.delete worker.name
# end
#
# def worker_exception_handler(actor, reason)
# LOGGER.debug("Worker #{actor.name} crashed because #{reason}. You should see a doctor about that")
# end
#
#
# protected
# def gen_worker_id
# Digest::SHA1.hexdigest(UUID.generate)
# end
#
# def get_worker
# worker = @workers.sample.actor
# LOGGER.info("Found Worker: #{worker.name} in the pool")
# if worker.alive?
# worker
# else
# LOGGER.error "Worker #{worker.name} was dead. Retrying!"
# self.get_worker
# end
# end
#
# end
# Demo worker actor. Every instance carries a unique name so that its log
# lines can be told apart from those of other workers.
class MyWorker
  include Celluloid

  # The generated, unique name of this worker ("<class>_<hex id>").
  attr_reader :name

  # Builds the worker name from the class name plus a generated id and logs
  # that the worker is ready.
  def initialize
    @name = "#{self.class}_#{gen_worker_id}"
    LOGGER.debug("New worker reporting for duty: #{@name}")
  end

  # Returns the hex SHA1 digest of a freshly generated UUID string.
  # Digest::SHA1 lives in the stdlib 'digest/sha1' library, which has to be
  # loaded explicitly rather than relying on another gem pulling it in.
  def gen_worker_id
    Digest::SHA1.hexdigest(UUID.generate)
  end

  # Handles a single message.
  #
  # msg - the payload to process. The literal "die" makes the worker sleep
  #       for two seconds and then fail; anything else is only logged.
  #
  # Raises WorkerError when msg is "die".
  def work(msg)
    LOGGER.info("Message for you (#{@name}) sir! #{msg}")

    case msg
    when "die"
      # Simulate some long-running work that crashes
      sleep 2
      raise WorkerError, "Boo got shot!"
    else
      # Simulate some long-running work here
      # sleep 4
      LOGGER.debug("Hey there camper! #{@name} is doing some work for you")
    end
  end
end
# Small Sinatra front end used to poke at the worker pool over HTTP.
#
#   PUT /scale - spawn another worker
#   GET /stats - report total and busy worker counts
#   PUT /die   - send a worker the "die" message
#   PUT /send  - send a worker the request body as its message
class TestApp < Sinatra::Base
  # @pool = Pool.supervise_as :my_cool_pool, "MyCoolPool", {:num_workers => 30, :worker_class => MyWorker}
  # One pool of MyWorker actors, created when the class body is evaluated and
  # exposed to the routes through the :pool setting below.
  @pool = Celluloid::Pool.new(MyWorker, :initial_size => 5, :max_size => 10)

  configure do
    set :app_file, __FILE__
    disable :logging, :dump_errors, :run
    # set :server, "thin"
    set :pool, @pool
  end

  # Asks the pool to spawn one more worker.
  put '/scale' do
    # settings.pool.actor.start_worker(MyWorker)
    settings.pool.spawn
    "Added a worker"
  end

  # Reports the pool size and how many workers are not idle.
  get '/stats' do
    # "Worker count: #{settings.pool.actor.workers.size}\n Busy workers: #{settings.pool.actor.busy_workers.size}"
    pool = settings.pool
    busy = pool.size - pool.idle_count
    "Worker count: #{pool.size}\n Busy workers: #{busy}"
  end

  # Hands the "die" message to a worker taken from the pool; MyWorker#work
  # raises WorkerError for that message.
  put '/die' do
    # settings.pool.actor.notify_worker! "die"
    settings.pool.get do |worker|
      worker.work("die")
    end
    # actor = settings.pool.get
    # actor.work "die"
    # settings.pool.put actor
    # ""
  end

  # Hands the raw request body to a worker taken from the pool.
  put '/send' do
    # settings.pool.actor.notify_worker! request.body.read
    settings.pool.get do |worker|
      worker.work(request.body.read)
    end
    # actor = settings.pool.get
    # actor.work request.body.read
    # settings.pool.put actor
  end
end
# Start the demo application through Sinatra's run!.
TestApp.run!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment