Created
June 14, 2011 23:29
-
-
Save heisters/1026185 to your computer and use it in GitHub Desktop.
Trying to update Ruote processes in mid-air
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Moves a running Ruote process onto a different (or re-arranged) process tree
# while trying to keep its workitems at sensible positions in the new tree.
#
# An instance wraps the status object of one running process; everything it
# needs (root expression, workitems, definition name) is read from that status.
class RuoteMigrator
  # Raised by #update_tree! when the new tree cannot be pruned down to the
  # positions of the existing workitems and :rewind_silently was not requested.
  class MigrationError < StandardError; end

  attr_reader :process_status

  # process_status - the status object of the running process to migrate.
  def initialize(process_status)
    @process_status = process_status
  end

  # Returns true when the running process's root tree equals `other`.
  #
  # `other` may be a raw tree, or any object responding to #pdef, in which
  # case its pdef is compared instead.
  def ===(other)
    other = other.pdef if other.respond_to?(:pdef)
    process_status.root_expression.tree == other
  end

  # Update a running process's tree to a different tree, pruning it such that
  # workitems end up in the right places in the new tree.
  #
  # Takes an optional workitem mapping argument that allows you to specify
  # where in the new tree workitems should end up. For example:
  #
  #   migrator.update_tree! my_pdef, {'ref' => 'a'} => {'ref' => 'b'}
  #
  # ...specifies that if the workitem has `'ref' => 'a'` in its params, the new
  # tree will be pruned so that the process continues with the first node that
  # has `'ref' => 'b'` in its attributes.
  #
  # You can specify multiple mappings, so:
  #
  #   migrator.update_tree! my_pdef,
  #     {'ref' => 'a'} => {'ref' => 'b'},
  #     {'ref' => 'c'} => {'ref' => 'd'}
  #
  # ...would map the first workitem that matches 'a' or 'c' to the
  # respective new refs.
  #
  # You can specify multiple mapping criteria to help disambiguate:
  #
  #   migrator.update_tree! my_pdef,
  #     {'ref' => 'a'} => {'ref' => 'b', 'foo' => 'bar'}
  #
  # This alleviates problems where there are two nodes that match 'b', but
  # only one that matches 'foo' => 'bar'. Note that if there are two nodes that
  # match the map-to criteria, it will just settle for the first.
  #
  # It will endeavor to leave workitems in the same position by generating a
  # default mapping that leaves positions unchanged. This is equivalent to
  # something like:
  #
  #   migrator.update_tree! my_pdef, {'ref' => 'a'} => {'ref' => 'a'}
  #
  # ...which, in turn, is equivalent to calling `update_tree!` without a
  # mappings argument.
  #
  # If it is unable to prune the tree to the existing workitems, it will raise
  # a MigrationError. You can also opt to have it just rewind the process to
  # the beginning by passing :rewind_silently => true in the mapping argument:
  #
  #   migrator.update_tree! my_pdef, {'ref' => 'a'} => {'ref' => 'a'},
  #     :rewind_silently => true
  #
  # Note that you can also pass in a tree that is identical to the current
  # tree, and use the mapping options to move the workitems to new positions in
  # the tree. In this usage, it behaves similarly to a `jump` expression. You
  # can easily check whether the status under consideration has the same tree
  # as another tree using the `===` operator. So, if you only wanted to update
  # the tree if it has been changed, you could do this:
  #
  #   migrator.update_tree!(my_pdef) unless migrator === my_pdef
  #
  # NB. As with `jump` expressions, you cannot move the workitem to an
  # expression within a subprocess, but you can move it to the top of a
  # subexpression. For example:
  #
  #   migrator.update_tree! my_pdef, {'ref' => 'a'} => {'ref' => 'my_subprocess'}
  #
  # WARNING: currently all workitems will have their fields merged (which still
  # seems preferable to the workitems losing their fields altogether). This is
  # due to a limitation in Ruote that makes it impossible to specify different
  # fields for different workitems in Ruote::Engine#re_apply.
  #
  # new_process          - object whose #definition is the new process body.
  # mappings_and_options - Hash of `from => to` criteria hashes, optionally
  #                        with a :rewind_silently key.
  #
  # Returns whatever Workflow.engine.re_apply returns.
  # Raises MigrationError when no pruned tree can be produced and
  # :rewind_silently is not set.
  def update_tree!(new_process, mappings_and_options = {})
    mappings, options = normalize_mappings_and_options(mappings_and_options)
    fei = process_status.root_expression.fei
    tree = tree_for_reapplication(new_process)

    # Prune a deep copy: pruning shifts children off the arrays it walks, and
    # the untouched tree is still needed for the rewind fallback below.
    new_tree = prune_tree(Ruote.fulldup(tree), mappings).first
    new_tree = tree if !new_tree && options[:rewind_silently]
    raise MigrationError, "could not prune tree for existing workitems" unless new_tree

    # FIXME: Ruote does not allow us to specify different fields for
    # different workitems, so all workitems end up with the same fields
    fields = process_status.workitems.inject({}) { |merged, wi| merged.merge(wi.fields) }

    Workflow.engine.re_apply(fei, :tree => new_tree, :fields => fields)
  end

  protected

  # Splits the combined argument of #update_tree! into the effective mappings
  # and the options.
  #
  # Returns a two-element Array: [mappings Hash, options Hash].
  def normalize_mappings_and_options(mappings_and_options)
    # merge returns a new Hash, so the caller's Hash is never modified.
    supplied = { :rewind_silently => false }.merge(mappings_and_options)
    options = { :rewind_silently => supplied.delete(:rewind_silently) }

    # Do this instead of the simpler
    # `generate_default_mapping.merge(mappings_and_options)` so that
    # user-supplied mappings take precedence if they are equivalent for the
    # sake of mapping, which is different than being equal (which is what
    # #merge would do).
    mappings = generate_default_mapping.inject({}) do |merged, (default_from, default_to)|
      override = supplied.find do |candidate_from, _candidate_to|
        mapping_matches?(candidate_from, default_from)
      end
      from, to = override || [default_from, default_to]
      merged.merge(from => to)
    end

    [mappings, options]
  end

  # Builds the tree for the new process definition and stamps it with the
  # running process's definition name.
  #
  # Written with plain local assignment rather than ActiveSupport's
  # `returning` helper, which newer ActiveSupport versions no longer provide.
  def tree_for_reapplication(process)
    new_definition = process.definition
    tree = Ruote.to_tree { define(&new_definition) }
    tree[1]["name"] = process_status.definition_name
    tree
  end

  # Prunes `tree` so that it starts at the node(s) the current workitems map to.
  #
  # tree             - [name, attributes, children] node; its children arrays
  #                    are consumed destructively, so pass a copy.
  # target_mappings  - Hash of `from => to` criteria.
  # current_mappings - Array of workitem params still waiting for a position;
  #                    shared (and shrunk) across the whole recursion.
  #
  # Returns [pruned_tree_or_nil, matched_current_mappings].
  def prune_tree(tree, target_mappings, current_mappings = current_workitems_as_mappings)
    matched = current_mappings.select do |current_mapping|
      mappings_match?(target_mappings, tree, current_mapping)
    end
    # This node is a destination: keep it (and everything below it) intact.
    return tree, matched if matched.any?

    # otherwise, look in the children
    name, attributes, children = *tree
    pruned_children = prune_children(children, target_mappings, current_mappings)
    return nil, [] unless pruned_children.any?

    [[name, attributes, pruned_children], []]
  end

  # Drops leading children until one can be pruned to a destination.
  #
  # Once every workitem has found its place the remaining siblings are kept
  # untouched; otherwise the remaining siblings are pruned as well, for the
  # workitems that are still unplaced.
  #
  # Returns the Array of surviving children (empty when nothing matched).
  def prune_children(children, target_mappings, current_mappings)
    while (child = children.shift)
      pruned, matched = *prune_tree(child, target_mappings, current_mappings)
      matched.each { |m| current_mappings.delete(m) }
      next unless pruned

      return [pruned] + children if current_mappings.empty?
      return [pruned] + prune_children(children, target_mappings, current_mappings)
    end
    []
  end

  # True when some `from => to` pair applies: `from` matches the workitem's
  # params (current_mapping) and `to` matches this node's attributes.
  def mappings_match?(target_mappings, node, current_mapping)
    name, attributes, _children = *node
    target_mappings.any? do |from, to|
      mapping_matches?(from, current_mapping, name) &&
        mapping_matches?(to, attributes, name)
    end
  end

  # True when every key/value in target_mapping is also present in
  # current_mapping (but maybe not vice versa). When `name` is given it
  # stands in for a missing 'ref' entry of current_mapping.
  def mapping_matches?(target_mapping, current_mapping, name = nil)
    # Work on a copy so the fallback 'ref' never leaks into the caller's Hash.
    current_mapping = current_mapping.dup
    current_mapping['ref'] ||= name if name
    target_mapping.all? { |k, v| current_mapping[k] == v }
  end

  # this creates a mapping that is basically a noop: mapping each current
  # task to itself.
  def generate_default_mapping
    current_workitems_as_mappings.inject({}) do |mapping, m|
      mapping[m] = m
      mapping
    end
  end

  # The "params" field of every current workitem, in workitem order.
  def current_workitems_as_mappings
    process_status.current_workitems.map { |wi| wi.fields["params"] }
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require File.dirname(__FILE__)+'/../spec_helper' | |
# Specs for RuoteMigrator: each example (re)defines the "test" process, gets a
# running instance to a known position, migrates it, and checks which tasks
# are current afterwards.
describe RuoteMigrator, :type => :process do
  before do
    clear_workflow_storage!
  end

  # Shorthand for (re)defining a process; parenthesised so the splat and block
  # arguments are not parsed ambiguously.
  def define(*args, &block)
    Processes.define(*args, &block)
  end

  # Launches the "test" process, remembers its wfid, and waits for the "one"
  # signal.
  def launch
    patient_wait_for_with_timeout :signal => "one" do
      @wfid = Processes["test"].launch
    end
  end

  # Memoized status of the launched process; pass true to fetch it again.
  def status(reload = false)
    return @status if @status && !reload
    @status = Processes.status(@wfid)
  end

  # Memoized migrator for the launched process; pass true to rebuild it from
  # a freshly loaded status.
  def migrator(reload = false)
    return @migrator if @migrator && !reload
    @migrator = RuoteMigrator.new(status(reload))
  end

  # Replies to the last current workitem of the (memoized) status.
  def reply
    patient_wait_for_with_timeout do
      Workflow.workitems.reply(status.current_workitems.last)
    end
  end

  # Task names of the current tasks, always read from a reloaded status.
  def current_task_names
    status(true).current_tasks.map(&:process_task_name)
  end

  # Restores a stored fixture instead of launching and replying; the
  # commented-out lines are the steps that produced the fixture.
  def define_and_advance
    #define("test"){one :task => 'a'; one :task => 'b'}
    #launch
    #reply
    #dump_process_fixture @wfid, :test_a_b_at_b
    @wfid = restore_process_fixture :test_a_b_at_b
  end

  it "should be able to tell when a definition has changed from a running process" do
    define("test"){one :task => 'a'; one :task => 'b'}
    launch; reply
    (migrator === Processes["test"]).should be_true
    define("test"){one :task => 'c'; one :task => 'd'}
    (migrator === Processes["test"]).should be_false
  end

  it "should raise an error if the existing workitems can't automatically be remapped" do
    define("test"){one :task => "a"}
    launch
    define("test"){one :task => "c"}
    lambda do
      patient_wait_for_with_timeout do
        migrator.update_tree! Processes["test"]
      end
    end.should raise_error(RuoteMigrator::MigrationError)
  end

  it "should be able to deal with a task redefinition by rewinding to the beginning" do
    define_and_advance
    define("test"){one :task => "c"; one :task => "d"; one :task => "e"}
    patient_wait_for_with_timeout do
      migrator.update_tree! Processes["test"], :rewind_silently => true
    end
    current_task_names.should == ["one_c"]
    reply
    current_task_names.should == ["one_d"]
  end

  it "should raise an error if the existing workitems can't be remapped using a supplied mapping" do
    define_and_advance
    define("test"){one :task => "c"; one :task => "d"; one :task => "e"}
    lambda do
      patient_wait_for_with_timeout do
        migrator.update_tree!(Processes["test"], {'task' => 'a'} => {'task' => 'd'})
      end
    end.should raise_error(RuoteMigrator::MigrationError)
  end

  it "should not raise an error if the workitems can be remapped automatically but the mapping doesn't apply" do
    define("test"){one :task => 'a'; one :task => 'b'}
    launch
    current_task_names.should == ["one_a"]
    define("test"){one :task => "c"; one :task => "a"}
    lambda do
      patient_wait_for_with_timeout do
        migrator.update_tree!(Processes["test"], {'task' => 'b'} => {'task' => 'c'})
      end
    end.should_not raise_error
    current_task_names.should == ["one_a"]
  end

  it "should prefer requested mappings over default mappings" do
    define("test"){one :task => 'a'; one :task => 'b'}
    launch
    current_task_names.should == ["one_a"]
    # this definition allows both the default and the supplied mapping to work
    define("test"){one :task => "a"; one :task => "c"}
    patient_wait_for_with_timeout do
      # this mapping will be merged with the default:
      # {'task' => 'a', 'ref' => 'one'} => {'task' => 'a', 'ref' => 'one'},
      # but this should take precedence
      migrator.update_tree!(Processes["test"], {'task' => 'a'} => {'task' => 'c'})
    end
    current_task_names.should == ["one_c"]
  end

  it "should be able to deal with a task redefinition by mapping task name changes" do
    define_and_advance
    define("test"){one :task => "c"; one :task => "d"; one :task => "e"}
    patient_wait_for_with_timeout do
      migrator.update_tree!(Processes["test"], {'task' => 'b'} => {'task' => 'd'})
    end
    current_task_names.should == ["one_d"]
    reply
    current_task_names.should == ["one_e"]
  end

  it "should be able to map task name changes on a complex process" do
    define("test") do
      one :task => "a"
      cursor do
        one :task => "b"
        one :task => "c"
      end
      one :task => "d"
    end
    launch
    current_task_names.should == ["one_a"]
    reply
    current_task_names.should == ["one_b"]
    define("test") do
      cursor do
        one :task => "f"
        cursor do
          one :task => "g"
        end
      end
      one :task => "h"
    end
    patient_wait_for_with_timeout do
      migrator.update_tree!(Processes["test"], {'task' => 'b'} => {'task' => 'g'})
    end
    current_task_names.should == ["one_g"]
    reply
    current_task_names.should == ["one_h"]
  end

  it "should ignore task mappings that don't apply to this process instance, but still apply ones that do" do
    define_and_advance
    define("test"){one :task => "c"; one :task => "d"; one :task => "e"}
    patient_wait_for_with_timeout do
      migrator.update_tree!(Processes["test"], {'task' => 'a'} => {'task' => 'd'}, {'task' => 'b'} => {'task' => 'e'})
    end
    current_task_names.should == ["one_e"]
  end

  it "should allow mapping tasks where two tasks have the same name, but different participants" do
    define_and_advance
    define("test"){one :task => "c"; two :task => "c"}
    patient_wait_for_with_timeout do
      migrator.update_tree!(Processes["test"], {'task' => 'b'} => {'task' => 'c', 'ref' => 'two'})
    end
    current_task_names.should == ["two_c"]
  end

  it "should ensure the task remains the same, if possible" do
    define_and_advance
    define("test"){one :task => "c"; one :task => "b"; one :task => "d"}
    patient_wait_for_with_timeout do
      migrator.update_tree! Processes["test"]
    end
    current_task_names.should == ["one_b"]
    reply
    current_task_names.should == ["one_d"]
  end

  it "should do the right thing when mapping a concurrent process" do
    define("test") do
      concurrence do
        one :task => "a"
        two :task => "b"
      end
      three :task => "c"
    end
    launch
    current_task_names.should == ["one_a", "two_b"]
    define("test") do
      concurrence do
        one :task => "d"
        two :task => "e"
        cursor do
          one :task => "f"
          one :task => "g"
        end
      end
      three :task => "h"
    end
    patient_wait_for_with_timeout :signal => "one" do
      migrator.update_tree!(Processes["test"], {'task' => 'a'} => {'task' => 'g'}, {'task' => 'b'} => {'task' => 'e'})
    end
    current_task_names.should == ["two_e", "one_g"]
    wis = status.workitems
    patient_wait_for_with_timeout :signal => "three" do
      Workflow.workitems.reply(wis.find{|wi|wi.participant_name == "two"})
      Workflow.workitems.reply(wis.find{|wi|wi.participant_name == "one"})
    end
    current_task_names.should == ["three_h"]
  end

  it "should not destroy fields on the workitem" do
    define("test"){set :field => 'my_field', :value => 'yes'; one :task => 'a'}
    launch
    status.workitems.first.fields.should include('my_field' => 'yes')
    define("test"){one :task => 'b'}
    patient_wait_for_with_timeout do
      migrator.update_tree! Processes["test"], {'task' => 'a'} => {'task' => 'b'}
    end
    status(true).workitems.first.fields.should include('my_field' => 'yes')
  end

  it "should be able to jump between subprocesses" do
    define("test") do
      define "sub_1" do
        one :task => 'a'
      end
      define "sub_2" do
        one :task => 'b'
      end
      cursor :tag => 'main' do
        sub_1
        sub_2
      end
    end
    launch
    current_task_names.should == ["one_a"]
    patient_wait_for_with_timeout do
      migrator.update_tree! Processes["test"], {'task' => 'a'} => {'ref' => 'sub_2'}
    end
    current_task_names.should == ["one_b"]
  end

  it "should raise an error if you try to move to a subexpression of a subprocess" do
    pending "There doesn't seem to be a way to detect that this is being done"
    define("test") do
      define "sub_1" do
        one :task => 'a'
      end
      define "sub_2" do
        one :task => 'b'
        one :task => 'c'
      end
      cursor :tag => 'main' do
        sub_1
        sub_2
      end
    end
    launch
    current_task_names.should == ["one_a"]
    lambda do
      patient_wait_for_with_timeout do
        migrator.update_tree! Processes["test"], {'task' => 'a'} => {'task' => 'c'}
      end
    end.should raise_error
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment