@mtitov
Last active January 27, 2024 02:22
task1.annotate(outputs='task1_output.txt')
task2.annotate(outputs=['task2_output.txt', 
                        'task2_stat.txt'])
task2a.annotate(outputs='task2_output.txt')

task3.annotate(
    inputs=['random.txt', {task1: 'file1.txt',
                           task2: 'file2.txt'}],
    outputs=['task3_output.txt'],
)
#    resolved_data={'file_merged.txt': ['task1_output.txt', 'task2_output.txt'],
#                   'file1.txt': ['file_merged.txt']}

# resolved_data = {<tgt>: [<src1>, <src2>, <srcN>]}
# For example:
#   task3.arguments  = ['--input', 'file1.txt,file2.txt']
#   task3.pre_launch = ['cat task*_output.txt > file_merged.txt',
#                       'grep "test" file_merged.txt > file1.txt']
task1  -> task1_output.txt ->|
task2a ->\                   |-> file_merged.txt -> file1.txt -> task3 -> task3_output.txt
task2  -> task2_output.txt ->|                      file2.txt ->/
     \--> task2_stat.txt

workflow graph in JSON format (towards provenance graph)

{
    "pipeline.0000": {
        "stage.0000": {
            "task.0001": {
                "outputs": ["task1_output.txt"]
            },
            "task.0002": {
                "outputs": ["task2_output.txt", 
                            "task2_stat.txt"]
            },
            "task.0002a": {
                "outputs": ["task2_output.txt"]
            }
        },
        "stage.0001": {
            "task.0003": {
                "depends_on": ["task.0001", 
                               "task.0002"],
                "inputs": ["random.txt",
                           "task.0001:file1.txt", 
                           "task.0002:file2.txt"],
                "outputs": ["task3_output.txt"]
            }
        }
    }
}
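
A minimal sketch of how the graph above could be consumed, assuming the `<task_uid>:<file_name>` convention for inputs produced by another task. The helper `dependency_edges` and the embedded `WORKFLOW_JSON` string are hypothetical names used only for illustration; they are not part of radical.entk.

```python
import json

# Copy of the workflow graph shown above, embedded as a string for illustration.
WORKFLOW_JSON = """
{
    "pipeline.0000": {
        "stage.0000": {
            "task.0001": {"outputs": ["task1_output.txt"]},
            "task.0002": {"outputs": ["task2_output.txt", "task2_stat.txt"]},
            "task.0002a": {"outputs": ["task2_output.txt"]}
        },
        "stage.0001": {
            "task.0003": {
                "depends_on": ["task.0001", "task.0002"],
                "inputs": ["random.txt",
                           "task.0001:file1.txt",
                           "task.0002:file2.txt"],
                "outputs": ["task3_output.txt"]
            }
        }
    }
}
"""


def dependency_edges(graph):
    """Return sorted (producer_uid, consumer_uid, file_name) triples.

    Hypothetical helper: an input counts as a dataflow edge only if it
    has the "<task_uid>:<file_name>" form; plain file names are skipped.
    """
    edges = []
    for stages in graph.values():
        for tasks in stages.values():
            for uid, meta in tasks.items():
                for item in meta.get("inputs", []):
                    producer, sep, name = item.partition(":")
                    if sep:
                        edges.append((producer, uid, name))
    return sorted(edges)


edges = dependency_edges(json.loads(WORKFLOW_JSON))
for producer, consumer, name in edges:
    print(f"{producer} -> {consumer} ({name})")
```

Plain inputs such as `random.txt` carry no producer and therefore yield no edge in this sketch.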

mtitov commented Jun 20, 2023

annotate -> dataflow_annotation? (open for suggestions as well)

mtitov commented Jul 26, 2023

Related PR: radical-cybertools/radical.entk#651
Usage:

import radical.entk as re

t1 = re.Task()
t1.annotate(input=['file_1.txt', 'file_2.txt'],
            output=['file_t1_1.txt', 'file_t1_2.txt'])

t2 = re.Task()
t2.annotate(input=['file_3.txt', {t1: ['file_t1_1.txt', 'file_t1_2.txt']}])

After the annotate method is called, the corresponding Task instance has its annotations attribute set as follows:

{
    'input'     : ['<file_name>', '<task_uid>:<file_name>'],
    'output'    : ['<file_name>'],
    'depends_on': ['<task_uid>']
}
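
A rough sketch of how such an annotations structure could be derived from the arguments in the usage example. `SketchTask` and `_as_list` are hypothetical stand-ins written for illustration, and the uid format is an assumption; this is not the radical.entk implementation from the related PR.

```python
from itertools import count

_uid_counter = count(1)


def _as_list(value):
    """Normalize None, a single value, or a list/tuple to a list."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


class SketchTask:
    """Hypothetical stand-in for a task (not the radical.entk Task class)."""

    def __init__(self):
        # Assumed uid format, chosen to resemble the JSON graph above.
        self.uid = "task.%04d" % next(_uid_counter)
        self.annotations = {}

    # The parameter names mirror the usage example, so `input`
    # shadows the Python builtin inside this method.
    def annotate(self, input=None, output=None):
        inputs, depends_on = [], []
        for item in _as_list(input):
            if isinstance(item, dict):
                # {task: file_name(s)} marks files produced by another task.
                for task, names in item.items():
                    if task.uid not in depends_on:
                        depends_on.append(task.uid)
                    for name in _as_list(names):
                        inputs.append("%s:%s" % (task.uid, name))
            else:
                inputs.append(item)
        self.annotations = {"input": inputs,
                            "output": _as_list(output),
                            "depends_on": depends_on}


t1 = SketchTask()
t1.annotate(input=["file_1.txt", "file_2.txt"],
            output=["file_t1_1.txt", "file_t1_2.txt"])

t2 = SketchTask()
t2.annotate(input=["file_3.txt", {t1: ["file_t1_1.txt", "file_t1_2.txt"]}])
print(t2.annotations)
```

In this sketch a dependency is recorded only when an input is given as a `{task: files}` mapping, so plain file names never add entries to `depends_on`.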
