Skip to content

Instantly share code, notes, and snippets.

@mark-burnett
Created November 6, 2012 23:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mark-burnett/4028287 to your computer and use it in GitHub Desktop.
Save mark-burnett/4028287 to your computer and use it in GitHub Desktop.
Launching managing grid jobs with ruote
require 'amqp'
require 'drmaa'
require 'json'
AMQP.settings[:user] = 'guest'
AMQP.settings[:pass] = 'guest'
AMQP.settings[:host] = 'localhost'
AMQP.settings[:vhost] = '/development/workflow'
$command_wrapper_path = '/home/vagrant/rw/workflow-grid-service/command_wrapper'
AMQP.start do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.direct('')
queue = channel.queue('grid_submit_job_requests', :durable => true)
drmaa_session = DRMAA::Session.new
queue.subscribe(:ack => true) do |header, payload|
workitem = JSON.parse(payload)
print "workitem: "
p workitem
params = workitem['fields']['params']
arg = params['arg']
if arg == nil then
arg = []
end
t = DRMAA::JobTemplate.new
t.name = params['command'].split('/')[-1]
t.command = $command_wrapper_path
t.arg = [workitem['fei']['wfid'],
workitem['fei']['subid'],
workitem['fei']['engine_id'],
workitem['fei']['expid'],
params['command']].concat(arg)
# t.stdout = ':/tmp/qsub.out'
# t.stderr = ':/tmp/qsub.err'
job_id = drmaa_session.run(t)
puts "Started job #{job_id}"
p workitem['fei']
response = {:grid_job_id => job_id,
:workitem => workitem}
exchange.publish JSON.generate(response), :routing_key => 'grid_submit_job_repsonses'
header.ack
end
Signal.trap("INT") do
EM.stop
end
end
require 'rubygems'
require 'yajl'
require 'json'
require 'amqp'
require 'ruote'
require 'ruote-redis'
require 'ruote-amqp'
module Workflow
class RAPG < Ruote::Amqp::Participant
def on_error
puts " ** on error"
engine.cancel_expression(workitem['fei'])
end
end
def create_receiver(cls, dash, redis, queue_name)
channel = AMQP::Channel.new
channel.prefetch(1)
cls.new(dash, channel.queue(queue_name, :durable => true),
redis) #, :forget => true)
end
module_function :create_receiver
class ReceiveError < Ruote::Amqp::RemoteError
attr_reader :fei
def initialize(fei, message)
@fei = fei
@message = message
super("for #{Ruote::FlowExpressionId.to_storage_id(fei)}")
end
def to_s
@message
end
end
class RedisReceiver < Ruote::Amqp::Receiver
def initialize(dash, queue, redis, opts={})
super(dash, queue, opts)
@redis = redis
end
def handle(header, payload)
values = JSON.parse(payload)
on_values(values)
end
def on_error
puts "receiver on error"
end
end
class GridJobStartedReceiver < RedisReceiver
def on_values(values)
print "grid job submitted:"
p values
workitem = values['workitem']
job_set_name = "running-grid-jobs-#{workitem['fei']['wfid']}"
grid_job_id = values['grid_job_id']
job_key = "grid_job_workitem_#{grid_job_id}"
@redis.sadd job_set_name, grid_job_id
@redis.set job_key, JSON.generate(workitem)
print "#{job_set_name}: "
p @redis.smembers job_set_name
puts '-' * 10
end
end
class GridJobSucceededReceiver < RedisReceiver
def on_values(values)
print "grid job succeeded: "
p values
job_set_name = "running-grid-jobs-#{values['fei']['wfid']}"
grid_job_id = values['grid_job_id']
job_key = "grid_job_workitem_#{grid_job_id}"
@redis.srem job_set_name, grid_job_id
print "#{job_set_name}: "
p @redis.smembers job_set_name
workitem = JSON.parse(@redis.get(job_key))
@redis.del job_key
print "worktiem: "
p workitem
puts '-' * 10
receive(workitem)
end
end
class GridJobFailedReceiver < RedisReceiver
def on_values(values)
puts "grid job failed:"
p values
job_set_name = "running-grid-jobs-#{values['fei']['wfid']}"
grid_job_id = values['grid_job_id']
job_key = "grid_job_workitem_#{grid_job_id}"
@redis.srem job_set_name, values['grid_job_id']
print "#{job_set_name}: "
p @redis.smembers job_set_name
workitem = JSON.parse(@redis.get(job_key))
@redis.del job_key
print "worktiem: "
p workitem
puts '-' * 10
workitem['error'] = "grid job failed: #{grid_job_id}"
flunk(workitem)
end
end
def start
AMQP.settings[:user] = 'guest'
AMQP.settings[:pass] = 'guest'
AMQP.settings[:host] = 'localhost'
AMQP.settings[:vhost] = '/development/workflow'
Thread.new { AMQP.start }
# create storage
storage = Ruote::Redis::Storage.new(
::Redis.new(:db => 1, :thread_safe => true), {})
# create workers
worker = Ruote::Worker.new(storage)
# create dash
dash = Ruote::Dashboard.new(worker)
# connect to job_status backend
grid_job_control_redis = ::Redis.new(:db => 2, :thread_safe => true)
# register participants
dash.register(:grid_submit, RAPG,
:exchange => ['direct', ''],
:routing_key => 'grid_submit_job_requests')
dash.noisy = true
# create receivers
grid_job_started_receiver = create_receiver(GridJobStartedReceiver,
dash, grid_job_control_redis, 'grid_submit_job_repsonses')
grid_job_succeeded_receiver = create_receiver(GridJobSucceededReceiver,
dash, grid_job_control_redis, 'grid_job_succeeded_notifications')
grid_job_failed_receiver = create_receiver(GridJobFailedReceiver,
dash, grid_job_control_redis, 'grid_job_failed_notifications')
# wait forever
dash.join
end
module_function :start
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment