@jessesanford
Created March 31, 2014 19:59
Celluloid: monitoring jobs on supervised pools for failure and restarting them.
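require 'celluloid'
# Worker is also defined further down; this require assumes it lives in worker.rb on the load path.
require 'worker'

class Manager
  include Celluloid

  attr_reader :worker_supervisor, :workers

  # Call worker_died whenever a linked actor exits.
  trap_exit :worker_died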
  def initialize(message)
    puts "manager got #{message} when instantiated"

    # Start the SupervisionGroup.
    @worker_supervisor = Celluloid::SupervisionGroup.run!

    # Get a handle on the SupervisionGroup::Member.
    # http://rubydoc.info/gems/celluloid/Celluloid/SupervisionGroup/Member
    workers_pool = @worker_supervisor.pool(Worker, as: :workers, args: [1, 2, 3], size: 3)

    # Get a handle on the PoolManager.
    # http://rubydoc.info/gems/celluloid/Celluloid/PoolManager
    @workers = workers_pool.actor

    @jobs = {}          # job id => job
    @job_to_worker = {} # job id => worker
    @worker_to_job = {} # worker mailbox address => job
  end

  # Send a job to the worker pool.
  def delegate(job)
    @jobs[job[:id]] = job
    # Start the work and send it to the background.
    @workers.async.work(job, Actor.current)
  end

  # Callback from a worker once it has received its job.
  # The worker should call this as soon as possible.
  def register_worker_for_job(job, worker)
    @job_to_worker[job[:id]] = worker
    @worker_to_job[worker.mailbox.address] = job
    # Link to the worker so that worker_died fires if it crashes.
    Actor.current.link worker
    puts "Worker who called back for job: #{job} was #{worker}"
  end

  # Return the worker registered for the job, or nil if none has registered yet.
  def get_worker_for_job(job)
    @job_to_worker[job[:id]]
  end

  # Look up the status of a job by asking the worker running it.
  # Assumes the worker responds to #status, which the Worker class here does not define.
  def get_job_status(job)
    worker = @job_to_worker[job[:id]]
    worker.status
  end
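
  # Exit handler: find the job the dead worker was running and hand it to the pool again.
  def worker_died(worker, reason)
    job = @worker_to_job.delete(worker.mailbox.address)
    return unless job # the worker exited without a registered job

    @job_to_worker.delete(job[:id])
    p "restarting #{job} on new worker"
    delegate(job)
  end
end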
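
class Worker
  include Celluloid

  attr_reader :value

  # The three arguments come from the pool's args: option and are unused here.
  def initialize(arg1, arg2, arg3)
    @value = 42
  end

  def work(job, manager)
    # Register with the manager first so it can track which worker holds the job.
    manager.register_worker_for_job(job, Actor.current)
    sleep 300
  end

  def crash
    raise "the spec purposely crashed me :("
  end
end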
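
myman = Manager.new('foobar')
job = { id: 1 }
myman.delegate(job)
# delegate is asynchronous, so the worker may not have registered yet and this lookup can return nil.
worker = myman.get_worker_for_job(job)
worker.crash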
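
The bookkeeping that Manager does with @job_to_worker and @worker_to_job can be tried out without Celluloid. The sketch below is a plain-Ruby stand-in, not Celluloid API: the JobRegistry class, its method names, and the symbol worker keys are hypothetical and exist only for illustration. It shows the register, look up, and release-on-death cycle, including the case where a dead worker never registered a job.

```ruby
# Plain-Ruby stand-in for the Manager's bookkeeping; no Celluloid required.
# JobRegistry and its method names are hypothetical, not part of Celluloid.
class JobRegistry
  def initialize
    @jobs = {}          # job id     => job
    @job_to_worker = {} # job id     => worker key
    @worker_to_job = {} # worker key => job
  end

  # Remember a job before it is handed to a worker.
  def track(job)
    @jobs[job[:id]] = job
  end

  # Record which worker picked up the job (the worker's callback).
  def register(job, worker_key)
    @job_to_worker[job[:id]] = worker_key
    @worker_to_job[worker_key] = job
  end

  def worker_for(job)
    @job_to_worker[job[:id]]
  end

  # Forget a dead worker and return the job it was running,
  # or nil if the worker never registered one.
  def release(worker_key)
    job = @worker_to_job.delete(worker_key)
    @job_to_worker.delete(job[:id]) if job
    job
  end
end

registry = JobRegistry.new
job = { id: 1 }
registry.track(job)
registry.register(job, :worker_a)

orphaned = registry.release(:worker_a) # simulate :worker_a dying
registry.register(orphaned, :worker_b) # the job is picked up again
```

Clearing both maps in release keeps a stale worker from being returned by a later lookup, which is the same reason worker_died should drop the dead worker from the job-to-worker map before delegating the job again.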