Skip to content

Instantly share code, notes, and snippets.

@natefoo
Created December 17, 2015 20:04
Show Gist options
  • Save natefoo/149af4a771a525410b70 to your computer and use it in GitHub Desktop.
Save natefoo/149af4a771a525410b70 to your computer and use it in GitHub Desktop.
Run a Galaxy tool continuously
#!/usr/bin/env python
import logging
import os
import sys
from time import sleep

from bioblend.galaxy import GalaxyInstance
from daemonize import Daemonize
# Files used by daemon mode (first argument starting with -d / --d):
# the daemonize pid file and the log file the runner writes to.
PID = 'continuous_submission.pid'
LOG = 'continuous_submission.log'

# Galaxy connection settings. Supply these through the GALAXY_API_KEY and
# GALAXY_URL environment variables instead of editing credentials into the
# script; the literals below are only fallbacks.
API_KEY = os.environ.get('GALAXY_API_KEY', 'the kind of thing an idiot would have on his luggage')
GALAXY_URL = os.environ.get('GALAXY_URL', 'https://galaxy.example.org')

# Name of the history jobs are run in; it must already exist (the runner
# refuses to create it).
HISTORY_NAME = 'continuous'

# Number of distinct tool jobs to keep in a non-terminal state at once.
ACTIVE_JOBS = 3
def _single_hda(hid, name, hda_id):
    """Return a tool-form data parameter that points at one history dataset."""
    return {
        'values': [{'src': 'hda', 'hid': hid, 'name': name, 'id': hda_id}],
        'batch': 'false',
    }


# Tool submission payload: tool id/version plus the flattened tool-form
# parameters passed as ``inputs`` to ``gi.tools.run_tool``.
# NOTE(review): the dataset ids below look like placeholders — replace them
# with the encoded ids of the real input datasets.
TOOL = {
    'inputs': {
        'inputs|library_type': 'FR',
        'inputs|paired_or_single': 'paired',
        'inputs|path_reinforcement_distance': '75',
        'inputs|left_input': _single_hda(2, 'reads.ALL.left.fq', 'deadbeefdeadbeef'),
        'inputs|right_input': _single_hda(1, 'reads.ALL.right.fq', 'd34db33fd34db33f'),
        'additional_params|use_additional': 'no',
        '__job_resource|__job_resource__select': 'no',
        'inputs|group_pairs_distance': '500',
    },
    'tool_version': '0.0.1',
    'tool_id': 'toolshed.g2.bx.psu.edu/repos/nate/trinity_psc/trinity_psc/0.0.1',
}
# Record layout shared by the console (basicConfig) and daemon (file) handlers:
# timestamp, level name padded/truncated to 5 characters, message.
logfmt = '%(asctime)s %(levelname)-5.5s %(message)s'

