Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
require 'tsort'
# Directed graph of tasks and their dependencies, sortable through the
# TSort mixin (which supplies #tsort on top of the two hooks below).
#
# Every task handed to #add_task must respond to +requires+ and return an
# enumerable of the tasks it depends on. Tasks are used as Hash keys, so two
# tasks count as the same node when their +hash+ / +eql?+ say so.
class DAG
  include TSort

  def initialize
    # task => array of the tasks it directly depends on
    @tasks = {}
  end

  # TSort hook: yields every registered task.
  def tsort_each_node(&block)
    @tasks.each_key(&block)
  end

  # TSort hook: yields each direct dependency of +node+.
  #
  # @raise [KeyError] if +node+ has not been registered
  def tsort_each_child(node, &block)
    @tasks.fetch(node).each(&block)
  end

  # Registers +task+ together with everything it transitively requires.
  #
  # @param task [#requires] root task to add
  # @return [Object] the +task+ that was passed in
  def add_task(task)
    _add_task(nil, task)
  end

  private

  # Recursive worker for #add_task. The first argument (the task that
  # required +task+, or nil for the root) is accepted but not used.
  def _add_task(_parent, task)
    # Already-registered tasks are skipped; since a task is registered before
    # its dependencies are walked, this also stops recursion on cycles.
    return task if @tasks.key?(task)

    # Ask for the dependencies exactly once so the objects stored as edges
    # are the very same objects that get registered as nodes. Calling
    # +requires+ twice can yield two different sets of instances, leaving
    # edges that point at unregistered nodes.
    dependencies = task.requires
    @tasks[task] = dependencies
    dependencies.each { |dependency| _add_task(task, dependency) }
    task
  end
end
require_relative 'workflow'
require_relative 'task'
# Task whose work consists of printing "<ClassName>#run" to standard output.
class EchoTask < Task
  # Prints the name of the receiver's class followed by "#run".
  def run
    class_label = self.class.name
    puts("#{class_label}#run")
  end
end
# Example task that depends on TaskB and TaskC.
class TaskA < EchoTask
  # @return [Array] a new TaskB and a new TaskC, in that order
  def requires
    [TaskB, TaskC].map(&:new)
  end
end
# Example task that depends on TaskD.
class TaskB < EchoTask
  # @return [Array] a one-element array holding a new TaskD
  def requires
    [TaskD].map(&:new)
  end
end
# Example task that depends on TaskD.
class TaskC < EchoTask
  # @return [Array] a one-element array holding a new TaskD
  def requires
    [TaskD].map(&:new)
  end
end
# Example task that overrides nothing: it inherits +run+ from EchoTask and
# defines no +requires+ of its own.
class TaskD < EchoTask
end
# Entry point of the example: run a workflow rooted at TaskA.
root_task = TaskA.new
Workflow.new.run(root_task)
# Base class for units of work. Subclasses implement #run and may override
# #requires to declare dependencies.
#
# Instances are interchangeable as Hash keys when they are of exactly the
# same class: #hash and #eql? are both derived from the receiver's class.
# `==` is not overridden here and keeps Object's default behavior.
class Task
  # Performs the task's work. Must be overridden.
  #
  # @raise [NotImplementedError] always, in this base implementation
  def run
    raise NotImplementedError, "You must implement #{self.class}##{__method__}"
  end

  # @return [Array] tasks this task depends on; empty by default
  def requires
    [] # If you need to define task dependencies, override in subclass
  end

  # Hash-key equality: true only when +other+ is of exactly the same class.
  #
  # Comparing classes rather than hash values keeps unrelated objects that
  # merely share a hash value (for example the String "Task", or instances
  # of different anonymous subclasses) from being treated as the same key.
  #
  # @param other [Object]
  # @return [Boolean]
  def eql?(other)
    other.class == self.class
  end

  # @return [Integer] hash shared by all instances of the same class
  def hash
    self.class.hash
  end
end
require_relative 'dag'
# Runs a task after everything it depends on.
class Workflow
  # Registers +task+ (and, through DAG#add_task, its requirements) in a new
  # DAG, then calls +run+ on each task in the order returned by DAG#tsort.
  #
  # @param task [#requires, #run] root task of the workflow
  # @return [Array] the sorted tasks that were run
  def run(task)
    graph = DAG.new
    graph.add_task(task)
    ordered_tasks = graph.tsort
    ordered_tasks.each { |step| step.run }
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.