Skip to content

Instantly share code, notes, and snippets.

@natefoo natefoo/job_conf.xml
Last active Aug 29, 2015

Embed
What would you like to do?
A fake job runner for testing the walltime resubmission feature committed in https://bitbucket.org/galaxy/galaxy-central/commits/7b209e06ddb944e953d340754439f4e3e5dc339d
<?xml version="1.0"?>
<!-- Job configuration for testing resubmission: jobs go to the "short"
destination by default, which uses the fake "walltime" runner, and are
resubmitted to the "long" destination (local runner) when the runner reports
walltime_reached or memory_limit_reached. -->
<job_conf>
<plugins>
<!-- Runner plugins: the stock local runner plus the fake walltime runner,
loaded from galaxy.jobs.runners.walltime. -->
<plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="8"/>
<plugin id="walltime" type="runner" load="galaxy.jobs.runners.walltime:WalltimeJobRunner" workers="4"/>
</plugins>
<handlers default="main">
<handler id="main"/>
</handlers>
<destinations default="short">
<!-- Default destination: served by the fake walltime runner. -->
<destination id="short" runner="walltime">
<!-- Both failure conditions resubmit the job to the "long" destination. -->
<resubmit condition="walltime_reached" destination="long" />
<resubmit condition="memory_limit_reached" destination="long" />
</destination>
<!-- Resubmission target: served by the local runner. -->
<destination id="long" runner="local"/>
</destinations>
<tools>
<!-- upload1 is mapped straight to "long"; presumably so that uploads skip
the fake runner (TODO confirm intent). -->
<tool id="upload1" destination="long"/>
</tools>
</job_conf>
import time
import random
import logging
from galaxy import model
from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner
# Module-level logger, named after this module.
log = logging.getLogger( __name__ )
# Restrict star-imports of this module to the runner class.
__all__ = [ 'WalltimeJobRunner' ]
class WalltimeJobRunner( AsynchronousJobRunner ):
    """Fake asynchronous job runner for testing job resubmission.

    No job is ever executed. Every call to ``check_watched_items`` moves each
    watched job one step along new -> queued -> running, and a job that was
    already running is failed with a runner state picked at random between
    ``WALLTIME_REACHED`` and ``MEMORY_LIMIT_REACHED``.
    """
    runner_name = "WalltimeRunner"

    def __init__( self, app, nworkers, **kwargs ):
        """Start the job runner.

        :param app: forwarded unchanged to ``AsynchronousJobRunner.__init__``.
        :param nworkers: forwarded unchanged to ``AsynchronousJobRunner.__init__``.
        :param kwargs: forwarded unchanged to the base class; the optional
            ``step_delay`` entry (converted with ``int``, default 1) is the
            number of seconds slept at the start of every state check.
        """
        self.step_delay = int( kwargs.get( 'step_delay', 1 ) )
        super( WalltimeJobRunner, self ).__init__( app, nworkers, **kwargs )
        # Initialise the fake external-id counter *before* the monitor and
        # worker threads are set up, so that queue_job() can never run
        # against a runner that does not have the attribute yet.
        self.current_job_id = 0
        self._init_monitor_thread()
        self._init_worker_threads()

    def queue_job( self, job_wrapper ):
        """Register ``job_wrapper`` with the monitor under a fake external id.

        Builds an ``AsynchronousJobState`` for the job, gives it the next
        value of the ``current_job_id`` counter as its job id, marks it as
        'new' and puts it on the monitor queue. Nothing is executed.

        :param job_wrapper: the job wrapper to queue; its working directory,
            destination and id tag are read.
        """
        ajs = AsynchronousJobState(
            files_dir=job_wrapper.working_directory,
            job_wrapper=job_wrapper,
            job_name='walltime_job' )
        # NOTE(review): the read and the increment below are separate steps
        # and are not guarded by a lock; if queue_job is called concurrently,
        # two jobs could get the same fake id. The id is only used in log
        # messages within this class.
        ajs.job_id = self.current_job_id
        self.current_job_id += 1
        # Use the same constant that check_watched_items() compares against.
        ajs.old_state = model.Job.states.NEW
        ajs.job_destination = job_wrapper.job_destination
        log.debug( '(%s) Job queued with external id %s', job_wrapper.get_id_tag(), ajs.job_id )
        self.monitor_queue.put( ajs )

    def check_watched_items( self ):
        """Advance every watched job by one fake state transition.

        Sleeps ``step_delay`` seconds, then for each watched job:

        * new     -> queued  (job stays watched)
        * queued  -> running (job stays watched)
        * running -> failed: a fail message and runner state are chosen at
          random and ``fail_job`` is put on the work queue; the job is no
          longer watched.

        ``self.watched`` is replaced by the list of jobs still being watched.
        """
        new_watched = []
        time.sleep( self.step_delay )
        for ajs in self.watched:
            id_tag = ajs.job_wrapper.get_id_tag()
            log.debug( '(%s/%s) State check, last state was: %s', id_tag, ajs.job_id, ajs.old_state )
            if ajs.old_state == model.Job.states.NEW:
                log.debug( '(%s/%s) Job state changed to queued', id_tag, ajs.job_id )
                ajs.job_wrapper.change_state( model.Job.states.QUEUED )
                ajs.old_state = model.Job.states.QUEUED
                new_watched.append( ajs )
            elif ajs.old_state == model.Job.states.QUEUED:
                log.debug( '(%s/%s) Job state changed to running', id_tag, ajs.job_id )
                ajs.running = True
                ajs.job_wrapper.change_state( model.Job.states.RUNNING )
                ajs.old_state = model.Job.states.RUNNING
                new_watched.append( ajs )
            elif ajs.old_state == model.Job.states.RUNNING:
                log.debug( '(%s/%s) Job state changed to failed', id_tag, ajs.job_id )
                # Presumably tells fail_job not to try stopping anything, as
                # there is no real job behind this state (TODO confirm).
                ajs.stop_job = False
                # Coin flip between the two failure modes under test.
                if random.randint( 0, 1 ):
                    ajs.fail_message = "Walltime reached"
                    ajs.runner_state = ajs.runner_states.WALLTIME_REACHED
                else:
                    ajs.fail_message = "Memory exhausted"
                    ajs.runner_state = ajs.runner_states.MEMORY_LIMIT_REACHED
                self.work_queue.put( ( self.fail_job, ajs ) )
            else:
                # Not reachable through this class's own transitions; the job
                # is dropped from the watch list as before, but no longer
                # silently.
                log.warning( '(%s/%s) Unexpected last state %s, no longer watching job', id_tag, ajs.job_id, ajs.old_state )
        self.watched = new_watched
@natefoo

This comment has been minimized.

Copy link
Owner Author

natefoo commented Mar 25, 2015

Updated to also test the MEMORY_LIMIT_REACHED state.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.