Skip to content

Instantly share code, notes, and snippets.

@hakobera
Created January 6, 2016 05:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hakobera/03bcc4576b7bacab419f to your computer and use it in GitHub Desktop.
Save hakobera/03bcc4576b7bacab419f to your computer and use it in GitHub Desktop.
require 'tsort'
# Directed graph of tasks and their dependencies.
#
# Mixes in TSort, so `tsort` yields the registered tasks with every
# dependency placed before the tasks that require it. Tasks are used as
# Hash keys, so they must implement `hash`/`eql?` consistently and respond
# to `requires` with an enumerable of tasks.
class DAG
  include TSort

  def initialize
    # task => list of tasks it directly requires
    @tasks = {}
  end

  # TSort hook: enumerates every registered task.
  def tsort_each_node(&block)
    @tasks.each_key(&block)
  end

  # TSort hook: enumerates the direct dependencies of +node+.
  # Raises KeyError if +node+ was never registered.
  def tsort_each_child(node, &block)
    @tasks.fetch(node).each(&block)
  end

  # Registers +task+ together with everything it transitively requires.
  #
  # @param task [#requires] the task to add
  # @return [Object] the task that was passed in
  def add_task(task)
    register(task)
    task
  end

  private

  # Records +task+ and recurses into its dependencies. A task that is
  # already present is skipped, which also stops the recursion on cycles.
  def register(task)
    return if @tasks.key?(task)

    # Ask for the dependencies exactly once so that the stored edges and
    # the nodes we recurse into are the very same objects.
    dependencies = task.requires
    @tasks[task] = dependencies
    dependencies.each { |dependency| register(dependency) }
  end
end
require_relative 'workflow'
require_relative 'task'
require_relative 'local_file_target'
# Example task that writes the integers 1 through 10, one per line, to a
# dated file under /tmp.
class GenerateDataTask < Task
  # Target for the generated file; the date in the name is taken from
  # Time.now each time this method is called.
  def output
    date_stamp = Time.now.strftime('%Y-%m-%d')
    LocalFileTarget.new("/tmp/data_#{date_stamp}.txt")
  end

  # Creates (or truncates) the output file and fills it with 1..10.
  def run
    File.open(output.path, "w") do |file|
      1.upto(10) { |number| file.puts number }
    end
  end
end
# Example task that adds up the numbers produced by GenerateDataTask and
# writes the total to a dated file under /tmp.
class SumTask < Task
  # This task depends on a single GenerateDataTask.
  def requires
    [GenerateDataTask.new]
  end

  # Target for the result file; the date in the name is taken from
  # Time.now each time this method is called.
  def output
    date_stamp = Time.now.strftime('%Y-%m-%d')
    LocalFileTarget.new("/tmp/output_#{date_stamp}.txt")
  end

  # Reads the first input file line by line, converts each line with
  # to_i, and writes the total to the output path.
  def run
    source_path = input.first.path
    total = File.foreach(source_path).sum(&:to_i)
    File.write(output.path, total)
  end
end
# Script entry point: run SumTask and whatever it requires.
workflow = Workflow.new
workflow.run(SumTask.new)
require_relative 'target'
# Target backed by a path on the local filesystem.
class LocalFileTarget < Target
  # @param path [String] location of the file this target stands for
  def initialize(path)
    @path = path
  end

  # @return [String] the path given at construction
  def path
    @path
  end

  # @return [Boolean] whether something exists at the path right now
  def exists?
    File.exist?(@path)
  end
end
# Abstract base for task outputs. Subclasses are expected to override
# #exists?.
class Target
  # Always raises here; the message names the concrete class and method.
  #
  # @raise [NotImplementedError] unless overridden by a subclass
  def exists?
    message = "You must implement #{self.class}##{__method__}"
    raise NotImplementedError, message
  end
end
# Abstract base class for a unit of work.
#
# Subclasses must implement #output and #run, and may override #requires
# to declare dependencies. Two tasks are treated as the same Hash key when
# they are instances of exactly the same class.
class Task
  # Outputs of the required tasks, in the order returned by #requires.
  # Computed on first call and cached on the instance.
  def input
    @input ||= requires.map(&:output)
  end

  # @raise [NotImplementedError] unless overridden by a subclass
  def output
    raise NotImplementedError, "You must implement #{self.class}##{__method__}"
  end

  # @raise [NotImplementedError] unless overridden by a subclass
  def run
    raise NotImplementedError, "You must implement #{self.class}##{__method__}"
  end

  # @return [Array] tasks this task depends on; empty by default
  def requires
    [] # If you need to define task dependencies, override in subclass
  end

  # Hash-key equality: true only for another instance of this exact class.
  # Comparing classes directly (rather than hash values) keeps unrelated
  # objects whose hash happens to match from being treated as equal.
  def eql?(other)
    other.instance_of?(self.class)
  end

  # Hash value shared by all instances of the same class, consistent
  # with #eql?.
  def hash
    self.class.hash
  end
end
require_relative 'dag'
# Runs a task after the tasks it depends on.
class Workflow
  # Builds a DAG from +task+, walks it in the order given by DAG#tsort,
  # and runs each task whose output does not already exist.
  #
  # @param task [Object] the task to execute
  # @return [Array] the sorted task list that was walked
  def run(task)
    graph = DAG.new
    graph.add_task(task)

    graph.tsort.each do |step|
      next if step.output.exists?

      step.run
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment