Skip to content

Instantly share code, notes, and snippets.

@mark-burnett
Created November 8, 2012 21:35
Show Gist options
  • Save mark-burnett/4041818 to your computer and use it in GitHub Desktop.
Save mark-burnett/4041818 to your computer and use it in GitHub Desktop.
Ruote iterator timings vs # workers
#!/usr/bin/env ruby
require 'rubygems'
require 'bundler/setup'
require 'json'
require 'ruote'
require 'amqp'
require 'ruote-amqp'
require 'redis'
require 'ruote-redis'
require 'ruote/util/process_observer'
class AmqpExitReceiver < Ruote::Amqp::Receiver
def handle(header, payload)
exit(0)
end
end
AMQP.settings[:user] = 'guest'
AMQP.settings[:pass] = 'guest'
AMQP.settings[:host] = 'blade7-2-2'
AMQP.settings[:vhost] = '/development/workflow'
Thread.new { AMQP.start }
sleep 0.1
storage = Ruote::Redis::Storage.new(
::Redis.new(:host => 'blade7-2-2',
:db => 7, :thread_safe => true), {})
dashboard = Ruote::Dashboard.new(storage)
channel = AMQP::Channel.new
channel.prefetch(1)
exchange = channel.topic("ruote.benchmark.kill", :auto_delte => true)
amqp_exit_receiver = AmqpExitReceiver.new(dashboard,
channel.queue('', :exclusive => true).bind(exchange))
pdef = Ruote.define $$ do
# iterator :times => '${v:count}' do
citerator :times => '${v:count}' do
nothing
# amqp_echo
end
end
print "launching: "
puts ARGV[0]
wfid = dashboard.launch(pdef, {}, {'count' => ARGV[0]})
dashboard.wait_for(wfid, {:timeout => -1})
#!/usr/bin/env ruby
require 'rubygems'
require 'bundler/setup'
require 'json'
require 'amqp'
require 'ruote'
require 'ruote-amqp'
require 'redis'
require 'ruote-redis'
require 'ruote/util/process_observer'
class AmqpEchoReceiver < Ruote::Amqp::Receiver
def handle(header, payload)
workitem = JSON.parse(payload)
receive(workitem)
end
end
class AmqpExitReceiver < Ruote::Amqp::Receiver
def handle(header, payload)
exit(0)
end
end
AMQP.settings[:user] = 'guest'
AMQP.settings[:pass] = 'guest'
AMQP.settings[:host] = 'blade7-2-2'
AMQP.settings[:vhost] = '/development/workflow'
Thread.new { AMQP.start }
sleep 0.1
storage = Ruote::Redis::Storage.new(
::Redis.new(:host => 'blade7-2-2',
:db => 7, :thread_safe => true), {})
dashboard = Ruote::Dashboard.new(Ruote::Worker.new(storage))
dashboard.register_participant 'nothing', Ruote::NoOpParticipant
dashboard.register_participant 'amqp_echo', Ruote::Amqp::Participant, :exchange => ['direct', ''], :routing_key => 'echo'
channel = AMQP::Channel.new
channel.prefetch(1)
amqp_echo_receiver = AmqpEchoReceiver.new(dashboard, channel.queue('echo'))
exchange = channel.topic("ruote.benchmark.kill", :auto_delte => true)
dashboard.register_participant 'send_exit_message', Ruote::Amqp::Participant, :exchange => [:topic, 'ruote.benchmark.kill'], :routing_key => ''
amqp_exit_receiver = AmqpExitReceiver.new(dashboard,
channel.queue('', :exclusive => true).bind(exchange))
dashboard.wait_for('terminated', {:timeout => -1})
exit_proc = Ruote.define $$ do
send_exit_message
end
ep_wfid = dashboard.launch(exit_proc)
dashboard.wait_for(ep_wfid)
@mark-burnett
Copy link
Author

num_operations num_workers wallclock_time worker_max_resident_kb

1000 1 89.26 308816
1000 2 109.28 657744
1000 3 114.59 992656
1000 4 114.42 1395600

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment