Created
September 12, 2014 15:38
-
-
Save vivek-bala/1a6541f6e19889255dcc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import radical.pilot as rp | |
#------------------------------------------------------------------------------ | |
# | |
def pilot_state_cb (pilot, state) : | |
""" this callback is invoked on all pilot state changes """ | |
print "[Callback]: ComputePilot '%s' state: %s." % (pilot.uid, state) | |
if state == rp.FAILED : | |
sys.exit (1) | |
#------------------------------------------------------------------------------ | |
# | |
def unit_state_change_cb (unit, state) : | |
""" this callback is invoked on all unit state changes """ | |
print "[Callback]: ComputeUnit '%s' state: %s." % (unit.uid, state) | |
if state == rp.FAILED : | |
sys.exit (1) | |
#------------------------------------------------------------------------------ | |
# | |
if __name__ == "__main__": | |
# Create a new session. A session is the 'root' object for all other | |
# RADICAL-Pilot objects. It encapsulates the MongoDB connection(s) as | |
# well as security contexts. | |
session = rp.Session(database_url='mongodb://ec2-184-72-89-141.compute-1.amazonaws.com:27017/') | |
# Add an ssh identity to the session. | |
c = rp.Context('ssh') | |
c.user_id = 'vb224' | |
session.add_context(c) | |
# Add a Pilot Manager. Pilot managers manage one or more ComputePilots. | |
pmgr = rp.PilotManager(session=session) | |
# Register our callback with the PilotManager. This callback will get | |
# called every time any of the pilots managed by the PilotManager | |
# change their state. | |
pmgr.register_callback(pilot_state_cb) | |
# Define a X-core that runs for N minutes. | |
pdesc = rp.ComputePilotDescription() | |
pdesc.resource = "archer.ac.uk" | |
pdesc.project = "e290" | |
pdesc.queue = "debug" | |
pdesc.sandbox = '/work/e290/e290/vb224/radical.pilot.sandbox/' | |
pdesc.runtime = 5 # N minutes | |
pdesc.cores = 24 # X cores | |
# Launch the pilot. | |
pilot = pmgr.submit_pilots(pdesc) | |
cud_list = [] | |
for unit_count in range(0, 1): | |
mpi_test_task = rp.ComputeUnitDescription() | |
#mpi_test_task.pre_exec = ["module load python","source /fs4/e290/e290/marksant/cuve/bin/activate","module swap PrgEnv-cray PrgEnv-gnu"] | |
mpi_test_task.executable = "/bin/bash" | |
# mpi_test_task.pre_exec = ['module load gromacs'] | |
mpi_test_task.arguments = ['-l','-c','module load gromacs && grompp && mdrun'] | |
# mpi_test_task.input_staging = ['test.sh'] | |
mpi_test_task.mpi = False | |
mpi_test_task.cores = 1 | |
cud_list.append(mpi_test_task) | |
# Combine the ComputePilot, the ComputeUnits and a scheduler via | |
# a UnitManager object. | |
umgr = rp.UnitManager( | |
session=session, | |
scheduler=rp.SCHED_DIRECT_SUBMISSION) | |
# Register our callback with the UnitManager. This callback will get | |
# called every time any of the units managed by the UnitManager | |
# change their state. | |
umgr.register_callback(unit_state_change_cb) | |
# Add the previously created ComputePilot to the UnitManager. | |
umgr.add_pilots(pilot) | |
# Submit the previously created ComputeUnit descriptions to the | |
# PilotManager. This will trigger the selected scheduler to start | |
# assigning ComputeUnits to the ComputePilots. | |
units = umgr.submit_units(cud_list) | |
# Wait for all compute units to reach a terminal state (DONE or FAILED). | |
umgr.wait_units() | |
if not isinstance(units, list): | |
units = [units] | |
for unit in units: | |
print "* Task %s - state: %s, exit code: %s, started: %s, finished: %s, stdout: %s" \ | |
% (unit.uid, unit.state, unit.exit_code, unit.start_time, unit.stop_time, unit.stdout) | |
assert (unit.state == rp.DONE) | |
session.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment