Skip to content

Instantly share code, notes, and snippets.

@ryantanaka
Last active June 30, 2020 16:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ryantanaka/5c02d649e8dfa5f0068c20601233ddcf to your computer and use it in GitHub Desktop.
Save ryantanaka/5c02d649e8dfa5f0068c20601233ddcf to your computer and use it in GitHub Desktop.

Pegasus 5.0 Notes

XML -> YAML

  • workflow
  • site catalog
  • replica catalog (text)
  • transformation catalog (text)
  • braindump (text)

Python API

API Docs: https://pegasus.isi.edu/docs/5.0.0dev/python.html#python-api

Catalogs and Properties File Can Be Programmatically Generated

Site Catalog

# Site catalog with two sites: "local" and "condorpool".
# WORK_DIR and RUN_ID are not defined in this snippet; they must be provided
# by the surrounding script (they are combined with "/" below).
sc = SiteCatalog()

shared_scratch_dir = str(WORK_DIR / RUN_ID)
local_storage_dir = str(WORK_DIR / "outputs" / RUN_ID)

# Each directory is exposed through a file:// server rooted at the same path
# and usable for all operations.
scratch_dir = Directory(Directory.SHARED_SCRATCH, shared_scratch_dir).add_file_servers(
    FileServer("file://" + shared_scratch_dir, Operation.ALL)
)
storage_dir = Directory(Directory.LOCAL_STORAGE, local_storage_dir).add_file_servers(
    FileServer("file://" + local_storage_dir, Operation.ALL)
)

local = Site("local").add_directories(scratch_dir, storage_dir)

condorpool = (
    Site("condorpool")
    .add_pegasus_profile(style="condor")
    .add_pegasus_profile(auxillary_local="true")
    .add_condor_profile(universe="vanilla")
)

sc.add_sites(local, condorpool)

# pegasus-planner will, by default, pick up this file in cwd
sc.write()  # written to ./sites.yml

Replica Catalog

# Register the logical file "f.a" as a replica on the "local" site.
# The physical file name is resolved to an absolute path so that the catalog
# entry does not depend on the directory it is later read from.
fa = File("f.a").add_metadata(creator="ryan")
rc = ReplicaCatalog().add_replica("local", fa, Path(".").resolve() / "f.a")
rc.write()  # written to ./replicas.yml

Transformation Catalog

# The three transformations below are configured identically (same site, same
# pegasus-keg executable, not stageable, x86_64 Linux) and differ only in
# their name, so the common keyword arguments are spelled out once.
keg_on_condorpool = dict(
    site="condorpool",
    pfn="/usr/bin/pegasus-keg",
    is_stageable=False,
    arch=Arch.X86_64,
    os_type=OS.LINUX,
)

preprocess = Transformation("preprocess", **keg_on_condorpool)
findrange = Transformation("findrange", **keg_on_condorpool)
analyze = Transformation("analyze", **keg_on_condorpool)

tc = TransformationCatalog().add_transformations(
    preprocess,
    findrange,
    analyze,
)
tc.write()  # written to ./transformations.yml

Properties

# Gather the property settings in one mapping, then copy them into the
# Properties object in the same order before writing it out.
settings = {
    "pegasus.data.configuration": "condorio",
    "pegasus.monitord.encoding": "json",
    "pegasus.catalog.workflow.amqp.url": "amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod/workflows",
}

props = Properties()
for key, value in settings.items():
    props[key] = value
props.write()  # written to ./pegasus.properties

Generating Workflows

The following is an example of our black diamond workflow using the old Python API (Pegasus.DAX3):

#!/usr/bin/env python
import sys
import os
from Pegasus.DAX3 import *

# Black diamond workflow written against the legacy DAX3 API.
# Usage: <script> PEGASUS_BIN, where PEGASUS_BIN is the directory that
# contains the pegasus-keg executable.
if len(sys.argv) != 2:
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print("Usage: %s PEGASUS_BIN" % (sys.argv[0]))
    sys.exit(1)


# Create a DAX
diamond = ADAG("diamond")

# Add some metadata
diamond.metadata("name", "diamond")
diamond.metadata("createdby", "Karan Vahi")

# Input file consumed by the preprocess job
a = File("f.a")

