Skip to content

Instantly share code, notes, and snippets.

@natefoo
Last active August 29, 2015 14:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save natefoo/361414fbca3c0ea63aa5 to your computer and use it in GitHub Desktop.
Save natefoo/361414fbca3c0ea63aa5 to your computer and use it in GitHub Desktop.
A fake job runner for testing the walltime resubmission feature committed in https://bitbucket.org/galaxy/galaxy-central/commits/7b209e06ddb944e953d340754439f4e3e5dc339d
<?xml version="1.0"?>
<!--
    Job configuration for exercising job resubmission with the fake
    "walltime" runner: jobs start on the "short" destination and are
    resubmitted to "long" when they fail with a matching condition.
-->
<job_conf>
    <plugins>
        <plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="8"/>
        <!-- The fake runner under test. -->
        <plugin id="walltime" type="runner" load="galaxy.jobs.runners.walltime:WalltimeJobRunner" workers="4"/>
    </plugins>
    <handlers default="main">
        <handler id="main"/>
    </handlers>
    <destinations default="short">
        <!-- Default destination: handled by the fake runner. -->
        <destination id="short" runner="walltime">
            <resubmit condition="walltime_reached" destination="long" />
            <resubmit condition="memory_limit_reached" destination="long" />
        </destination>
        <!-- Resubmission target: handled by the local runner. -->
        <destination id="long" runner="local"/>
    </destinations>
    <tools>
        <!-- upload1 is mapped straight to "long", bypassing the fake runner. -->
        <tool id="upload1" destination="long"/>
    </tools>
</job_conf>
import time
import random
import logging
from galaxy import model
from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner
# Module-level logger, named after this module.
log = logging.getLogger( __name__ )
# Only the runner class is exported from this module.
__all__ = [ 'WalltimeJobRunner' ]
class WalltimeJobRunner( AsynchronousJobRunner ):
    """Fake job runner that never executes anything and fails every job.

    Each watched job is stepped through new -> queued -> running, one
    transition per monitor pass, and is then handed to ``fail_job`` with a
    runner state of either ``WALLTIME_REACHED`` or ``MEMORY_LIMIT_REACHED``
    (chosen at random).
    """
    runner_name = "WalltimeRunner"

    def __init__( self, app, nworkers, **kwargs ):
        """Start the job runner.

        The optional ``step_delay`` keyword argument (default 1) is converted
        to ``int`` and passed to ``time.sleep`` at the start of every pass
        over the watched jobs.
        """
        self.step_delay = int( kwargs.get( 'step_delay', 1 ) )
        super( WalltimeJobRunner, self ).__init__( app, nworkers, **kwargs )
        # Initialise the id counter before any thread is started so that
        # queue_job can never observe a runner without this attribute.
        self.current_job_id = 0
        self._init_monitor_thread()
        self._init_worker_threads()

    def queue_job( self, job_wrapper ):
        """Assign the next fake external id to the job and start watching it.

        Nothing is actually submitted anywhere; the job state object is just
        placed on the monitor queue.
        """
        ajs = AsynchronousJobState( files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper, job_name='walltime_job' )
        ajs.job_id = self.current_job_id
        # Use the same constant that check_watched_items compares against.
        ajs.old_state = model.Job.states.NEW
        ajs.job_destination = job_wrapper.job_destination
        self.current_job_id += 1
        log.debug( '(%s) Job queued with external id %s', job_wrapper.get_id_tag(), ajs.job_id )
        self.monitor_queue.put( ajs )

    def check_watched_items( self ):
        """Advance every watched job by one fake state transition.

        Jobs in the new or queued state move one step forward and stay on the
        watch list. Jobs in the running state are queued for ``fail_job`` and
        are no longer watched.
        """
        new_watched = []
        time.sleep( self.step_delay )
        for ajs in self.watched:
            id_tag = ajs.job_wrapper.get_id_tag()
            log.debug( '(%s/%s) State check, last state was: %s', id_tag, ajs.job_id, ajs.old_state )
            if ajs.old_state == model.Job.states.NEW:
                log.debug( '(%s/%s) Job state changed to queued', id_tag, ajs.job_id )
                ajs.job_wrapper.change_state( model.Job.states.QUEUED )
                ajs.old_state = model.Job.states.QUEUED
                new_watched.append( ajs )
            elif ajs.old_state == model.Job.states.QUEUED:
                log.debug( '(%s/%s) Job state changed to running', id_tag, ajs.job_id )
                ajs.running = True
                ajs.job_wrapper.change_state( model.Job.states.RUNNING )
                ajs.old_state = model.Job.states.RUNNING
                new_watched.append( ajs )
            elif ajs.old_state == model.Job.states.RUNNING:
                log.debug( '(%s/%s) Job state changed to failed', id_tag, ajs.job_id )
                # NOTE(review): presumably tells fail_job not to try to stop
                # an external job (none exists) - confirm against the base class.
                ajs.stop_job = False
                # Pick one of the two failure reasons at random.
                if random.randint( 0, 1 ):
                    ajs.fail_message = "Walltime reached"
                    ajs.runner_state = ajs.runner_states.WALLTIME_REACHED
                else:
                    ajs.fail_message = "Memory exhausted"
                    ajs.runner_state = ajs.runner_states.MEMORY_LIMIT_REACHED
                self.work_queue.put( ( self.fail_job, ajs ) )
            else:
                # As before, the job is dropped from the watch list, but the
                # drop is now reported instead of happening silently.
                log.warning( '(%s/%s) Unexpected state %s, job will no longer be watched', id_tag, ajs.job_id, ajs.old_state )
        self.watched = new_watched
@natefoo
Copy link
Author

natefoo commented Mar 25, 2015

Update to also test the MEMORY_LIMIT_REACHED state.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment