Created
November 6, 2012 23:03
-
-
Save mark-burnett/4028287 to your computer and use it in GitHub Desktop.
Launching managing grid jobs with ruote
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'amqp' | |
require 'drmaa' | |
require 'json' | |
AMQP.settings[:user] = 'guest' | |
AMQP.settings[:pass] = 'guest' | |
AMQP.settings[:host] = 'localhost' | |
AMQP.settings[:vhost] = '/development/workflow' | |
$command_wrapper_path = '/home/vagrant/rw/workflow-grid-service/command_wrapper' | |
AMQP.start do |connection| | |
channel = AMQP::Channel.new(connection) | |
exchange = channel.direct('') | |
queue = channel.queue('grid_submit_job_requests', :durable => true) | |
drmaa_session = DRMAA::Session.new | |
queue.subscribe(:ack => true) do |header, payload| | |
workitem = JSON.parse(payload) | |
print "workitem: " | |
p workitem | |
params = workitem['fields']['params'] | |
arg = params['arg'] | |
if arg == nil then | |
arg = [] | |
end | |
t = DRMAA::JobTemplate.new | |
t.name = params['command'].split('/')[-1] | |
t.command = $command_wrapper_path | |
t.arg = [workitem['fei']['wfid'], | |
workitem['fei']['subid'], | |
workitem['fei']['engine_id'], | |
workitem['fei']['expid'], | |
params['command']].concat(arg) | |
# t.stdout = ':/tmp/qsub.out' | |
# t.stderr = ':/tmp/qsub.err' | |
job_id = drmaa_session.run(t) | |
puts "Started job #{job_id}" | |
p workitem['fei'] | |
response = {:grid_job_id => job_id, | |
:workitem => workitem} | |
exchange.publish JSON.generate(response), :routing_key => 'grid_submit_job_repsonses' | |
header.ack | |
end | |
Signal.trap("INT") do | |
EM.stop | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
require 'yajl' | |
require 'json' | |
require 'amqp' | |
require 'ruote' | |
require 'ruote-redis' | |
require 'ruote-amqp' | |
module Workflow | |
class RAPG < Ruote::Amqp::Participant | |
def on_error | |
puts " ** on error" | |
engine.cancel_expression(workitem['fei']) | |
end | |
end | |
def create_receiver(cls, dash, redis, queue_name) | |
channel = AMQP::Channel.new | |
channel.prefetch(1) | |
cls.new(dash, channel.queue(queue_name, :durable => true), | |
redis) #, :forget => true) | |
end | |
module_function :create_receiver | |
class ReceiveError < Ruote::Amqp::RemoteError | |
attr_reader :fei | |
def initialize(fei, message) | |
@fei = fei | |
@message = message | |
super("for #{Ruote::FlowExpressionId.to_storage_id(fei)}") | |
end | |
def to_s | |
@message | |
end | |
end | |
class RedisReceiver < Ruote::Amqp::Receiver | |
def initialize(dash, queue, redis, opts={}) | |
super(dash, queue, opts) | |
@redis = redis | |
end | |
def handle(header, payload) | |
values = JSON.parse(payload) | |
on_values(values) | |
end | |
def on_error | |
puts "receiver on error" | |
end | |
end | |
class GridJobStartedReceiver < RedisReceiver | |
def on_values(values) | |
print "grid job submitted:" | |
p values | |
workitem = values['workitem'] | |
job_set_name = "running-grid-jobs-#{workitem['fei']['wfid']}" | |
grid_job_id = values['grid_job_id'] | |
job_key = "grid_job_workitem_#{grid_job_id}" | |
@redis.sadd job_set_name, grid_job_id | |
@redis.set job_key, JSON.generate(workitem) | |
print "#{job_set_name}: " | |
p @redis.smembers job_set_name | |
puts '-' * 10 | |
end | |
end | |
class GridJobSucceededReceiver < RedisReceiver | |
def on_values(values) | |
print "grid job succeeded: " | |
p values | |
job_set_name = "running-grid-jobs-#{values['fei']['wfid']}" | |
grid_job_id = values['grid_job_id'] | |
job_key = "grid_job_workitem_#{grid_job_id}" | |
@redis.srem job_set_name, grid_job_id | |
print "#{job_set_name}: " | |
p @redis.smembers job_set_name | |
workitem = JSON.parse(@redis.get(job_key)) | |
@redis.del job_key | |
print "worktiem: " | |
p workitem | |
puts '-' * 10 | |
receive(workitem) | |
end | |
end | |
class GridJobFailedReceiver < RedisReceiver | |
def on_values(values) | |
puts "grid job failed:" | |
p values | |
job_set_name = "running-grid-jobs-#{values['fei']['wfid']}" | |
grid_job_id = values['grid_job_id'] | |
job_key = "grid_job_workitem_#{grid_job_id}" | |
@redis.srem job_set_name, values['grid_job_id'] | |
print "#{job_set_name}: " | |
p @redis.smembers job_set_name | |
workitem = JSON.parse(@redis.get(job_key)) | |
@redis.del job_key | |
print "worktiem: " | |
p workitem | |
puts '-' * 10 | |
workitem['error'] = "grid job failed: #{grid_job_id}" | |
flunk(workitem) | |
end | |
end | |
def start | |
AMQP.settings[:user] = 'guest' | |
AMQP.settings[:pass] = 'guest' | |
AMQP.settings[:host] = 'localhost' | |
AMQP.settings[:vhost] = '/development/workflow' | |
Thread.new { AMQP.start } | |
# create storage | |
storage = Ruote::Redis::Storage.new( | |
::Redis.new(:db => 1, :thread_safe => true), {}) | |
# create workers | |
worker = Ruote::Worker.new(storage) | |
# create dash | |
dash = Ruote::Dashboard.new(worker) | |
# connect to job_status backend | |
grid_job_control_redis = ::Redis.new(:db => 2, :thread_safe => true) | |
# register participants | |
dash.register(:grid_submit, RAPG, | |
:exchange => ['direct', ''], | |
:routing_key => 'grid_submit_job_requests') | |
dash.noisy = true | |
# create receivers | |
grid_job_started_receiver = create_receiver(GridJobStartedReceiver, | |
dash, grid_job_control_redis, 'grid_submit_job_repsonses') | |
grid_job_succeeded_receiver = create_receiver(GridJobSucceededReceiver, | |
dash, grid_job_control_redis, 'grid_job_succeeded_notifications') | |
grid_job_failed_receiver = create_receiver(GridJobFailedReceiver, | |
dash, grid_job_control_redis, 'grid_job_failed_notifications') | |
# wait forever | |
dash.join | |
end | |
module_function :start | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment