Skip to content

Instantly share code, notes, and snippets.

Created August 3, 2012 06:44
Show Gist options
  • Save anonymous/3245164 to your computer and use it in GitHub Desktop.
Save anonymous/3245164 to your computer and use it in GitHub Desktop.
require 'tmpdir'
require 'ruote'
#
# blah
#
# Note: using ruote 2.3.0 (master), not too difficult to port to 2.2.0.
#
class ConvertFilesParticipant
include Ruote::LocalParticipant
PROCESS_POLL_FREQUENCY = 2 # seconds
def initialize(options)
@options = options
end
def on_workitem
info = workitem.fields['info']
#pid = spawn(info, 'convert_file_utility.sh')
pid = spawn('sleep 5')
#File.open(pidpath, 'wb') { |f| f.puts(pid) }
put('pid', pid)
puts "job started with pid: #{pid}"
while running?(pid)
sleep(PROCESS_POLL_FREQUENCY)
end
# unless Process.wait(pid), this doesn't block the whole process
reply
end
def on_cancel
#pid = File.read(pidpath).to_i rescue nil
pid = get('pid')
puts "cancelling pid: #{pid}"
Process.kill(9, pid) if pid
end
protected
# def pidpath
#
# File.join(Dir.tmpdir, "#{fei.sid}.pid")
# end
def running?(pid)
case `ps -p #{pid} -o state`.strip.split("\n").last
when 'STAT', /^Z/ then false
else true
end
end
end
ruote = Ruote::Dashboard.new(Ruote::Worker.new(Ruote::HashStorage.new))
#ruote.noisy = true
ruote.register 'convert', ConvertFilesParticipant
wfid = ruote.launch(Ruote.define do
convert
end)
ruote.wait_for('dispatch')
sleep 1
puts 'cancelling flow...'
ruote.cancel(wfid)
ruote.wait_for('terminated')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment