Created
October 11, 2014 21:25
-
-
Save oleweidner/5970aba8d1652b5fa479 to your computer and use it in GitHub Desktop.
rp_script_to_reproduce_broken_pipe.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import radical.pilot as rp | |
# READ: The RADICAL-Pilot documentation: | |
# http://radicalpilot.readthedocs.org/en/latest | |
# | |
# Try running this example with RADICAL_PILOT_VERBOSE=debug set if | |
# you want to see what happens behind the scenes! | |
#------------------------------------------------------------------------------ | |
# | |
def pilot_state_cb (pilot, state) : | |
""" this callback is invoked on all pilot state changes """ | |
print "[Callback]: ComputePilot '%s' state: %s." % (pilot.uid, state) | |
if state == rp.FAILED: | |
sys.exit (1) | |
#------------------------------------------------------------------------------ | |
# | |
def unit_state_cb (unit, state) : | |
""" this callback is invoked on all unit state changes """ | |
print "[Callback]: ComputeUnit '%s' state: %s." % (unit.uid, state) | |
if state in [rp.FAILED] : | |
print "stdout: %s" % unit.stdout | |
print "stderr: %s" % unit.stderr | |
#------------------------------------------------------------------------------ | |
# | |
if __name__ == "__main__": | |
# Create a new session. A session is the 'root' object for all other | |
# RADICAL-Pilot objects. It encapsulates the MongoDB connection(s) as | |
# well as security credentials. | |
session = rp.Session() | |
# Add an ssh identity to the session. | |
c = rp.Context('ssh') | |
#c.user_id = "alice" | |
#c.user_pass = "ILoveBob!" | |
session.add_context(c) | |
# Add a Pilot Manager. Pilot managers manage one or more ComputePilots. | |
pmgr = rp.PilotManager(session=session) | |
# Register our callback with the PilotManager. This callback will get | |
# called every time any of the pilots managed by the PilotManager | |
# change their state. | |
pmgr.register_callback(pilot_state_cb) | |
pdesc = rp.ComputePilotDescription() | |
pdesc.resource = "archer.ac.uk" | |
pdesc.runtime = 30 # minutes | |
pdesc.cores = 24 | |
pdesc.cleanup = True | |
pdesc.queue = "standard" | |
pdesc.project = "e290" | |
# Launch the pilot. | |
pilots = pmgr.submit_pilots(pdesc) | |
# Combine the ComputePilot, the ComputeUnits and a scheduler via | |
# a UnitManager object. | |
umgr = rp.UnitManager( | |
session=session, | |
scheduler=rp.SCHED_DIRECT_SUBMISSION) | |
# Register our callback with the UnitManager. This callback will get | |
# called every time any of the units managed by the UnitManager | |
# change their state. | |
umgr.register_callback(unit_state_cb) | |
# Add the previsouly created ComputePilot to the UnitManager. | |
umgr.add_pilots(pilots) | |
# Create a workload of 8 ComputeUnits (tasks). Each compute unit | |
# uses /bin/cat to concatenate two input files, file1.dat and | |
# file2.dat. The output is written to STDOUT. cu.environment is | |
# used to demonstrate how to set environment variables within a | |
# ComputeUnit - it's not strictly necessary for this example. As | |
# a shell script, the ComputeUnits would look something like this: | |
# | |
# export INPUT1=file1.dat | |
# export INPUT2=file2.dat | |
# /bin/cat $INPUT1 $INPUT2 | |
# | |
cuds = list() | |
for unit_count in range(0, 1024): | |
cud = rp.ComputeUnitDescription() | |
cud.executable = "/bin/bash" | |
cud.environment = {'INPUT1': 'file1.dat', 'INPUT2': 'file2.dat'} | |
cud.arguments = ["-l", "-c", "cat $INPUT1 $INPUT2"] | |
cud.cores = 1 | |
cud.input_staging = ['md_0_3.ncdf', 'md_0_3.ncdf'] | |
cud.output_staging = ['md_0_3.ncdf', 'md_0_3.ncdf'] | |
cuds.append(cud) | |
# Submit the previously created ComputeUnit descriptions to the | |
# PilotManager. This will trigger the selected scheduler to start | |
# assigning ComputeUnits to the ComputePilots. | |
units = umgr.submit_units(cuds) | |
# Wait for all compute units to reach a terminal state (DONE or FAILED). | |
umgr.wait_units() | |
for unit in units: | |
print "* Unit %s (executed @ %s) state: %s, exit code: %s, started: %s, finished: %s, output: %s" \ | |
% (unit.uid, unit.execution_locations, unit.state, unit.exit_code, unit.start_time, unit.stop_time, | |
unit.stdout) | |
# Close automatically cancels the pilot(s). | |
session.close() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment