Skip to content

Instantly share code, notes, and snippets.

@tonywok
Created February 13, 2019 13:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tonywok/8eef917bc100b3f49f94e3802a8b0895 to your computer and use it in GitHub Desktop.
Save tonywok/8eef917bc100b3f49f94e3802a8b0895 to your computer and use it in GitHub Desktop.
dag-spike
require "tsort"
require 'bundler/inline'
gemfile do
source 'https://rubygems.org'
gem 'pry'
gem 'pry-byebug'
end
# Base class for the units of work tracked by the dependency graph.
#
# It only contributes a shared string rendering. +to_s+ calls a +done+ method
# that this class does not define, so it is usable only through subclasses
# that provide one.
class Operation
  attr_reader :display_name

  # Renders the operation as "<#ClassName display_name done>".
  #
  # @return [String]
  def to_s
    format("<#%s %s %s>", self.class, display_name, done)
  end
end
# Operation representing the build of a single table.
class BuildTable < Operation
  attr_reader :table, :done

  # @param table [#name] the table this operation builds
  # @param done [Boolean] initial completion flag
  def initialize(table, done)
    @done = done
    @table = table
  end

  # Marks the operation as completed.
  #
  # @return [true]
  def finish!
    @done = true
  end

  # @return [Boolean] the completion flag exactly as stored (not coerced)
  def done?
    done
  end

  # The operation is labelled with the name of its table.
  #
  # @return [Object] whatever +table.name+ returns
  def display_name
    table.name
  end
end
# Operation representing the build of a single function.
class BuildFunction < Operation
  # NOTE: +display_name+ is intentionally not listed here. It is defined
  # explicitly below, and generating a reader for it as well would only be
  # overwritten (and emits a "method redefined" warning under `ruby -w`).
  attr_reader :function, :done

  # @param function [#name] the function this operation builds
  # @param done [Boolean] initial completion flag
  def initialize(function, done)
    @function = function
    @done = done
  end

  # The operation is labelled with the name of its function.
  #
  # @return [Object] whatever +function.name+ returns
  def display_name
    function.name
  end

  # @return [Boolean] the completion flag exactly as stored (not coerced)
  def done?
    done
  end

  # Marks the operation as completed.
  #
  # @return [true]
  def finish!
    @done = true
  end
end
# A named table together with a hard-coded dependency list.
class Table
  attr_reader :name

  # @param name [String] table name
  def initialize(name)
    @name = name
  end

  # Names of the tables this table depends on.
  #
  # Only "C" (depends on "B") and "D" (depends on "C") have dependencies;
  # every other name yields an empty list. A new array is returned per call.
  #
  # @return [Array<String>]
  def dependencies
    case name
    when "C" then ["B"]
    when "D" then ["C"]
    else []
    end
  end
end
# A named function; carries nothing but its name.
class Function
  # @return [Object] the name given at construction
  attr_reader :name

  # @param name [String] function name
  def initialize(name)
    @name = name
  end
end
# Dependency graph of build operations, ordered with TSort.
#
# Nodes are identified by +[operation_class, name]+ pairs. Each node maps to
# the list of node keys it depends on, so a topological sort yields
# dependencies before the operations that need them.
#
# NOTE(review): a dependency that was never registered still shows up as a
# node in the sort, but has no operation. +operations+ then contains +nil+ for
# it and +next_group+ raises NoMethodError. Confirm whether such dependencies
# should be rejected or treated as already satisfied.
class Dag
  include TSort

  # Fallback dependency list for keys that were never registered. Frozen
  # because the same object is handed out for every unknown key.
  NO_DEPENDENCIES = [].freeze
  private_constant :NO_DEPENDENCIES

  attr_reader :deps, :concurrency

  # Creates a graph, yields it for population (when a block is given), and
  # returns it.
  #
  # @param kwargs [Hash] forwarded to {#initialize}
  # @yieldparam dag [Dag] the graph being built
  # @return [Dag]
  def self.build(**kwargs)
    dag = new(**kwargs)
    yield dag if block_given?
    dag
  end

  # @param concurrency [Integer] maximum number of operations returned by
  #   {#next_group}
  def initialize(concurrency: 1)
    @deps = Hash.new(NO_DEPENDENCIES)
    @operations = {}
    @concurrency = concurrency
  end

  # Registers a table build and the table builds it depends on.
  #
  # @param table_name [String]
  # @param state [Boolean] initial completion flag of the operation
  # @return [BuildTable] the registered operation
  def build_table!(table_name, state = false)
    table = Table.new(table_name)
    key = [BuildTable, table_name]
    @deps[key] = table.dependencies.map { |name| [BuildTable, name] }
    @operations[key] = BuildTable.new(table, state)
  end

  # Registers a function build; function builds have no dependencies.
  #
  # @param function_name [String]
  # @param state [Boolean] initial completion flag of the operation
  # @return [BuildFunction] the registered operation
  def build_function!(function_name, state = false)
    key = [BuildFunction, function_name]
    @deps[key] = []
    @operations[key] = BuildFunction.new(Function.new(function_name), state)
  end

  # All operations in dependency order (dependencies first).
  #
  # @return [Array] one entry per sorted node
  # @raise [TSort::Cyclic] if the dependencies contain a cycle
  def operations
    tsort.map { |key| operation(key) }
  end

  # The next operations that can run: not yet done, with every dependency
  # done, limited to +concurrency+ entries.
  #
  # @return [Array]
  def next_group
    workable.first(concurrency)
  end

  # Marks the operation stored under +key+ as done.
  #
  # @param key [Array] an +[operation_class, name]+ pair
  def finish!(key)
    operation(key).finish!
  end

  private

  # Lazily selects runnable operations in dependency order.
  #
  # Deliberately not memoized: the sorted node list is computed from the
  # operations registered at call time, so caching it would hide every
  # operation registered after the first call.
  def workable
    tsort.lazy
         .map { |key| operation(key) }
         .select { |op| !op.done? && dependent_operations(op).all?(&:done?) }
  end

  # Operations that +op+ depends on directly.
  def dependent_operations(op)
    @deps[key_for(op)].map { |dep_key| operation(dep_key) }
  end

  # Looks up the operation registered under +key+ (nil when unknown).
  def operation(key)
    @operations[key]
  end

  # Rebuilds the node key an operation was registered under.
  def key_for(operation)
    [operation.class, operation.display_name]
  end

  # TSort hook: yields the keys +key+ depends on.
  def tsort_each_child(key, &b)
    @deps[key].each(&b)
  end

  # TSort hook: yields every registered node key.
  def tsort_each_node(&b)
    @deps.each_key(&b)
  end
end
# Demo: register operations in arbitrary order, then print them in
# dependency order.
dag = Dag.build(concurrency: 1) do |graph|
  %w[berp derp].each { |function_name| graph.build_function!(function_name) }
  %w[D C A 1 2 B].each { |table_name| graph.build_table!(table_name) }
end

puts dag.operations
# dag = Dag.build_from_task(task)
# tasks = dag.next_group.map do |operation|
# build.tasks.pending.find_by!(kind: operation.kind, name: operation.display_name)
# end
# tasks.update_all(:status => "enqueued")
# tasks.each { |task| EtlTaskJob.perform_later(task) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment