Skip to content

Instantly share code, notes, and snippets.

Last active August 29, 2015 14:02
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
What would you like to do?
A fake Galaxy job runner for testing the walltime resubmission feature.
<?xml version="1.0"?>
<!-- Galaxy job configuration used to exercise job resubmission with the
     fake "walltime" runner defined in the Python module of this gist. -->
<job_conf>
    <plugins>
        <plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="8"/>
        <!-- NOTE(review): assumes the runner module is installed as
             galaxy/jobs/runners/walltime.py; adjust the load path if it
             lives elsewhere. -->
        <plugin id="walltime" type="runner" load="galaxy.jobs.runners.walltime:WalltimeJobRunner" workers="4"/>
    </plugins>
    <handlers default="main">
        <handler id="main"/>
    </handlers>
    <destinations default="short">
        <!-- Jobs start on the fake runner and are resubmitted to "long"
             when it reports either failure condition. -->
        <destination id="short" runner="walltime">
            <resubmit condition="walltime_reached" destination="long" />
            <resubmit condition="memory_limit_reached" destination="long" />
        </destination>
        <destination id="long" runner="local"/>
    </destinations>
    <tools>
        <!-- Uploads go straight to the real local runner. -->
        <tool id="upload1" destination="long"/>
    </tools>
</job_conf>
import time
import random
import logging

from galaxy import model
# The module path was missing from this statement ("from import ..."),
# which is a SyntaxError; the asynchronous runner base classes live in
# galaxy.jobs.runners.
from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner

log = logging.getLogger( __name__ )

# Only the runner class is part of this module's public API.
__all__ = [ 'WalltimeJobRunner' ]
class WalltimeJobRunner( AsynchronousJobRunner ):
    """Fake job runner used to test job resubmission.

    No real work is executed.  On successive monitor passes each watched job
    is moved new -> queued -> running and is then failed with a runner state
    of either WALLTIME_REACHED or MEMORY_LIMIT_REACHED, chosen at random.
    """
    runner_name = "WalltimeRunner"

    def __init__( self, app, nworkers, **kwargs ):
        """Start the job runner.

        The optional ``step_delay`` keyword (default 1) is the number of
        seconds slept at the start of every ``check_watched_items`` pass.
        """
        self.step_delay = int( kwargs.get( 'step_delay', 1 ) )
        super( WalltimeJobRunner, self ).__init__( app, nworkers, **kwargs )
        # Counter used to hand out fake external job ids.
        self.current_job_id = 0

    def queue_job( self, job_wrapper ):
        """Wrap ``job_wrapper`` in a job state object and queue it for monitoring."""
        ajs = AsynchronousJobState( files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper, job_name='walltime_job' )
        ajs.job_id = self.current_job_id
        # NOTE(review): presumably equal to model.Job.states.NEW, which is
        # what check_watched_items compares against -- confirm.
        ajs.old_state = 'new'
        ajs.job_destination = job_wrapper.job_destination
        self.current_job_id += 1
        log.debug( '(%s) Job queued with external id %s', job_wrapper.get_id_tag(), ajs.job_id )
        self.monitor_queue.put( ajs )

    def check_watched_items( self ):
        """Advance every watched job by one step of the fake lifecycle.

        Jobs in the new or queued state are moved to the next state and kept
        on the watch list.  Running jobs are handed to ``fail_job`` through
        the work queue and dropped from the watch list.  A job in any other
        state is dropped without further action.
        """
        new_watched = []
        time.sleep( self.step_delay )
        for ajs in self.watched:
            log.debug( '(%s/%s) State check, last state was: %s', ajs.job_wrapper.get_id_tag(), ajs.job_id, ajs.old_state )
            if ajs.old_state == model.Job.states.NEW:
                log.debug( '(%s/%s) Job state changed to queued', ajs.job_wrapper.get_id_tag(), ajs.job_id )
                ajs.job_wrapper.change_state( model.Job.states.QUEUED )
                ajs.old_state = model.Job.states.QUEUED
                new_watched.append( ajs )
            elif ajs.old_state == model.Job.states.QUEUED:
                log.debug( '(%s/%s) Job state changed to running', ajs.job_wrapper.get_id_tag(), ajs.job_id )
                ajs.running = True
                ajs.job_wrapper.change_state( model.Job.states.RUNNING )
                ajs.old_state = model.Job.states.RUNNING
                new_watched.append( ajs )
            elif ajs.old_state == model.Job.states.RUNNING:
                log.debug( '(%s/%s) Job state changed to failed', ajs.job_wrapper.get_id_tag(), ajs.job_id )
                ajs.stop_job = False
                # Pick one of the two failure modes with equal probability.
                # Without the else branch the memory-limit assignments always
                # overwrote the walltime ones, so walltime was never reported.
                if random.randint(0,1):
                    ajs.fail_message = "Walltime reached"
                    ajs.runner_state = ajs.runner_states.WALLTIME_REACHED
                else:
                    ajs.fail_message = "Memory exhausted"
                    ajs.runner_state = ajs.runner_states.MEMORY_LIMIT_REACHED
                self.work_queue.put( ( self.fail_job, ajs ) )
        self.watched = new_watched
Copy link

natefoo commented Mar 25, 2015

Updated to also test the MEMORY_LIMIT_REACHED state.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment