記事は Qiita に書いています。
Created March 20, 2016 13:18
-
-
Save hakobera/fd4ff0a09c493713ce13 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'tsort' | |
# Dependency graph of tasks that can be ordered with TSort.
#
# Every registered task is a hash key whose value is the list of tasks it
# requires, so TSort#tsort yields prerequisites before their dependents.
class DAG
  include TSort

  def initialize
    @tasks = {}
  end

  # TSort hook: yields every registered task.
  def tsort_each_node(&block)
    @tasks.each_key(&block)
  end

  # TSort hook: yields each direct prerequisite of +node+.
  # Raises KeyError when +node+ was never registered.
  def tsort_each_child(node, &block)
    @tasks.fetch(node).each(&block)
  end

  # Registers +task+ and, recursively, everything it requires.
  # A task that is already registered is left untouched.
  #
  # @param task [#requires] object whose #requires returns its prerequisites
  # @return [Object] the +task+ that was passed in
  def add_task(task)
    return task if @tasks.key?(task)

    # Ask for the prerequisites exactly once: #requires may build fresh
    # objects on every call, and the list stored here should be the same
    # one that gets registered below.
    dependencies = task.requires
    @tasks[task] = dependencies
    dependencies.each { |dependency| add_task(dependency) }
    task
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require_relative 'workflow' | |
require_relative 'task' | |
require_relative 'local_file_target' | |
# Demo task that sleeps for five seconds and then writes a marker file.
class WaitTask < Task
  # Marker file for this task; the path embeds the class name and the
  # current date, so it is recomputed on every call.
  def output
    LocalFileTarget.new("/tmp/data_#{self.class.name}_#{Time.now.strftime('%Y-%m-%d')}.txt")
  end

  # Logs start and end timestamps around the simulated work and writes
  # "done" to the marker file.
  def run
    puts "#{Time.now} start #{self.class.name}"
    sleep(5)
    File.write(output.path, "done")
    puts "#{Time.now} end #{self.class.name}"
  end
end
# Root of the example graph: requires Task2 through Task5.
class Task1 < WaitTask
  def requires
    [Task2.new, Task3.new, Task4.new, Task5.new]
  end
end
# Intermediate task that requires Task6.
class Task2 < WaitTask
  def requires
    [Task6.new]
  end
end
# Intermediate task that requires Task6.
class Task3 < WaitTask
  def requires
    [Task6.new]
  end
end
# Intermediate task that requires Task6.
class Task4 < WaitTask
  def requires
    [Task6.new]
  end
end
# Intermediate task that requires Task6.
class Task5 < WaitTask
  def requires
    [Task6.new]
  end
end
# Leaf task: it does not override #requires, so it declares no prerequisites
# of its own.
class Task6 < WaitTask
end
# Run the example graph rooted at Task1 with four worker threads.
Workflow.new(concurrency: 4).run(Task1.new)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require_relative 'target' | |
# Target identified by a path on the local filesystem.
class LocalFileTarget < Target
  attr_reader :path

  # @param path [String] filesystem path this target stands for
  def initialize(path)
    @path = path
  end

  # @return [Boolean] result of File.exist? for #path
  def exists?
    File.exist?(path)
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Abstract base for task outputs. Subclasses report whether the output
# is already present by overriding #exists?.
class Target
  # @raise [NotImplementedError] always, unless a subclass overrides it
  def exists?
    raise NotImplementedError, "You must implement #{self.class}##{__method__}"
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Base class for a unit of work in a workflow.
#
# Subclasses override #run, and usually #output and #requires. Tasks are
# identified by their class: two instances of the same task class are
# interchangeable as Hash keys.
class Task
  # Outputs of all required tasks, computed once and memoized.
  def input
    @input ||= requires.map(&:output)
  end

  # Target (or array of targets) this task produces. Defaults to none.
  def output
    []
  end

  # Performs the work.
  # @raise [NotImplementedError] unless a subclass overrides it
  def run
    raise NotImplementedError, "You must implement #{self.class}##{__method__}"
  end

  # Tasks that must be completed first. Override in a subclass to declare
  # dependencies.
  def requires
    []
  end

  # True when every required task is completed.
  def ready?
    requires.all?(&:completed?)
  end

  # True when every output target exists. With the default empty output
  # this is always true.
  def completed?
    _output.all?(&:exists?)
  end

  # Equality used for Hash keys: same class means same task. Comparing
  # classes (rather than raw hash values) keeps unrelated objects whose
  # hash happens to match, such as the class-name String, from being
  # treated as this task.
  def eql?(other)
    self.class == other.class
  end

  # Hash derived from the class name, so instances of one class collide
  # on purpose.
  def hash
    self.class.name.hash
  end

  private

  # #output normalised to an Array, so a single target and a list of
  # targets are handled the same way.
  def _output
    produced = output
    produced.is_a?(Array) ? produced : [produced]
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require_relative 'dag' | |
require 'thread' | |
# Runs a task and everything it requires, in dependency order, on a fixed
# number of worker threads.
class Workflow
  # @param conf [Hash] options; +:concurrency+ is the number of worker
  #   threads (defaults to 1)
  def initialize(conf={})
    @dag = DAG.new
    @concurrency = conf[:concurrency] || 1
  end

  # Registers +task+ with the graph and executes the topologically sorted
  # tasks. A worker polls once per second until the task it picked up is
  # ready, and skips tasks that already report completed.
  #
  # @return [Array<Thread>] the joined worker threads
  def run(task)
    @dag.add_task(task)
    parallel(@dag.tsort, @concurrency) do |t|
      until t.ready?
        puts "#{Time.now} #{t.class.name} is not ready"
        sleep 1
      end
      t.run unless t.completed?
    end
  end

  # Yields every element of +enumerable+ to the given block from
  # +concurrency+ worker threads and waits for all of them to finish.
  #
  # @return [Array<Thread>] the joined worker threads
  def parallel(enumerable, concurrency)
    queue = Queue.new
    enumerable.each { |item| queue << item }

    worker = lambda do
      loop do
        begin
          # Non-blocking pop: checking empty? and then doing a blocking pop
          # lets another worker take the last item in between, leaving this
          # thread waiting forever.
          item = queue.pop(true)
        rescue ThreadError
          break # queue drained
        end
        puts "#{Time.now} pop task: #{item.class.name}"
        yield item
      end
    end

    threads = Array.new(concurrency) { Thread.start(&worker) }
    threads.each(&:join)
  end
end
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.