@k-tsj
Created September 19, 2020 15:57
Enumerator::Parallel
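# Usage example. END registers a block that runs at program exit,
# so it executes after the definitions below have been loaded.
END {
  8.times.parallel(n: 2).each { p _1 }
}

class Enumerator
  class Parallel
    def initialize(enum, n:)
      @enum = enum
      @n = n
    end

    def each(&blk)
      # Relay Ractor: forwards every message it receives to whichever
      # worker takes from it next, acting as a shared work queue.
      pipe = Ractor.new do
        loop do
          Ractor.yield(Ractor.recv)
        end
      end

      # Start @n workers. A Proc cannot be passed to another Ractor, so only
      # its object_id is sent and the Proc is looked up via ObjectSpace.
      # This sidesteps Ractor's object isolation; treat it as a hack.
      rs = @n.times.map do
        Ractor.new(pipe, blk.object_id) do |pipe, blk_id|
          b = ObjectSpace.each_object { |obj| break obj if obj.object_id == blk_id }
          # Process items until the pipe is closed; nil and false items
          # are processed like any other value.
          loop { b.call(pipe.take) }
        rescue Ractor::ClosedError
          # The pipe was closed: no more work for this worker.
        end
      end

      # Feed every element of the wrapped enumerator into the pipe.
      @enum.each do |i|
        pipe.send(i)
      end
      pipe.close

      # Wait for every worker to finish.
      until rs.empty?
        r, = Ractor.select(*rs)
        rs.delete r
      end
    end
  end

  def parallel(n:)
    Enumerator::Parallel.new(self, n: n)
  end
end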
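
For comparison, the same fan-out shape (one shared queue feeding `n` workers) can be sketched with `Thread` and `Queue` instead of Ractor. This is not part of the gist: `each_with_workers` is a hypothetical helper name, and the sketch assumes CRuby, where threads share a global lock and so give concurrency rather than the parallelism Ractor aims for. Threads share the heap, so the block can be called directly without the `ObjectSpace` lookup.

```ruby
# Hypothetical helper, not from the gist: distributes the items of +enum+
# across +n+ worker threads that each call +blk+.
def each_with_workers(enum, n:, &blk)
  queue = Queue.new

  workers = Array.new(n) do
    Thread.new do
      # Items are boxed in a one-element array so that nil and false can be
      # told apart from the nil that Queue#pop returns once the queue is
      # closed and empty.
      while (box = queue.pop)
        blk.call(box.first)
      end
    end
  end

  enum.each { |item| queue << [item] }
  queue.close            # lets the workers drain the queue and then stop
  workers.each(&:join)   # wait for every worker to finish
end

# Usage: collect results through a thread-safe Queue.
results = Queue.new
each_with_workers(8.times, n: 2) { |i| results << i * i }

squares = []
squares << results.pop until results.empty?
p squares.sort # => [0, 1, 4, 9, 16, 25, 36, 49]
```

The order in which workers process items is not deterministic, which is why the results are sorted before printing.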