log = logging.getLogger('continuous')
log.setLevel('DEBUG')  # level name string; equivalent to logging.DEBUG
class GalaxyToolRunner(object):
    """Keep a fixed number of jobs of one Galaxy tool active in one history.

    The runner polls the history named ``history_name`` and, whenever fewer
    than ``active_jobs`` distinct jobs have outputs in a non-terminal state,
    submits ``tool`` again.
    """

    TERMINAL_STATES = ('discarded', 'ok', 'failed_metadata', 'paused', 'error', 'empty')
    NONTERMINAL_STATES = ('upload', 'running', 'setting_metadata', 'new', 'queued')

    def __init__(self, url, api_key, history_name=HISTORY_NAME, tool=None, active_jobs=None):
        """Connect to Galaxy and remember what to run where.

        :param url: Base URL of the Galaxy server.
        :param api_key: Galaxy API key used for every request.
        :param history_name: Name of an existing history to run jobs in.
        :param tool: Dict with ``tool_id`` and ``inputs`` keys (see ``TOOL``);
            defaults to the module-level ``TOOL``.
        :param active_jobs: Number of jobs to keep active; defaults to the
            module-level ``ACTIVE_JOBS``.
        """
        self.gi = GalaxyInstance(url=url, key=api_key)
        self.history_name = history_name
        self.tool = tool or TOOL
        self.active_jobs = ACTIVE_JOBS if active_jobs is None else active_jobs
        self.__history_id = None
        # Last {hda_id: state} snapshot, used by show_changes() to log diffs.
        self.stored_history_items = {}
        # hda_id -> id of the job that created it; only kept for items that
        # were in a non-terminal state when last counted.
        self.hda_to_job = {}

    @property
    def history_id(self):
        """Encoded id of the history named ``history_name`` (looked up once).

        :raises AssertionError: if no history with that name exists.
        """
        if self.__history_id is None:
            histories = self.gi.histories.get_histories(name=self.history_name)
            # Explicit raise instead of ``assert`` so the check survives
            # ``python -O``; the exception type is unchanged.
            if not histories:
                raise AssertionError(
                    "History '%s' has not been created, please do it manually" % self.history_name)
            if len(histories) > 1:
                log.warning("%d histories named '%s' found, using the first (%s)",
                            len(histories), self.history_name, histories[0]['id'])
            self.__history_id = histories[0]['id']
        return self.__history_id

    def history(self):
        """Return the history's details as reported by ``show_history``.

        :raises AssertionError: if the history has been deleted.
        """
        history = self.gi.histories.show_history(self.history_id)
        if history['deleted']:
            raise AssertionError("History '%s' is deleted" % self.history_id)
        return history

    def history_items(self):
        """Return a ``{hda_id: state}`` mapping built from the history's ``state_ids``."""
        state_ids = self.history()['state_ids']
        return {hda_id: state
                for state, hda_ids in state_ids.items()
                for hda_id in hda_ids}

    def active_count(self, history_items):
        """Count distinct jobs that created at least one non-terminal item.

        Creating-job ids are fetched (one API call per item) only for items in
        a non-terminal state. Cache entries for items that have reached any
        other state are dropped, so the cache does not grow with the history.

        :param history_items: ``{hda_id: state}`` as returned by history_items().
        :returns: Number of distinct active jobs.
        """
        active_jobs = set()
        for hda_id, state in history_items.items():
            if state not in GalaxyToolRunner.NONTERMINAL_STATES:
                self.hda_to_job.pop(hda_id, None)
                continue
            if hda_id not in self.hda_to_job:
                self.hda_to_job[hda_id] = self.gi.datasets.show_dataset(hda_id)['creating_job']
            active_jobs.add(self.hda_to_job[hda_id])
        return len(active_jobs)

    def run_tool(self):
        """Submit one run of the configured tool into the history."""
        self.gi.tools.run_tool(self.history_id, self.tool['tool_id'], self.tool['inputs'])

    def run_jobs(self):
        """Submit the tool until ``active_jobs`` jobs are active.

        At most ``active_jobs`` submissions are made per call. Without this cap,
        a tool whose outputs never show up as active (e.g. they fail
        immediately) would be resubmitted forever without this method returning.

        :returns: True if at least one job was submitted.
        """
        ran = False
        for _ in range(self.active_jobs):
            if self.active_count(self.history_items()) >= self.active_jobs:
                break
            self.run_tool()
            ran = True
            # Pause before re-counting, presumably so the new job's outputs are
            # visible in the history (possible race; kept from the original).
            sleep(5)
        return ran

    def show_changes(self):
        """Log new history items, state changes, and the active job count."""
        current = self.history_items()
        previous = self.stored_history_items
        for hda_id, state in current.items():
            if hda_id not in previous:
                log.info("New history item '%s' in state '%s'", hda_id, state)
            elif previous[hda_id] != state:
                log.info("History item '%s' changed from '%s' to '%s'",
                         hda_id, previous[hda_id], state)
        log.info('Current active job count is: %s', self.active_count(current))
        self.stored_history_items = current

    def continuous_submission(self, interval=30):
        """Poll forever: log changes, top up active jobs, then sleep.

        :param interval: Seconds to sleep between polls (default 30).
        """
        while True:
            self.show_changes()
            if self.run_jobs():
                # Log the items created by the submissions just made.
                self.show_changes()
            sleep(interval)
def main():
    """Create a runner from the module-level settings and submit forever."""
    runner = GalaxyToolRunner(GALAXY_URL, API_KEY)
    runner.continuous_submission()
if __name__ == '__main__':
    # Daemon mode is selected by a first argument starting with -d or --d
    # (e.g. -d, --daemon); otherwise run in the foreground.
    if len(sys.argv) > 1 and sys.argv[1].startswith(('-d', '--d')):
        # The console is detached in daemon mode, so log to LOG instead.
        fh = logging.FileHandler(LOG, "w")
        fh.setFormatter(logging.Formatter(fmt=logfmt))
        fh.setLevel(logging.DEBUG)
        log.addHandler(fh)
        # Keep the log file's descriptor open when daemonize closes fds.
        keep_fds = [fh.stream.fileno()]
        daemon = Daemonize(
            app="continuous_submission",
            # daemonize changes directory (to "/" by default), so resolve the
            # relative pid path against the launch directory first.
            pid=os.path.abspath(PID),
            action=main,
            keep_fds=keep_fds,
            # Send daemonize's own messages (including errors it reports from
            # main) to our log file instead of syslog.
            logger=log,
        )
        daemon.start()
    else:
        logging.basicConfig(format=logfmt)
        main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment