Created
March 31, 2014 19:59
-
-
Save jessesanford/9900893 to your computer and use it in GitHub Desktop.
Celluloid monitoring jobs on Supervised Pools for failure and restarting.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'celluloid' | |
require 'worker' | |
class Manager | |
include Celluloid | |
attr_reader :worker_supervisor, :workers | |
trap_exit :worker_died | |
def initialize(message) | |
puts "manager got #{message} when instantiated" | |
#start SupervisionGroup | |
@worker_supervisor = Celluloid::SupervisionGroup.run! | |
#Get a handle on the SupervisionGroup::Member | |
#http://rubydoc.info/gems/celluloid/Celluloid/SupervisionGroup/Member | |
workers_pool = @worker_supervisor.pool(Worker, as: :workers, args: [1,2,3], size: 3) | |
#Get a handle on the PoolManager | |
#http://rubydoc.info/gems/celluloid/Celluloid/PoolManager | |
@workers = workers_pool.actor | |
@jobs = {} | |
@job_to_worker = {} | |
@worker_to_job = {} | |
end | |
#call to send an actor a job | |
def delegate(job) | |
@jobs[job[:id]] = job | |
#start work and send it to the background | |
@workers.async.work(job, Actor.current) | |
end | |
#call back from actor once it has received it's job | |
#actor should do this asap | |
def register_worker_for_job(job, worker) | |
@job_to_worker[job[:id]] = worker | |
@worker_to_job[worker.mailbox.address] = job | |
Actor.current.link worker | |
puts "Worker who called back for job: #{job} was #{worker}" | |
end | |
def get_worker_for_job(job) | |
worker = @job_to_worker[job[:id]] | |
end | |
#lookup status of job by asking actor running it | |
def get_job_status(job) | |
actor = @registered_jobs[job[:id]] | |
status = actor.status | |
end | |
def worker_died(worker, reason) | |
job = @worker_to_job[worker.mailbox.address] | |
@worker_to_job.delete(worker.mailbox.address) | |
p "restarting #{job} on new worker" | |
delegate(job) | |
end | |
end | |
class Worker | |
include Celluloid | |
attr_reader :value | |
def initialize(arg1, arg2, arg3) | |
@value = 42 | |
end | |
def work(job, manager) | |
manager.register_worker_for_job(job, Actor.current) | |
sleep 300 | |
end | |
def crash | |
raise "the spec purposely crashed me :(" | |
end | |
end | |
myman = Manager.new('foobar') | |
job = {id: 1} | |
myman.delegate(job) | |
worker = myman.get_worker_for_job(job) | |
worker.crash |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment