-
-
Save erikh/a325957ba2727fb2d72d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'fileutils'
require 'set'
require 'thread'
require 'timeout'

require 'chef-workflow/support/attr'
require 'chef-workflow/support/debug'
#
# This is a scheduler for provisioners. It can run in parallel or serial mode,
# and is dependency-based, that is, it will only schedule items for execution
# which have all their dependencies satisfied and items that haven't will wait
# to execute until that happens.
#
class VMSupport
  DEFAULT_VM_FILE = File.join(Dir.pwd, '.chef-workflow', 'vms')

  class << self
    extend AttrSupport
    fancy_attr :vm_file
  end

  extend AttrSupport
  include DebugSupport

  fancy_attr :serial

  #
  # Load a previously saved scheduler from the configured vm_file (falling
  # back to DEFAULT_VM_FILE). Returns nil if the file does not exist.
  #
  # NOTE(review): Marshal.load will instantiate whatever objects the file
  # describes, so the vm_file must only ever be written by a trusted process.
  #
  def self.load
    self.vm_file ||= DEFAULT_VM_FILE
    return nil unless File.file?(vm_file)

    Marshal.load(File.binread(vm_file))
  end

  def initialize
    @serial = false
    @vm_groups = { }
    @vm_dependencies = { }
    @solver_thread = nil
    @waiters = Set.new
    @working = { }
    @solved = Set.new
    @queue = Queue.new
  end

  #
  # Persist the scheduler to the configured vm_file (falling back to
  # DEFAULT_VM_FILE), creating the containing directory if necessary. Only the
  # state returned by #marshal_dump is written.
  #
  def save
    self.class.vm_file ||= DEFAULT_VM_FILE
    path = self.class.vm_file
    FileUtils.mkdir_p(File.dirname(path))
    File.binwrite(path, Marshal.dump(self))
  end

  #
  # Marshal hook. Threads and Queues cannot be marshalled, so only the
  # dependency graph and progress sets are persisted. Groups that are
  # in flight at dump time are recorded as waiters again, since their
  # worker threads do not survive serialization.
  #
  def marshal_dump
    {
      :serial          => @serial,
      :vm_groups       => @vm_groups,
      :vm_dependencies => @vm_dependencies,
      :waiters         => @waiters + @working.keys,
      :solved          => @solved
    }
  end

  #
  # Marshal hook. Restores the persisted state and recreates the runtime-only
  # pieces (queue, worker table, solver thread) in their initial condition.
  #
  def marshal_load(state)
    @serial          = state[:serial]
    @vm_groups       = state[:vm_groups]
    @vm_dependencies = state[:vm_dependencies]
    @waiters         = state[:waiters]
    @solved          = state[:solved]
    @solver_thread   = nil
    @working         = { }
    @queue           = Queue.new
  end

  #
  # Schedule a group of VMs for provision. This takes a group name, which is a
  # string, a provisioner object, and a list of string dependencies. If
  # anything in the dependencies list hasn't been pre-declared, it refuses to
  # continue.
  #
  def schedule_provision(group_name, provisioner, dependencies=[])
    # Validate before touching any state: a rejected call must not leave a
    # half-registered group behind, and a group cannot pre-declare itself.
    unless dependencies.all? { |dep| dep != group_name && @vm_groups.has_key?(dep) }
      raise "One of your dependencies for #{group_name} has not been pre-declared. Cannot continue"
    end

    provisioner.name = group_name # FIXME remove
    @vm_groups[group_name] = provisioner
    @vm_dependencies[group_name] = dependencies.to_set
    @waiters.add(group_name)
  end

  #
  # Sleep until this list of dependencies are resolved. In parallel mode, will
  # raise if an exception occurred while waiting for these resources, if the
  # scheduler was never started, or if the scheduler exited before the
  # dependencies were provisioned. In serial mode, wait_for just returns nil.
  #
  def wait_for(*dependencies)
    return nil if @serial
    raise "Scheduler is not running; call #run before #wait_for" unless @solver_thread

    dep_set = dependencies.to_set
    until dep_set.subset?(@solved)
      sleep 1
      next if @solver_thread.alive?

      # Joining a dead thread re-raises whatever exception killed it.
      @solver_thread.join

      # The solver is gone, so @solved can no longer change: give up rather
      # than polling forever.
      unless dep_set.subset?(@solved)
        missing = (dep_set - @solved).to_a.join(', ')
        raise "Scheduler stopped before these groups were provisioned: #{missing}"
      end
    end
  end

  #
  # Helper method for scheduling. Wraps items in a timeout and immediately
  # checks all running workers for exceptions, which are immediately bubbled up
  # if there are any. If do_loop is true, it will retry the timeout.
  #
  def with_timeout(do_loop=true)
    Timeout.timeout(10) do
      # Joining a finished worker re-raises its exception, if it had one.
      @working.values.reject(&:alive?).each(&:join)
      yield
    end
  rescue Timeout::Error
    # Deliberately unbounded: each retry re-checks the workers and then goes
    # back to waiting on the block.
    retry if do_loop
  end

  #
  # Start the scheduler. In serial mode this call will block until the whole
  # dependency graph is satisfied, or one of the provisions fails, at which
  # point an exception will be raised. In parallel mode, this call completes
  # immediately, and you should use #wait_for to control main thread flow.
  #
  # This call also installs a SIGINFO (Ctrl+T in the terminal on macs) and
  # SIGUSR2 handler which can be used to get information on the status of
  # what's solved and what's working. Signals the platform does not support
  # are skipped.
  #
  def run
    install_status_handlers

    if @serial
      service_resolved_waiters
      process_queue
    else
      @solver_thread = Thread.new do
        with_timeout(false) { service_resolved_waiters }
        process_queue
      end
    end
  end

  #
  # Instructs the scheduler to stop. Note that this is not an interrupt, and
  # the queue will still be exhausted before terminating.
  #
  def stop
    if @serial
      @queue << nil
    else
      @working.values.each { |worker| quietly_join(worker) }
      @queue << nil
      quietly_join(@solver_thread)
    end
  end

  #
  # This method determines what 'waiters', or provisioners that cannot
  # provision yet because of unresolved dependencies, can be executed.
  #
  def service_resolved_waiters
    @waiters -= (@working.keys.to_set + @solved)

    @waiters.each do |group_name|
      next unless @vm_dependencies[group_name].subset?(@solved)

      if_debug do
        $stderr.puts "Provisioning #{group_name}"
      end

      provisioner = @vm_groups[group_name]

      provision_block = lambda do
        raise "Could not provision #{group_name}" unless provisioner.startup
        @queue << group_name
      end

      if @serial
        # The worker table only needs a live thread as a marker here; using
        # the current thread avoids spawning one that sleeps forever.
        @working[group_name] = Thread.current
        provision_block.call
      else
        @working[group_name] = Thread.new(&provision_block)
      end
    end
  end

  #
  # Instruct all the provisioners to tear down. Calls #stop as its first action.
  #
  # In parallel mode, the teardown will be done in parallel BUT this call will
  # block until they all complete.
  #
  def teardown
    stop

    threads = []

    @solved.each do |group_name|
      if_debug do
        $stderr.puts "Attempting to terminate VM group #{group_name}"
      end

      provisioner = @vm_groups[group_name]

      shutdown_block = lambda do
        unless provisioner.shutdown
          if_debug do
            $stderr.puts "Could not terminate VM group #{group_name}."
          end
        end
      end

      if @serial
        shutdown_block.call
      else
        threads.push(Thread.new(&shutdown_block))
      end
    end

    threads.map(&:join) unless @serial
  end

  private

  #
  # Installs the status-dump handler on USR2 and INFO where the platform
  # defines them.
  #
  def install_status_handlers
    # A proc (not a lambda) so the signal number passed by trap is ignored
    # instead of raising an arity error.
    handler = proc do
      p ["solved:", @solved]
      p ["working:", @working]
    end

    %w[USR2 INFO].each do |sig|
      trap(sig, &handler) if Signal.list.key?(sig)
    end
  end

  #
  # Drains the completion queue: every group name taken off the queue is
  # marked solved and removed from the worker table, then newly unblocked
  # waiters are started. A nil on the queue (see #stop) ends the loop. In
  # serial mode the loop also ends as soon as the queue is empty.
  #
  def process_queue
    running = true

    while running
      ready = []

      if @queue.empty?
        return if @serial
        # Block for the next completion; the guard keeps a retry from
        # taking a second item if one was already collected.
        with_timeout { ready << @queue.shift if ready.empty? }
      end

      ready << @queue.shift until @queue.empty?

      ready.each do |group_name|
        if group_name
          @solved.add(group_name)
          @working.delete(group_name)
        else
          running = false
        end
      end

      service_resolved_waiters if running
    end
  end

  #
  # Joins a thread, ignoring a missing thread and any StandardError the join
  # re-raises.
  #
  def quietly_join(thread)
    thread.join if thread
  rescue StandardError
    nil
  end
end
#
# Minimal provisioner used to exercise the scheduler. It exposes the
# name / startup / shutdown interface the scheduler calls and does no real
# work: startup just sleeps, shutdown returns immediately.
#
class TestProvisioner
  attr_accessor :name

  #
  # startup_delay is the number of seconds #startup sleeps to simulate a slow
  # provision. Defaults to 10.
  #
  def initialize(startup_delay = 10)
    @startup_delay = startup_delay
  end

  # Announces itself on stderr, sleeps for the configured delay, and reports
  # success.
  def startup
    $stderr.puts "running scheduled startup"
    sleep @startup_delay
    true
  end

  # Announces itself on stderr and reports success.
  def shutdown
    $stderr.puts "running scheduled shutdown"
    true
  end
end
# Demo driver: builds a small dependency graph of test provisioners, runs the
# scheduler, waits for the final group, then tears everything down.
$CHEF_WORKFLOW_DEBUG = 1

v = VMSupport.new
v.schedule_provision("foo", TestProvisioner.new)
v.schedule_provision("bar", TestProvisioner.new)
v.schedule_provision("quux", TestProvisioner.new, %w[foo])
v.schedule_provision("fart", TestProvisioner.new, %w[bar])
v.schedule_provision("poop", TestProvisioner.new, %w[bar quux])
v.schedule_provision("poopie", TestProvisioner.new, %w[foo bar])
v.schedule_provision("hi", TestProvisioner.new)
v.schedule_provision("longcat", TestProvisioner.new, %w[poop poopie hi])

v.run

begin
  begin
    v.wait_for("longcat")
  rescue StandardError => e
    # A failed provision should not prevent teardown, but it should not be
    # hidden either.
    $stderr.puts "Provisioning did not complete: #{e.message}"
  end

  v.teardown
rescue Interrupt
  # Ctrl-C: exit quietly without a backtrace.
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment