Skip to content

Instantly share code, notes, and snippets.

@ruliana
Created May 25, 2015 02:01
Show Gist options
  • Save ruliana/158c9b7cea914a222746 to your computer and use it in GitHub Desktop.
Save ruliana/158c9b7cea914a222746 to your computer and use it in GitHub Desktop.
Testing ZeroMQ (PUSH PULL) - clean shutdown - sender
require 'rbczmq'
class WorkerCommander
def initialize(sender_port = 5555, commander_port = 5556)
@ctx = ZMQ::Context.new
@sender = @ctx.socket(:PUSH)
@sender.bind("tcp://*:#{sender_port}")
@commander = @ctx.socket(:PULL)
@commander.bind("tcp://*:#{commander_port}")
@workers = []
if block_given?
begin
yield self
ensure
wait_workers_finish
end
end
end
def wait_workers_start(how_many = 1)
process(@commander.recv) while @workers.size < how_many
end
def send(msg)
@sender.send(msg)
process(@commander.recv_nonblock)
end
def wait_workers_finish
until @workers.empty?
@sender.send('EOF')
process(@commander.recv)
end
ensure
@sender.close
@commander.close
@ctx.destroy
end
private
def process(msg)
puts msg if msg
case msg
when /^START\s(.*)$/
@workers << $1
when /^FINISH\s(.*)$/
@workers.delete($1)
end
end
end
message = 'x' * 5_000
WorkerCommander.new do |c|
c.wait_workers_start(2)
5_000_000.times do |n|
c.send(message)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment