maasha / forkpool2.rb secret
Last active

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist
View forkpool2.rb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
#!/usr/bin/env ruby
 
require 'pp'
require 'thread'
 
# Number of worker processes requested by the demo call at the bottom of
# this script.
CPUS = 4
# When true, feach and its helpers trace their progress on $stderr.
DEBUG = true
 
module Enumerable
  # Fork each (feach) creates a fork pool with a specified number of processes
  # (_procs_) to iterate over the Enumerable object processing the specified
  # block. Calling feach with 0 (or fewer) _procs_ disables forking for
  # debugging purposes and runs the block in the calling process.
  #
  # Elements are sent to the workers, and block results are sent back, with
  # Marshal, so both must be Marshal-able. The results are read back from the
  # workers but discarded. Because the block runs in child processes, changes
  # it makes to in-memory state are not visible in the parent.
  #
  # All workers have exited and been reaped, and their pipes closed, by the
  # time feach returns (also when it raises).
  #
  # @param procs [Integer] number of worker processes to fork
  # @yield [elem] called once per element, in one of the worker processes
  # @return [Array<Worker>] the (finished) workers when forking; otherwise
  #   whatever #each returns
  #
  # @example - process 10 elements using 4 processes:
  #
  #   (0 ... 10).feach(4) { |i| puts i; sleep 1 }
  def feach(procs, &block)
    $stderr.puts "Parent pid: #{Process.pid}" if DEBUG

    return each { |elem| block.call(elem) } unless procs > 0

    workers = spawn_workers(procs, &block)
    jobs    = Queue.new
    threads = []

    begin
      # Exactly one feeder thread per worker: a worker's pipes carry one
      # request/response at a time, so they must never be shared by several
      # threads, and the thread count stays independent of the collection size.
      workers.each do |worker|
        threads << Thread.new do
          # Jobs are wrapped in a one-element Array so that nil/false elements
          # cannot be mistaken for the nil end-of-work marker.
          while (job = jobs.pop)
            worker.process(job.first)
          end
        end
      end

      each_with_index do |elem, index|
        $stderr.puts "elem: #{elem} index: #{index}" if DEBUG
        jobs << [elem]
      end

      # One end-of-work marker per feeder thread.
      workers.size.times { jobs << nil }
      threads.each(&:join)
    ensure
      # On the error path some feeders may still be blocked on the queue.
      threads.each(&:kill)
      workers.each(&:wait)
    end

    workers
  end

  # Forks _procs_ child processes, each connected to the parent by a pair of
  # pipes (one for elements, one for results), and starts the receive loop
  # (#call) in every child.
  #
  # @param procs [Integer] number of children to fork
  # @return [Array<Worker>] parent-side handles, one per child
  def spawn_workers(procs, &block)
    workers = []

    procs.times do
      child_read, parent_write = IO.pipe
      parent_read, child_write = IO.pipe

      # Anything still buffered would be copied into the child by fork and
      # written a second time when the child exits.
      $stdout.flush
      $stderr.flush

      pid = Process.fork do
        begin
          # The child inherited the parent-side pipe ends of every worker
          # created so far. Holding them open would keep those workers from
          # ever seeing EOF on their element pipe.
          workers.each do |sibling|
            sibling.parent_read.close
            sibling.parent_write.close
          end
          parent_write.close
          parent_read.close
          call(child_read, child_write, &block)
        ensure
          child_read.close
          child_write.close
        end
      end

      # The parent must drop its copies of the child-side ends; otherwise a
      # dead child never produces EOF on the result pipe, and later children
      # would inherit these descriptors as well.
      child_read.close
      child_write.close

      $stderr.puts "Spawning worker with pid: #{pid}" if DEBUG

      workers << Worker.new(parent_read, parent_write, pid)
    end

    workers
  end

  # Receive loop run inside a child process: reads marshalled elements from
  # _child_read_ until EOF, calls the block with each one and writes the
  # marshalled block result to _child_write_.
  #
  # The data on _child_read_ is written by Worker#process in the parent.
  def call(child_read, child_write, &block)
    until child_read.eof?
      elem = Marshal.load(child_read)
      $stderr.puts " call with Process.pid: #{Process.pid}" if DEBUG
      result = block.call(elem)
      Marshal.dump(result, child_write)
    end
  end

  # Parent-side handle for one forked child: the two parent pipe ends plus
  # the child's pid.
  class Worker
    attr_reader :parent_read, :parent_write, :pid

    # @param parent_read [IO] read end of the result pipe
    # @param parent_write [IO] write end of the element pipe
    # @param pid [Integer] pid of the forked child
    def initialize(parent_read, parent_write, pid)
      @parent_read  = parent_read
      @parent_write = parent_write
      @pid          = pid
    end

    # Sends _elem_ to the child and blocks until the child's result arrives.
    # Not safe to call from several threads at once on the same worker: the
    # request and response share one pair of pipes.
    #
    # @return [Object] the unmarshalled result of the block for _elem_
    def process(elem)
      Marshal.dump(elem, @parent_write)
      $stderr.puts " process with worker pid: #{@pid} and parent pid: #{Process.pid}" if DEBUG
      Marshal.load(@parent_read)
    end

    # Shuts the worker down: closes both parent pipe ends (closing the element
    # pipe is what makes the child's receive loop see EOF) and then blocks
    # until the child has exited, so that no zombie is left behind.
    #
    # @return [Integer, nil] the child's pid, or nil if it was already reaped
    def wait
      $stderr.puts "Waiting for worker with pid: #{@pid}" if DEBUG
      [@parent_write, @parent_read].each { |io| io.close unless io.closed? }
      Process.wait(@pid)
    rescue Errno::ECHILD
      nil
    end
  end
end
 
# Demo: hand the integers 0..9 to CPUS worker processes; each call sleeps for
# a second and then prints its number, so the output order is not fixed.
(0...10).feach(CPUS) do |i|
  sleep 1
  puts i
end
View forkpool2.rb
1 2 3 4 5 6 7 8 9 10 11 12
# with DEBUG = false
 
0
2
1
3
8
5
6
7
4
9
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.