Skip to content

Instantly share code, notes, and snippets.

@KentFujii
Last active December 12, 2021 06:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save KentFujii/96fc0d917ca5737cfae6409dab3db260 to your computer and use it in GitHub Desktop.
Save KentFujii/96fc0d917ca5737cfae6409dab3db260 to your computer and use it in GitHub Desktop.
Digdag Ruby workflow
_export:
rb:
require: tasks/my_workflow_1
+step1:
rb>: Task::MyWorkflow.step1
+step2:
rb>: Task::MyWorkflow.step2
module Task
class MyWorkflow
def step1
Digdag.env.add_subtask(MyWorkflow, :step3, arg1: 1, arg2: 2)
puts Digdag.env.subtask_config
end
def step2(my_value: "default")
puts "step2: %s" % my_value
end
def step3(arg1, arg2)
puts "step3 #{arg1}"
puts "step3 #{arg2}"
end
end
end
_export:
rb:
require: tasks/my_workflow_2
+step1:
rb>: Task::MyWorkflow.step1
module Task
class MyWorkflow
def step1
Digdag.env.add_subtask({"+step2": {"rb>": "Task::MyWorkflow.step2", "my_value": 111}})
Digdag.env.add_subtask({"+step3": {"rb>": "Task::MyWorkflow.step3", "arg1": 111, "arg2": 222}})
puts Digdag.env.subtask_config['_parallel'] = true
puts Digdag.env.subtask_config
end
def step2(my_value)
sleep 3
puts "step2: %s" % my_value
end
def step3(arg1, arg2)
sleep 3
puts "step3 #{arg1}"
puts "step3 #{arg2}"
end
end
end
_export:
rb:
require: tasks/my_workflow_3
+step1:
rb>: Task::MyWorkflow.step1
module Task
class MyWorkflow
def step1
Digdag.env.add_subtask(
{
"+step2": {
"rb>": "Task::MyWorkflow.step2",
"my_value": 111
},
"+step3": {
"rb>": "Task::MyWorkflow.step3",
"arg1": 111,
"arg2": 222
},
"_parallel": true
}
)
puts Digdag.env.subtask_config
end
def step2(my_value)
sleep 3
puts "step2: %s" % my_value
end
def step3(arg1, arg2)
sleep 3
puts "step3 #{arg1}"
puts "step3 #{arg2}"
end
end
end
_export:
rb:
require: tasks/my_workflow_4
+step1:
rb>: Task::MyWorkflow1.step1
module Task
class MyWorkflow1
def step1
Digdag.env.add_subtask(
{
"+step2": {
"rb>": "Task::MyWorkflow2.step2",
"my_value": 111
},
"+step3": {
"rb>": "Task::MyWorkflow2.step3",
"arg1": 111,
"arg2": 222
},
"_parallel": true
}
)
puts Digdag.env.subtask_config
end
end
class MyWorkflow2
def step2(my_value)
sleep 3
puts "step2: %s" % my_value
end
def step3(arg1, arg2)
sleep 3
puts "step3 #{arg1}"
puts "step3 #{arg2}"
end
end
end
_export:
rb:
require: tasks/my_workflow_5
arg1: 111
arg2: 222
+step1:
rb>: Task::MyWorkflow1.step1
module Task
class MyWorkflow1
def initialize(arg1, arg2)
@arg1 = arg1
@arg2 = arg2
end
def step1
Digdag.env.add_subtask(
{
"+step2": {
"rb>": "Task::MyWorkflow2.step2",
"my_value": 111
},
"+step3": {
"rb>": "Task::MyWorkflow2.step3",
"arg1": @arg1,
"arg2": @arg2
},
"_parallel": true
}
)
puts Digdag.env.subtask_config
end
end
class MyWorkflow2
def step2(my_value)
sleep 3
puts "step2: %s" % my_value
end
def step3(arg1, arg2)
sleep 3
puts "step3 #{arg1}"
puts "step3 #{arg2}"
end
end
end
_export:
rb:
require: tasks/my_workflow_6
arg1: 111
arg2: 222
+step1:
rb>: Task::MyWorkflow1.step1
module Task
class MyWorkflow1
def initialize(arg1, arg2)
@arg1 = arg1
@arg2 = arg2
end
def step1
Digdag.env.add_subtask(
{
"+step2": {
"rb>": "Task::MyWorkflow2.step2",
"my_value": 111,
},
"+step3": {
"rb>": "Task::MyWorkflow2.step3",
"args": {
"arg1": @arg1,
"arg2": @arg2
},
"_retry": 3
},
"_parallel": true,
"_retry": 3
}
)
puts Digdag.env.subtask_config
end end
class MyWorkflow2
def step2(my_value)
sleep 3
puts "step2: %s" % my_value
end
def step3(args)
sleep 3
puts "step3 #{args['arg1']}"
puts "step3 #{args['arg2']}"
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment