# Declare per-task dataflow: which files each task produces (outputs) and
# consumes (inputs). A dict inside `inputs` maps a producer task to the
# local filename under which its output is consumed.
task1.annotate(outputs='task1_output.txt')
task2.annotate(outputs=['task2_output.txt', 'task2_stat.txt'])
task2a.annotate(outputs='task2_output.txt')  # NOTE(review): same filename as task2's output — presumably intentional; confirm
task3.annotate(
    inputs=[
        'random.txt',
        {task1: 'file1.txt', task2: 'file2.txt'},
    ],
    outputs=['task3_output.txt'],
)
# resolved_data={'file_merged.txt': ['task1_output.txt', 'task2_output.txt'],
# 'file1.txt': ['file_merged.txt']}
# resolved_data = {<tgt>: [<src1>, <src2>, <srcN>]}
# For example:
# task3.arguments = ['--input', 'file1.txt,file2.txt']
# task3.pre_launch = ['cat task*_output.txt > file_merged.txt',
#                     'grep "test" file_merged.txt > file1.txt']
task1 -> task1_output.txt ->|
task2a ->\ |-> file_merged.txt -> file1.txt -> task3 -> task3_output.txt
task2 -> task2_output.txt ->| file2.txt ->/
\--> task2_stat.txt
Workflow graph in JSON format (a step towards a provenance graph):
{
  "pipeline.0000": {
    "stage.0000": {
      "task.0001": {
        "outputs": ["task1_output.txt"]
      },
      "task.0002": {
        "outputs": ["task2_output.txt", "task2_stat.txt"]
      },
      "task.0002a": {
        "outputs": ["task2_output.txt"]
      }
    },
    "stage.0001": {
      "task.0003": {
        "depends_on": ["task.0001", "task.0002"],
        "inputs": ["random.txt", "task.0001:file1.txt", "task.0002:file2.txt"],
        "outputs": ["task3_output.txt"]
      }
    }
  }
}
Open question: should the method be named `annotate` or something more
specific like `dataflow_annotation`? (other naming suggestions welcome)