# Add executables to the DAX-level transformation catalog.
# All three executables share the same physical pegasus-keg binary.
keg = PFN("file://" + sys.argv[1] + "/pegasus-keg", "local")
e_preprocess = Executable(namespace="diamond", name="preprocess", version="4.0", os="linux", arch="x86_64", installed=False)
e_preprocess.metadata("size", "2048")
e_preprocess.metadata("transformation", "preprocess")
e_preprocess.addPFN(keg)
diamond.addExecutable(e_preprocess)

e_findrange = Executable(namespace="diamond", name="findrange", version="4.0", os="linux", arch="x86_64", installed=False)
e_findrange.metadata("size", "2048")
e_findrange.metadata("transformation", "findrange")
e_findrange.addPFN(keg)
diamond.addExecutable(e_findrange)

e_analyze = Executable(namespace="diamond", name="analyze", version="4.0", os="linux", arch="x86_64", installed=False)
e_analyze.metadata("size", "2048")
e_analyze.metadata("transformation", "analyze")
e_analyze.addPFN(keg)
diamond.addExecutable(e_analyze)

# Add a preprocess job: f.a -> f.b1, f.b2
preprocess = Job(e_preprocess)
preprocess.metadata("time", "60")
b1 = File("f.b1")
b2 = File("f.b2")
preprocess.addArguments("-a preprocess","-T60","-i",a,"-o",b1, "-o",b2)
preprocess.uses(a, link=Link.INPUT)
preprocess.uses(b1, link=Link.OUTPUT, transfer=True, register=True)
preprocess.uses(b2, link=Link.OUTPUT, transfer=True, register=True)
diamond.addJob(preprocess)

# Add left Findrange job: f.b1 -> f.c1
frl = Job(e_findrange)
frl.metadata("time", "60")
c1 = File("f.c1")
frl.addArguments("-a findrange","-T60","-i",b1,"-o",c1)
frl.uses(b1, link=Link.INPUT)
frl.uses(c1, link=Link.OUTPUT, transfer=True, register=True)
diamond.addJob(frl)

# Add right Findrange job: f.b2 -> f.c2
frr = Job(e_findrange)
frr.metadata("time", "60")
c2 = File("f.c2")
frr.addArguments("-a findrange","-T60","-i",b2,"-o",c2)
frr.uses(b2, link=Link.INPUT)
frr.uses(c2, link=Link.OUTPUT, transfer=True, register=True)
diamond.addJob(frr)

# Add Analyze job: f.c1, f.c2 -> f.d
analyze = Job(e_analyze)
analyze.metadata("time", "60")
d = File("f.d")
d.metadata("final_output", "true")
analyze.addArguments("-a analyze","-T60","-i",c1,c2,"-o",d)
analyze.uses(c1, link=Link.INPUT)
analyze.uses(c2, link=Link.INPUT)
analyze.uses(d, link=Link.OUTPUT, transfer=True, register=True)
diamond.addJob(analyze)

# Add dependencies (the old API requires them to be declared explicitly here)
diamond.depends(parent=preprocess, child=frl)
diamond.depends(parent=preprocess, child=frr)
diamond.depends(parent=frl, child=analyze)
diamond.depends(parent=frr, child=analyze)

# Write the DAX to stdout (sys is already imported at the top of the script)
diamond.writeXML(sys.stdout)

# Write the DAX to a file; the with-block closes the file even if
# writeXML raises.
with open("diamond.dax", "w") as f:
    diamond.writeXML(f)

The following is an example of our black diamond workflow using the new Python API (Pegasus.api):

import logging

from pathlib import Path

from Pegasus.api import *

# Black diamond workflow written against the new Pegasus.api package.
# The script generates the properties file and the replica and transformation
# catalogs, builds the four-job workflow, then plans and submits it.
logging.basicConfig(level=logging.DEBUG)

# --- Properties ---------------------------------------------------------------
props = Properties()
props["pegasus.data.configuration"] = "condorio"
props["pegasus.monitord.encoding"] = "json"
props["pegasus.catalog.workflow.amqp.url"] = "amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod/workflows"
props.write()  # written to ./pegasus.properties

# --- Replicas -----------------------------------------------------------------
# Create the sample input file in the current working directory.
with open("f.a", "w") as f:
    f.write("This is sample input to KEG")

fa = File("f.a").add_metadata(creator="ryan")
# The physical file name is resolved to an absolute path so that the catalog
# entry does not depend on the directory it is later read from.
rc = (
    ReplicaCatalog()
    .add_replica("local", fa, Path(".").resolve() / "f.a")
    .write()  # written to ./replicas.yml
)


# --- Transformations ----------------------------------------------------------
preprocess = Transformation(
                "preprocess",
                site="condorpool",
                pfn="/home/scitech/pegasus/dist/pegasus-5.0.0dev/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            )

findrange = Transformation(
                "findrange",
                site="condorpool",
                pfn="/home/scitech/pegasus/dist/pegasus-5.0.0dev/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            )

analyze = Transformation(
                "analyze",
                site="condorpool",
                pfn="/home/scitech/pegasus/dist/pegasus-5.0.0dev/bin/pegasus-keg",
                is_stageable=False,
                arch=Arch.X86_64,
                os_type=OS.LINUX
            )

tc = (
    TransformationCatalog()
    .add_transformations(preprocess, findrange, analyze)
    .write()  # written to ./transformations.yml
)

# --- Workflow -----------------------------------------------------------------
# The diagram is kept in comments rather than a string literal: a non-raw
# string would treat its backslashes as escape sequences.
#
#                      [f.b1] - (findrange) - [f.c1]
#                     /                             \
# [f.a] - (preprocess)                               (analyze) - [f.d]
#                     \                             /
#                      [f.b2] - (findrange) - [f.c2]
#
wf = Workflow("blackdiamond")\
        .add_shell_hook(EventType.START, "echo 'hello'")

# NOTE(review): no explicit job dependencies are declared below; presumably
# the API derives them from the files the jobs share - confirm.
fb1 = File("f.b1")
fb2 = File("f.b2")
job_preprocess = Job(preprocess)\
                    .add_args("-a", "preprocess", "-T", "3", "-i", fa, "-o", fb1, fb2)\
                    .add_inputs(fa)\
                    .add_outputs(fb1, fb2)

fc1 = File("f.c1")
job_findrange_1 = Job(findrange)\
                    .add_args("-a", "findrange", "-T", "3", "-i", fb1, "-o", fc1)\
                    .add_inputs(fb1)\
                    .add_outputs(fc1)

fc2 = File("f.c2")
job_findrange_2 = Job(findrange)\
                    .add_args("-a", "findrange", "-T", "3", "-i", fb2, "-o", fc2)\
                    .add_inputs(fb2)\
                    .add_outputs(fc2)

fd = File("f.d")
job_analyze = Job(analyze)\
                .add_args("-a", "analyze", "-T", "3", "-i", fc1, fc2, "-o", fd)\
                .add_inputs(fc1, fc2)\
                .add_outputs(fd)

wf.add_jobs(job_preprocess, job_findrange_1, job_findrange_2, job_analyze)

# Plan and submit the workflow, wait for it, then run analyze and statistics.
# On a client error, print the output attached to the exception.
try:
    wf.plan(submit=True)\
        .wait()\
        .analyze()\
        .statistics()
except PegasusClientError as e:
    print(e.output)

Note: adding profiles can be done as follows:

# Example 1: explicit key/value form. "pre.arguments" contains a dot, so it
# cannot be written as a Python keyword argument name.
job.add_profiles(
    Namespace.DAGMAN,
    key="pre.arguments",
    value="-i f1",
)

# Example 2: keyword form, several profiles of one namespace in a single call.
job.add_profiles(
    Namespace.ENV,
    JAVA_HOME="/usr/bin/java",
    USER="ryan",
)

Converting Old Catalogs

Site Catalog

# Convert the old site catalog sites.xml into the YAML file sites.yml.
# NOTE(review): -vvvvv presumably raises log verbosity - confirm with --help.
pegasus-sc-converter  -i sites.xml -o sites.yml  -O YAML -vvvvv

Transformation Catalog

# Convert the old text transformation catalog tc.text into tc.yml.
# NOTE(review): "-I File" presumably names the input format - confirm with --help.
pegasus-tc-converter  -i tc.text -I File -o tc.yml -v

Testing Out 5.0

https://github.com/ryantanaka/pegasus5.0-dev-env

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment