Skip to content

Instantly share code, notes, and snippets.

@hakobera
Created March 20, 2016 13:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hakobera/fd4ff0a09c493713ce13 to your computer and use it in GitHub Desktop.
Save hakobera/fd4ff0a09c493713ce13 to your computer and use it in GitHub Desktop.
require 'tsort'
# Dependency graph of tasks that can be topologically sorted through TSort.
#
# Nodes are task objects; each node maps to the list returned by its
# +requires+ method. Tasks are used as Hash keys, so their +hash+ / +eql?+
# definitions decide when two tasks count as the same node.
class DAG
  include TSort

  def initialize
    @tasks = {}
  end

  # TSort hook: yields every task registered in the graph.
  def tsort_each_node(&visitor)
    @tasks.each_key(&visitor)
  end

  # TSort hook: yields each dependency recorded for +node+.
  # Raises KeyError when +node+ was never added.
  def tsort_each_child(node, &visitor)
    @tasks.fetch(node).each(&visitor)
  end

  # Registers +task+ and, recursively, everything it requires.
  # A task that already has an entry is left alone. Returns +task+.
  def add_task(task)
    return task if @tasks[task]

    @tasks[task] = task.requires
    task.requires.each { |dependency| add_task(dependency) }
    task
  end
end
require_relative 'workflow'
require_relative 'task'
require_relative 'local_file_target'
# Demo task that sleeps for five seconds and then writes a marker file.
# The marker path embeds the concrete class name and the current date.
class WaitTask < Task
  # Target wrapping this task's marker file under /tmp.
  def output
    marker_path = "/tmp/data_#{self.class.name}_#{Time.now.strftime('%Y-%m-%d')}.txt"
    LocalFileTarget.new(marker_path)
  end

  # Logs start, waits, writes "done" to the marker file, then logs the end.
  def run
    puts "#{Time.now} start #{self.class.name}"
    sleep(5)
    File.write(output.path, "done")
    puts "#{Time.now} end #{self.class.name}"
  end
end
# Sample task that depends on Task2, Task3, Task4 and Task5.
class Task1 < WaitTask
  # Fresh instances of every prerequisite task, in declaration order.
  def requires
    [Task2, Task3, Task4, Task5].map(&:new)
  end
end
# Sample task whose only prerequisite is Task6.
class Task2 < WaitTask
  def requires
    [Task6.new]
  end
end
# Sample task whose only prerequisite is Task6.
class Task3 < WaitTask
  def requires
    [Task6.new]
  end
end
# Sample task whose only prerequisite is Task6.
class Task4 < WaitTask
  def requires
    [Task6.new]
  end
end
# Sample task whose only prerequisite is Task6.
class Task5 < WaitTask
  def requires
    [Task6.new]
  end
end
# Sample task that overrides nothing: +requires+, +output+ and +run+ all come
# from its superclass chain.
class Task6 < WaitTask
end
# Run the sample workflow rooted at Task1 with a concurrency of four.
workflow = Workflow.new(concurrency: 4)
workflow.run(Task1.new)
require_relative 'target'
# Target backed by a path on the local filesystem.
class LocalFileTarget < Target
  # Filesystem path this target points at.
  attr_reader :path

  # path - location checked by +exists?+.
  def initialize(path)
    @path = path
  end

  # True when File.exist? reports something at +path+.
  def exists?
    File.exist?(path)
  end
end
# Abstract base class for targets; subclasses must provide +exists?+.
class Target
  # Reports whether the target is present.
  # Raises NotImplementedError unless a subclass overrides it.
  def exists?
    message = "You must implement #{self.class}##{__method__}"
    raise NotImplementedError, message
  end
end
# Base class for a unit of work.
#
# Subclasses must override +run+ and may override +requires+ and +output+.
# Tasks are compared by class through +eql?+ / +hash+, so two instances of
# the same subclass act as the same Hash key. +==+ is not overridden here.
class Task
  # Outputs of every required task, memoized after the first call.
  def input
    @input ||= requires.map(&:output)
  end

  # What this task produces: a single object responding to +exists?+ or an
  # Array of them. The default is an empty Array.
  def output
    []
  end

  # Performs the work. Raises NotImplementedError unless overridden.
  def run
    raise NotImplementedError, "You must implement #{self.class}##{__method__}"
  end

  # Tasks that have to be completed before this one.
  def requires
    [] # If you need to define task dependencies, override in subclass
  end

  # True when every required task reports +completed?+.
  def ready?
    requires.all?(&:completed?)
  end

  # True when every output exists. With the default empty +output+ this is
  # always true.
  def completed?
    _output.all?(&:exists?)
  end

  # Two tasks are interchangeable when they are instances of the same class.
  # Comparing classes (rather than hash values) keeps a task from being
  # +eql?+ to unrelated objects whose hash happens to match, such as the
  # String holding its class name.
  def eql?(other)
    other.instance_of?(self.class)
  end

  # Hash derived from the class name so that it agrees with +eql?+.
  def hash
    self.class.name.hash
  end

  private

  # +output+ normalized to an Array.
  def _output
    produced = output
    produced.is_a?(Array) ? produced : [produced]
  end
end
require_relative 'dag'
require 'thread'
# Runs a task and everything it requires, using a fixed number of threads.
class Workflow
  # conf - options Hash; +:concurrency+ sets the number of worker threads
  #        (defaults to 1).
  def initialize(conf={})
    @dag = DAG.new
    @concurrency = conf[:concurrency] || 1
  end

  # Adds +task+ (and its requirements) to the graph, then processes the
  # tasks in topological order across the worker threads. Each worker polls
  # once per second until its task is ready and skips tasks that are already
  # completed.
  def run(task)
    @dag.add_task(task)
    parallel(@dag.tsort, @concurrency) do |t|
      until t.ready?
        puts "#{Time.now} #{t.class.name} is not ready"
        sleep 1
      end
      t.run unless t.completed?
    end
  end

  # Yields every element of +enumerable+ to the given block from
  # +concurrency+ threads and waits for all of them to finish.
  # Returns the Array of joined threads.
  def parallel(enumerable, concurrency)
    queue = Queue.new
    enumerable.each { |item| queue << item }

    worker = lambda do
      loop do
        # Non-blocking pop: checking empty? and then doing a blocking pop
        # lets another worker take the last item in between, which would
        # leave this thread waiting forever on an empty queue.
        item = begin
          queue.pop(true)
        rescue ThreadError
          break
        end
        puts "#{Time.now} pop task: #{item.class.name}"
        yield item
      end
    end

    threads = concurrency.times.map { Thread.new(&worker) }
    threads.each(&:join)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment