- workflow
- site catalog
- replica catalog (text)
- transformation catalog (text)
- braindump (text)
API Docs: https://pegasus.isi.edu/docs/5.0.0dev/python.html#python-api
# Build the site catalog. WORK_DIR and RUN_ID are expected to be defined by
# the surrounding script.
sc = SiteCatalog()

shared_scratch_dir = str(WORK_DIR / RUN_ID)
local_storage_dir = str(WORK_DIR / "outputs" / RUN_ID)

# "local" site: one shared-scratch and one local-storage directory, each
# exposed through a file:// file server registered with Operation.ALL.
local = Site("local").add_directories(
    Directory(Directory.SHARED_SCRATCH, shared_scratch_dir).add_file_servers(
        FileServer("file://" + shared_scratch_dir, Operation.ALL)
    ),
    Directory(Directory.LOCAL_STORAGE, local_storage_dir).add_file_servers(
        FileServer("file://" + local_storage_dir, Operation.ALL)
    ),
)

# "condorpool" site: configured purely through profiles.
# NOTE(review): `auxillary_local` looks misspelled, but it appears to be the
# spelling the Pegasus profile key actually uses -- confirm before "fixing" it.
condorpool = (
    Site("condorpool")
    .add_pegasus_profile(style="condor")
    .add_pegasus_profile(auxillary_local="true")
    .add_condor_profile(universe="vanilla")
)

sc.add_sites(local, condorpool)

# The Pegasus planner will, by default, pick up this file in cwd.
sc.write()  # written to ./sites.yml
# Register the physical location of the workflow input file "f.a" on the
# "local" site.
fa = File("f.a").add_metadata(creator="ryan")

# Resolve to an absolute path: a bare relative PFN ("f.a") is only meaningful
# relative to whatever directory this script happened to be started from.
rc = ReplicaCatalog().add_replica("local", fa, Path(".").resolve() / "f.a")
rc.write()  # written to ./replicas.yml
# All three transformations point at the same non-stageable pegasus-keg
# executable on the "condorpool" site; they differ only in their name.
keg_settings = dict(
    site="condorpool",
    pfn="/usr/bin/pegasus-keg",
    is_stageable=False,
    arch=Arch.X86_64,
    os_type=OS.LINUX,
)

preprocess = Transformation("preprocess", **keg_settings)
findrange = Transformation("findrange", **keg_settings)
analyze = Transformation("analyze", **keg_settings)

tc = TransformationCatalog().add_transformations(
    preprocess,
    findrange,
    analyze,
)
tc.write()  # written to ./transformations.yml
# Collect the Pegasus properties in one table, then write them out.
props = Properties()
for prop_name, prop_value in (
    ("pegasus.data.configuration", "condorio"),
    ("pegasus.monitord.encoding", "json"),
    (
        "pegasus.catalog.workflow.amqp.url",
        "amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod/workflows",
    ),
):
    props[prop_name] = prop_value
props.write()  # written to ./pegasus.properties
The following is an example of our black diamond workflow written with the old Python API (Pegasus.DAX3):
#!/usr/bin/env python
import sys
import os
from Pegasus.DAX3 import *
# Exactly one argument is required: the directory that contains pegasus-keg
# (it is used further down to build the executable's file:// PFN).
if len(sys.argv) != 2:
    # Calling print() with a single pre-formatted string behaves the same on
    # Python 2 and Python 3; the bare print statement is a SyntaxError on 3.
    print("Usage: %s PEGASUS_BIN" % (sys.argv[0]))
    sys.exit(1)
# Create the DAX and attach some descriptive metadata.
diamond = ADAG("diamond")
diamond.metadata("name", "diamond")
diamond.metadata("createdby", "Karan Vahi")

# Workflow input file.
a = File("f.a")

# Every executable is the same pegasus-keg binary, taken from the directory
# given on the command line and registered for the "local" site.
keg = PFN("file://" + sys.argv[1] + "/pegasus-keg", "local")

# Settings shared by all three executables; only the name differs.
exe_settings = dict(
    namespace="diamond",
    version="4.0",
    os="linux",
    arch="x86_64",
    installed=False,
)
e_preprocess = Executable(name="preprocess", **exe_settings)
e_findrange = Executable(name="findrange", **exe_settings)
e_analyze = Executable(name="analyze", **exe_settings)

# Annotate each executable, attach the keg PFN and add it to the DAX, in the
# order preprocess, findrange, analyze.
for exe, transformation in (
    (e_preprocess, "preprocess"),
    (e_findrange, "findrange"),
    (e_analyze, "analyze"),
):
    exe.metadata("size", "2048")
    exe.metadata("transformation", transformation)
    exe.addPFN(keg)
    diamond.addExecutable(exe)
# Intermediate and final files passed between the jobs.
b1 = File("f.b1")
b2 = File("f.b2")
c1 = File("f.c1")
c2 = File("f.c2")
d = File("f.d")
d.metadata("final_output", "true")

# Every output file is declared with transfer=True and register=True.
stage_out = dict(link=Link.OUTPUT, transfer=True, register=True)

# preprocess: f.a -> f.b1, f.b2
preprocess = Job(e_preprocess)
preprocess.metadata("time", "60")
preprocess.addArguments("-a preprocess", "-T60", "-i", a, "-o", b1, "-o", b2)
preprocess.uses(a, link=Link.INPUT)
preprocess.uses(b1, **stage_out)
preprocess.uses(b2, **stage_out)
diamond.addJob(preprocess)

# left findrange: f.b1 -> f.c1
frl = Job(e_findrange)
frl.metadata("time", "60")
frl.addArguments("-a findrange", "-T60", "-i", b1, "-o", c1)
frl.uses(b1, link=Link.INPUT)
frl.uses(c1, **stage_out)
diamond.addJob(frl)

# right findrange: f.b2 -> f.c2
frr = Job(e_findrange)
frr.metadata("time", "60")
frr.addArguments("-a findrange", "-T60", "-i", b2, "-o", c2)
frr.uses(b2, link=Link.INPUT)
frr.uses(c2, **stage_out)
diamond.addJob(frr)

# analyze: f.c1, f.c2 -> f.d
analyze = Job(e_analyze)
analyze.metadata("time", "60")
analyze.addArguments("-a analyze", "-T60", "-i", c1, c2, "-o", d)
analyze.uses(c1, link=Link.INPUT)
analyze.uses(c2, link=Link.INPUT)
analyze.uses(d, **stage_out)
diamond.addJob(analyze)

# Dependencies form the diamond: preprocess feeds both findrange jobs, and
# both findrange jobs feed analyze.
for parent_job, child_job in (
    (preprocess, frl),
    (preprocess, frr),
    (frl, analyze),
    (frr, analyze),
):
    diamond.depends(parent=parent_job, child=child_job)
# Write the DAX to stdout (sys is already imported at the top of the script,
# so no second import is needed here).
diamond.writeXML(sys.stdout)

# Write the DAX to a file; the with-block closes the handle even if
# writeXML raises part-way through.
with open("diamond.dax", "w") as f:
    diamond.writeXML(f)
The following is the same black diamond workflow written with the new Python API (Pegasus.api):
import logging
from pathlib import Path
from Pegasus.api import *
# Log at DEBUG level for the rest of the script.
logging.basicConfig(level=logging.DEBUG)

# --- Properties ---------------------------------------------------------------
# Collect the Pegasus properties in one table, then write them out.
props = Properties()
for prop_name, prop_value in (
    ("pegasus.data.configuration", "condorio"),
    ("pegasus.monitord.encoding", "json"),
    (
        "pegasus.catalog.workflow.amqp.url",
        "amqp://friend:donatedata@msgs.pegasus.isi.edu:5672/prod/workflows",
    ),
):
    props[prop_name] = prop_value
props.write()  # written to ./pegasus.properties
# --- Replicas -----------------------------------------------------------------
# Create the sample input file that the replica catalog entry points at.
with open("f.a", "w") as f:
    f.write("This is sample input to KEG")

fa = File("f.a").add_metadata(creator="ryan")

# Resolve to an absolute path: a bare relative PFN ("f.a") is only meaningful
# relative to whatever directory this script happened to be started from.
# NOTE(review): rc is bound to whatever write() returns -- confirm that it is
# the catalog itself before using rc later on.
rc = (
    ReplicaCatalog()
    .add_replica("local", fa, Path(".").resolve() / "f.a")
    .write()  # written to ./replicas.yml
)
# --- Transformations ----------------------------------------------------------
# All three transformations point at the same non-stageable pegasus-keg
# executable on the "condorpool" site; they differ only in their name.
keg_settings = dict(
    site="condorpool",
    pfn="/home/scitech/pegasus/dist/pegasus-5.0.0dev/bin/pegasus-keg",
    is_stageable=False,
    arch=Arch.X86_64,
    os_type=OS.LINUX,
)

preprocess = Transformation("preprocess", **keg_settings)
findrange = Transformation("findrange", **keg_settings)
analyze = Transformation("analyze", **keg_settings)

tc = (
    TransformationCatalog()
    .add_transformations(preprocess, findrange, analyze)
    .write()  # written to ./transformations.yml
)
# --- Workflow -----------------------------------------------------------------
# The diagram is kept as comments: inside a plain (non-raw) string literal its
# backslashes would be read as escape sequences / line continuations.
#
#                      [f.b1] - (findrange) - [f.c1]
#                     /                             \
# [f.a] - (preprocess)                               (analyze) - [f.d]
#                     \                             /
#                      [f.b2] - (findrange) - [f.c2]
#
wf = Workflow("blackdiamond").add_shell_hook(EventType.START, "echo 'hello'")

# preprocess: f.a -> f.b1, f.b2
fb1 = File("f.b1")
fb2 = File("f.b2")
job_preprocess = (
    Job(preprocess)
    .add_args("-a", "preprocess", "-T", "3", "-i", fa, "-o", fb1, fb2)
    .add_inputs(fa)
    .add_outputs(fb1, fb2)
)

# first findrange: f.b1 -> f.c1
fc1 = File("f.c1")
job_findrange_1 = (
    Job(findrange)
    .add_args("-a", "findrange", "-T", "3", "-i", fb1, "-o", fc1)
    .add_inputs(fb1)
    .add_outputs(fc1)
)

# second findrange: f.b2 -> f.c2
fc2 = File("f.c2")
job_findrange_2 = (
    Job(findrange)
    .add_args("-a", "findrange", "-T", "3", "-i", fb2, "-o", fc2)
    .add_inputs(fb2)
    .add_outputs(fc2)
)

# analyze: f.c1, f.c2 -> f.d
fd = File("f.d")
job_analyze = (
    Job(analyze)
    .add_args("-a", "analyze", "-T", "3", "-i", fc1, fc2, "-o", fd)
    .add_inputs(fc1, fc2)
    .add_outputs(fd)
)

# NOTE(review): no explicit dependencies are declared; presumably Pegasus
# infers them from the jobs' shared input/output files -- confirm.
wf.add_jobs(job_preprocess, job_findrange_1, job_findrange_2, job_analyze)
# Plan the workflow with submit=True, then call wait(), analyze() and
# statistics() on the result in turn. If any step raises
# PegasusClientError, print the output attached to the error.
try:
    (
        wf.plan(submit=True)
        .wait()
        .analyze()
        .statistics()
    )
except PegasusClientError as err:
    print(err.output)
Note: profiles can be added to a job as follows:
# Example 1: a single profile passed as an explicit key/value pair. The key
# "pre.arguments" contains a dot, so it cannot be spelled as a keyword name.
job.add_profiles(
    Namespace.DAGMAN,
    key="pre.arguments",
    value="-i f1",
)

# Example 2: several profiles in one call, one keyword argument per profile.
job.add_profiles(
    Namespace.ENV,
    JAVA_HOME="/usr/bin/java",
    USER="ryan",
)
Site Catalog
pegasus-sc-converter -i sites.xml -o sites.yml -O YAML -vvvvv
Transformation Catalog
pegasus-tc-converter -i tc.text -I File -o tc.yml -v