@Gonzih
Last active December 28, 2015 00:19
Experimenting with a parallel map in Ruby using pipes and forked processes.
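The approach rests on one round trip: fork a child, let it compute a value, and send that value back to the parent through a pipe. Below is a minimal sketch of that single step. It assumes a Unix-like system where `fork` is available (it is not on Windows or JRuby). The helper name `run_in_child` is hypothetical, and it uses `Marshal` rather than the gist's YAML so that any dumpable Ruby object survives the trip.

```ruby
# Hypothetical helper: run the given block on `value` in a forked child
# process and return the block's result to the parent through a pipe.
# Needs fork(2), i.e. a Unix-like OS with MRI.
def run_in_child(value)
  reader, writer = IO.pipe
  pid = fork do
    reader.close                             # the child only writes
    writer.write(Marshal.dump(yield(value)))
    writer.close                             # flush and signal EOF to the parent
    exit!(true)                              # skip at_exit hooks inherited from the parent
  end
  writer.close                               # the parent only reads
  result = Marshal.load(reader.read)         # read returns once the child closes its end
  reader.close
  Process.wait(pid)                          # reap the child so no zombie is left behind
  result
end

p run_in_child(16) { |x| Math.sqrt(x) }
```

Running it prints `4.0`. The parent must close its copy of the write end. If it does not, `reader.read` never sees EOF and blocks forever.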
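```ruby
require 'yaml'
require 'rbconfig'

# Mixin that adds a fork-based parallel map to collections responding to
# #map, #size and #each_slice. Relies on fork(2), so it only works on
# Unix-like systems (not on Windows or JRuby).
module Reducer
  # Number of CPUs, detected per OS (Ruby 2.2+ also offers Etc.nprocessors).
  def cores_count
    case RbConfig::CONFIG['host_os']
    when /darwin9/
      `hwprefs cpu_count`.to_i
    when /darwin/
      ((`which hwprefs` != '') ? `hwprefs thread_count` : `sysctl -n hw.ncpu`).to_i
    when /linux/
      `cat /proc/cpuinfo | grep processor | wc -l`.to_i
    when /freebsd/
      `sysctl -n hw.ncpu`.to_i
    else
      1 # unknown OS: fall back to a single worker
    end
  end

  # Forks one child per item; each child runs the block and writes the
  # YAML-serialized result to its own pipe. Returns [reader, pid] pairs.
  def send_to_parallel(&block)
    map do |item|
      reader, writer = IO.pipe
      pid = fork do
        $PROGRAM_NAME = 'ruby worker'
        reader.close # the child only writes
        writer.write(YAML.dump(block.call(item)))
        writer.close
        exit!(true) # skip at_exit handlers inherited from the parent
      end
      writer.close # the parent only reads; closing lets the reader see EOF
      [reader, pid]
    end
  end

  # Reads each child's result in order, then reaps the child.
  # Psych 4 (Ruby 3.1+) makes YAML.load safe by default, so only basic types
  # (numbers, strings, arrays, hashes, ...) round-trip; Marshal handles any object.
  def read_from_parallel
    map do |reader, pid|
      data = reader.read # blocks until the child closes its end
      reader.close
      Process.wait(pid)
      YAML.load(data)
    end
  end

  def pmap_each(&block)
    send_to_parallel(&block).read_from_parallel
  end

  # Items per slice, so that at most cores_count slices are created.
  # ceil (rather than round) never yields 0, which each_slice rejects.
  def partition_size
    [(size / cores_count.to_f).ceil, 1].max
  end

  # Splits the collection into one slice per core, maps each slice in its own
  # process and concatenates the results in the original order.
  def pmap(&block)
    each_slice(partition_size).pmap_each do |slice|
      slice.map(&block)
    end.flatten(1) # flatten only the slice level, keeping array results intact
  end
end

class Array
  include Reducer
end

class Enumerator
  include Reducer
end

# Demo: square roots of 9 random integers, computed in worker processes.
a = Array.new(9) { rand(100) }
block = lambda { |x| Math.sqrt(x) }
r = a.pmap(&block)
p a.size
p r.size
p a
p r
```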
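Slice sizing decides how many processes `pmap` starts. The sketch below compares the gist's original formula, `(size / cores_count.to_f).round`, with a `ceil`-based formula clamped to at least 1. The names `slicing`, `ORIGINAL_SLICE` and `FIXED_SLICE` are hypothetical and exist only for this illustration. The core count is passed in explicitly instead of being detected.

```ruby
# The gist's original slice-size formula, and a ceil-based alternative
# that never drops below 1 (hypothetical names, for illustration only).
ORIGINAL_SLICE = ->(size, cores) { (size / cores.to_f).round }
FIXED_SLICE    = ->(size, cores) { [(size / cores.to_f).ceil, 1].max }

# Returns [slice_size, number_of_slices] for `size` items, or the
# exception class that each_slice raises for an invalid slice size.
def slicing(size, cores, formula)
  slice_size = formula.call(size, cores)
  [slice_size, (1..size).each_slice(slice_size).count]
rescue ArgumentError => e
  e.class
end

[[9, 4], [5, 4], [1, 4]].each do |size, cores|
  puts "#{size} items, #{cores} cores: " \
       "round=#{slicing(size, cores, ORIGINAL_SLICE).inspect} " \
       "ceil=#{slicing(size, cores, FIXED_SLICE).inspect}"
end
```

With 9 items on 4 cores, rounding gives a slice size of 2 and therefore 5 slices (one process more than there are cores), while `ceil` gives 3 slices of 3. With a single item, rounding produces a slice size of 0 and `each_slice` raises `ArgumentError`.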