Skip to content

Instantly share code, notes, and snippets.

@rsutphin
Created November 30, 2015 18:30
Show Gist options
  • Save rsutphin/bbfc9e515fa542e1f052 to your computer and use it in GitHub Desktop.
Save rsutphin/bbfc9e515fa542e1f052 to your computer and use it in GitHub Desktop.
Parallel processing in Ruby with datagrams for bidirectional message passing
require 'socket'
require 'pp'
# Sentinel datagram. The parent sends it to tell a worker to stop; the worker
# sends it back as its final message so the parent knows that worker is done.
END_COMMUNICATION = "\0"
# Forks a worker process connected to the caller by a UNIX datagram socket pair.
#
# The worker answers every datagram it receives with a line of the form
# "<n> responds to <msg> after <elapsed>s!", sleeping 0.1 * n seconds before
# each reply. When it receives END_COMMUNICATION it stops, sends
# END_COMMUNICATION back as its last message and exits without running
# at_exit handlers. If the worker fails with a StandardError it reports the
# error on stderr and still sends END_COMMUNICATION, so the caller is not left
# waiting for a worker that will never finish.
#
# @param n [Numeric] worker label; also scales the per-message delay
# @param maxlen [Integer] maximum number of bytes read per incoming datagram
# @return [Socket] the caller's end of the socket pair
def start_child(n, maxlen: 1000)
  child_socket, parent_socket = Socket.pair(:UNIX, :DGRAM, 0)

  pid = fork do
    # The worker only talks through its own end of the pair.
    parent_socket.close

    succeeded = true
    begin
      answer_until_terminated(child_socket, n, maxlen)
    rescue StandardError => e
      succeeded = false
      warn "worker #{n} failed: #{e.class}: #{e.message}"
    end

    begin
      child_socket.send(END_COMMUNICATION, 0)
    rescue SystemCallError => e
      succeeded = false
      warn "worker #{n} could not signal completion: #{e.class}: #{e.message}"
    end

    child_socket.close
    # exit! skips at_exit handlers; the status tells a clean shutdown from a
    # failure.
    Kernel.exit!(succeeded)
  end

  # The caller keeps only its own end and lets Ruby reap the worker.
  child_socket.close
  Process.detach(pid)
  parent_socket
end

# Worker receive loop: replies to each datagram until END_COMMUNICATION arrives.
def answer_until_terminated(socket, n, maxlen)
  # Monotonic clock, so reported durations are unaffected by wall-clock changes.
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  loop do
    msg = socket.recv(maxlen)
    break if msg == END_COMMUNICATION

    sleep(0.1 * n)
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    socket.send(format('%s responds to %s after %.2fs!', n, msg, elapsed), 0)
  end
end
# Driver: start six workers (n = 0..5), send each of them every letter from
# 's' to 'z' followed by END_COMMUNICATION, then print replies as they arrive
# until every worker has sent END_COMMUNICATION back.
worker_sockets = (0..5).map { |n| start_child(n) }
max_reply_length = 1000

# Every request and the stop marker are sent before any reply is read.
# NOTE(review): this presumably relies on the sockets being able to buffer
# that many pending datagrams -- confirm before raising the message count.
('s'..'z').each do |letter|
  worker_sockets.each { |sock| sock.send(letter, 0) }
end
worker_sockets.each { |sock| sock.send(END_COMMUNICATION, 0) }

until worker_sockets.empty?
  # Block until at least one worker has something to read, then handle one
  # datagram from the first ready socket.
  ready, = IO.select(worker_sockets)
  source = ready.first
  reply = source.recv(max_reply_length)

  if reply == END_COMMUNICATION
    puts "#{source.inspect} done"
    worker_sockets.delete(source)
    # This worker is finished; release its descriptor instead of leaking it.
    source.close
  else
    puts reply
  end
end

Kernel.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment