Skip to content

Instantly share code, notes, and snippets.

@heisters
Created June 14, 2011 23:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save heisters/1026185 to your computer and use it in GitHub Desktop.
Save heisters/1026185 to your computer and use it in GitHub Desktop.
Trying to update Ruote processes in mid-air
class RuoteMigrator
attr_reader :process_status
def initialize process_status
@process_status = process_status
end
def === other
other = other.pdef if other.respond_to?(:pdef)
process_status.root_expression.tree == other
end
class MigrationError < StandardError; end
# Update a running process's tree to a different tree, pruning it such that
# workitems end up in the right places in the new tree.
#
# Takes an optional workitem mapping argument that allows you to specify
# where in the new tree workitems should end up. For example:
#
# migrator.update_tree! my_pdef, {'ref' => 'a'} => {'ref' => 'b'}
#
# ...specifies that if the workitem has `'ref' => 'a'` in its params, the new
# tree will be pruned so that the process continues with the first node that
# has `'ref' => 'b'` in its attributes.
#
# You can specify multiple mappings, so:
#
# migrator.update_tree! my_pdef,
# {'ref' => 'a'} => {'ref' => 'b'},
# {'ref' => 'c'} => {'ref' => 'd'}
#
# ...would map the first workitem that matches 'a' or 'c' to the
# respective new refs.
#
# You can specify multiple mapping criteria to help disambiguate:
#
# migrator.update_tree! my_pdef,
# {'ref' => 'a'} => {'ref' => 'b', 'foo' => 'bar'}
#
# This alleviates problems where there were two nodes that match 'b', but
# only one that matches 'foo' => 'bar'. Note that if there are two nodes that
# match the map-to criteria, it will just settle for the first.
#
# It will endeavor to leave workitems in the same position by generating a
# default mapping that leaves positions unchanged. This is equivalent to
# something like:
#
# migrator.update_tree! my_pdef, {'ref' => 'a'} => {'ref' => 'a'}
#
# ...which, in turn, is equivalent to calling `update_tree!` without a
# mappings argument.
#
# If it is unable to prune the tree to the existing workitems, it will raise
# a MigrationError. You can also opt to have it just rewind the process to
# the beginning by passing :rewind_silently => true in the mapping argument:
#
# migrator.update_tree! my_pdef, {'ref' => 'a'} => {'ref' => 'a'},
# :rewind_silently => true
#
# Note that you can also pass in a tree that is identical to current tree,
# and use the mapping options to move the workitems to new positions in the
# tree. In this usage, it behaves similarly to a `jump` expression. You can
# easily check to see if the status under consideration has the same tree as
# another tree using the `===` operator. So, if you only wanted to update the
# tree if it's been changed, you could do this:
#
# migrator.update_tree!(my_pdef) unless migrator === my_pdef
#
# NB. As with `jump` expressions, you cannot move the workitem to an
# expression within a subprocess, but you can move it to the top of a
# subexpression. For example:
#
# migrator.update_tree! my_pdef, {'ref' => 'a'} => {'ref' => 'my_subprocess'}
#
# WARNING: currently all workitems will have their fields merged (which still
# seems preferable to the workitems losing their fields altogether). This is
# due to a limitation in Ruote that makes it impossible to specify different
# fields for different workitems in Ruote::Engine#re_apply.
def update_tree! new_process, mappings_and_options={}
mappings, options = normalize_mappings_and_options mappings_and_options
fei = process_status.root_expression.fei
tree = tree_for_reapplication new_process
#p tree
new_tree = prune_tree(Ruote.fulldup(tree), mappings).first
new_tree = tree if !new_tree && options[:rewind_silently]
raise(MigrationError, "could not prune tree for existing workitems") unless new_tree
#p new_tree
Workflow.engine.re_apply fei,
:tree => new_tree,
# FIXME: Ruote does not allow us to specify different fields for
# different workitems, so all workitems end up with the same fields
:fields => process_status.workitems.inject({}){|f,wi|f.merge wi.fields}
end
protected
def normalize_mappings_and_options mappings_and_options
mappings_and_options = {:rewind_silently => false}.merge mappings_and_options
options = {}
options[:rewind_silently] = mappings_and_options.delete(:rewind_silently)
# Do this instead of the simpler
# `generate_default_mapping.merge(mappings_and_options)` so that
# user-supplied mappings will take precedence if they are equivalent for
# the sake of mapping, which is different than being equal (which is what
# #merge would do).
mappings = generate_default_mapping.inject({}) do |m, (from1,to1)|
supplied = mappings_and_options.find do |from2, to2|
mapping_matches?(from2, from1)
end
supplied ? m.merge(supplied[0] => supplied[1]) : m.merge(from1 => to1)
end
return mappings, options
end
def tree_for_reapplication process
new_definition = process.definition
returning(Ruote.to_tree{define(&new_definition)}) do |tree|
tree[1]["name"] = process_status.definition_name
end
end
def prune_tree tree, target_mappings, current_mappings=current_workitems_as_mappings
matched = current_mappings.select do |current_mapping|
mappings_match?(target_mappings, tree, current_mapping)
end
return tree, matched if matched.any?
# otherwise, look in the children
name, attributes, children = *tree
pruned_children = prune_children(children, target_mappings, current_mappings)
if pruned_children.any?
return [name, attributes, pruned_children], []
else
return nil, []
end
end
def prune_children children, target_mappings, current_mappings
while child = children.shift
pruned, matched = *prune_tree(child, target_mappings, current_mappings)
matched.each{|m|current_mappings.delete m}
if pruned && current_mappings.empty?
return [pruned] + children
elsif pruned
return [pruned] + prune_children(children, target_mappings, current_mappings)
end
end
return []
end
def mappings_match? target_mappings, node, current_mapping
name, attributes, children = *node
target_mappings.each do |from, to|
from_match = mapping_matches?(from, current_mapping, name)
to_match = mapping_matches?(to, attributes, name)
#puts "Matching FROM #{from.inspect}/#{current_mapping.inspect} TO #{to.inspect}/#{attributes.inspect} == #{from_match && to_match}"
return true if from_match && to_match
end
return false
end
def mapping_matches? target_mapping, current_mapping, name=nil
current_mapping = current_mapping.dup
current_mapping['ref'] ||= name if name
# everything in the target is the same as in the current (but maybe not
# vice versa)
target_mapping.all?{|k,v|current_mapping[k] == v}
end
# this creates a mapping that is basically a noop: mapping each current
# task to itself.
def generate_default_mapping
current_workitems_as_mappings.inject({}) do |mapping,m|
mapping[m] = m; mapping
end
end
def current_workitems_as_mappings
process_status.current_workitems.inject([]) do |mappings,wi|
params = wi.fields["params"]
mappings + [params]
end
end
end
require File.dirname(__FILE__)+'/../spec_helper'
describe RuoteMigrator, :type => :process do
before do
clear_workflow_storage!
end
def define *args, &block
Processes.define *args, &block
end
def launch
patient_wait_for_with_timeout :signal => "one" do
@wfid = Processes["test"].launch
end
end
def status reload=false
return @status if @status and !reload
@status = Processes.status(@wfid)
end
def migrator reload=false
return @migrator if @migrator and !reload
@migrator = RuoteMigrator.new(status(reload))
end
def reply
patient_wait_for_with_timeout do
Workflow.workitems.reply(status.current_workitems.last)
end
end
def current_task_names
status(true).current_tasks.map(&:process_task_name)
end
def define_and_advance
#define("test"){one :task => 'a'; one :task => 'b'}
#launch
#reply
#dump_process_fixture @wfid, :test_a_b_at_b
@wfid = restore_process_fixture :test_a_b_at_b
end
it "should be able to tell when a definition has changed from a running process" do
define("test"){one :task => 'a'; one :task => 'b'}
launch; reply
(migrator === Processes["test"]).should be_true
define("test"){one :task => 'c'; one :task => 'd'}
(migrator === Processes["test"]).should be_false
end
it "should raise an error if the existing workitems can't automatically be remapped" do
define("test"){one :task => "a"}
launch
define("test"){one :task => "c"}
lambda do
patient_wait_for_with_timeout do
migrator.update_tree! Processes["test"]
end
end.should raise_error(RuoteMigrator::MigrationError)
end
it "should be able to deal with a task redefinition by rewinding to the beginning" do
define_and_advance
define("test"){one :task => "c"; one :task => "d"; one :task => "e"}
patient_wait_for_with_timeout do
migrator.update_tree! Processes["test"], :rewind_silently => true
end
current_task_names.should == ["one_c"]
reply
current_task_names.should == ["one_d"]
end
it "should raise an error if the existing workitems can't be remapped using a supplied mapping" do
define_and_advance
define("test"){one :task => "c"; one :task => "d"; one :task => "e"}
lambda do
patient_wait_for_with_timeout do
migrator.update_tree!(Processes["test"], {'task' => 'a'} => {'task' => 'd'})
end
end.should raise_error(RuoteMigrator::MigrationError)
end
it "should not raise an error if the workitems can be remapped automatically but the mapping doesn't apply" do
define("test"){one :task => 'a'; one :task => 'b'}
launch
current_task_names.should == ["one_a"]
define("test"){one :task => "c"; one :task => "a"}
lambda do
patient_wait_for_with_timeout do
migrator.update_tree!(Processes["test"], {'task' => 'b'} => {'task' => 'c'})
end
end.should_not raise_error
current_task_names.should == ["one_a"]
end
it "should prefer requested mappings over default mappings" do
define("test"){one :task => 'a'; one :task => 'b'}
launch
current_task_names.should == ["one_a"]
# this definition allows both the default and the supplied mapping to work
define("test"){one :task => "a"; one :task => "c"}
patient_wait_for_with_timeout do
# this mapping will be merged with the default:
# {'task' => 'a', 'ref' => 'one'} => {'task' => 'a', 'ref' => 'one'},
# but this should take precedence
migrator.update_tree!(Processes["test"], {'task' => 'a'} => {'task' => 'c'})
end
current_task_names.should == ["one_c"]
end
it "should be able to deal with a task redefinition by mapping task name changes" do
define_and_advance
define("test"){one :task => "c"; one :task => "d"; one :task => "e"}
patient_wait_for_with_timeout do
migrator.update_tree!(Processes["test"], {'task' => 'b'} => {'task' => 'd'})
end
current_task_names.should == ["one_d"]
reply
current_task_names.should == ["one_e"]
end
it "should be able to map task name changes on a complex process" do
define("test") do
one :task => "a"
cursor do
one :task => "b"
one :task => "c"
end
one :task => "d"
end
launch
current_task_names.should == ["one_a"]
reply
current_task_names.should == ["one_b"]
define("test") do
cursor do
one :task => "f"
cursor do
one :task => "g"
end
end
one :task => "h"
end
patient_wait_for_with_timeout do
migrator.update_tree!(Processes["test"], {'task' => 'b'} => {'task' => 'g'})
end
current_task_names.should == ["one_g"]
reply
current_task_names.should == ["one_h"]
end
it "should ignore task mappings that don't apply to this process instance, but still apply ones that do" do
define_and_advance
define("test"){one :task => "c"; one :task => "d"; one :task => "e"}
patient_wait_for_with_timeout do
migrator.update_tree!(Processes["test"], {'task' => 'a'} => {'task' => 'd'}, {'task' => 'b'} => {'task' => 'e'})
end
current_task_names.should == ["one_e"]
end
it "should allow mapping tasks where two tasks have the same name, but different participants" do
define_and_advance
define("test"){one :task => "c"; two :task => "c"}
patient_wait_for_with_timeout do
migrator.update_tree!(Processes["test"], {'task' => 'b'} => {'task' => 'c', 'ref' => 'two'})
end
current_task_names.should == ["two_c"]
end
it "should ensure the task remains the same, if possible" do
define_and_advance
define("test"){one :task => "c"; one :task => "b"; one :task => "d"}
patient_wait_for_with_timeout do
migrator.update_tree! Processes["test"]
end
current_task_names.should == ["one_b"]
reply
current_task_names.should == ["one_d"]
end
it "should do the right thing when mapping a concurrent process" do
define("test") do
concurrence do
one :task => "a"
two :task => "b"
end
three :task => "c"
end
launch
current_task_names.should == ["one_a", "two_b"]
define("test") do
concurrence do
one :task => "d"
two :task => "e"
cursor do
one :task => "f"
one :task => "g"
end
end
three :task => "h"
end
patient_wait_for_with_timeout :signal => "one" do
migrator.update_tree!(Processes["test"], {'task' => 'a'} => {'task' => 'g'}, {'task' => 'b'} => {'task' => 'e'})
end
current_task_names.should == ["two_e", "one_g"]
wis = status.workitems
patient_wait_for_with_timeout :signal => "three" do
Workflow.workitems.reply(wis.find{|wi|wi.participant_name == "two"})
Workflow.workitems.reply(wis.find{|wi|wi.participant_name == "one"})
end
current_task_names.should == ["three_h"]
end
it "should not destroy fields on the workitem" do
define("test"){set :field => 'my_field', :value => 'yes'; one :task => 'a'}
launch
status.workitems.first.fields.should include('my_field' => 'yes')
define("test"){one :task => 'b'}
patient_wait_for_with_timeout do
migrator.update_tree! Processes["test"], {'task' => 'a'} => {'task' => 'b'}
end
status(true).workitems.first.fields.should include('my_field' => 'yes')
end
it "should be able to jump between subprocesses" do
define("test") do
define "sub_1" do
one :task => 'a'
end
define "sub_2" do
one :task => 'b'
end
cursor :tag => 'main' do
sub_1
sub_2
end
end
launch
current_task_names.should == ["one_a"]
patient_wait_for_with_timeout do
migrator.update_tree! Processes["test"], {'task' => 'a'} => {'ref' => 'sub_2'}
end
current_task_names.should == ["one_b"]
end
it "should raise an error if you try to move to a subexpression of a subprocess" do
pending "There doesn't seem to be a way to detect that this is being done"
define("test") do
define "sub_1" do
one :task => 'a'
end
define "sub_2" do
one :task => 'b'
one :task => 'c'
end
cursor :tag => 'main' do
sub_1
sub_2
end
end
launch
current_task_names.should == ["one_a"]
lambda do
patient_wait_for_with_timeout do
migrator.update_tree! Processes["test"], {'task' => 'a'} => {'task' => 'c'}
end
end.should raise_error
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment