@havenwood
Last active April 24, 2022 20:26
Implementing #each with Ractor- and Async-based concurrency for fun
require 'async'

module Enumerable
  # Refinements that run each block call inside its own Async task.
  module Async
    [Array, Hash, Range, Struct].each do |collection|
      refine collection do
        def each(&block)
          super do |value|
            Async do
              block.call(value)
            end
          end

          self
        end
      end
    end
  end

  # Refinements that hand each element to a pool of up to 16 worker Ractors.
  module Parallel
    [Array, Hash, Range, Struct].each do |collection|
      refine collection do
        def each(&block)
          # Relay Ractor: yields every value it receives to whichever worker takes next.
          pipe = Ractor.new do
            loop do
              Ractor.yield Ractor.receive
            end
          end

          # One worker per element, capped at 16; falls back to 16 when size is nil.
          pool_size = size&.clamp(..16) || 16

          workers = (1..pool_size).map do
            Ractor.new pipe, Ractor.make_shareable(block) do |pipe, shareable_block|
              while value = pipe.take
                Ractor.yield shareable_block.call(value)
              end
            end
          end

          super do |value|
            pipe << value
          end
        end
      end
    end
  end
end
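# A hack until Procs are shareable, see: https://bugs.ruby-lang.org/issues/18243
# Evaluating the block with `self` set to nil means blocks defined inside it have
# a shareable receiver, which Ractor.make_shareable requires of a Proc.
module Kernel
  def Parallel(&) = nil.instance_eval(&)
end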
require_relative 'enumerable'

using Enumerable::Async

Async do
  (1..3).each do |number|
    sleep 3
    puts "#{number}\n"
  end
end
# Three seconds later...
#>> 2
#>> 3
#>> 1
#=> 1..3
require_relative 'enumerable'

using Enumerable::Parallel

Parallel do
  (1..3).each do |number|
    sleep 3
    puts "#{number}\n"
  end
end
#=> 1..3
# Three seconds later...
#>> 1
#>> 3
#>> 2
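
The pipe-plus-worker-pool layout of Enumerable::Parallel may be easier to follow without the Ractor experimental API in the way. Below is a minimal sketch of the same shape using Thread and Queue from core Ruby. It is a swapped-in technique for illustration, not what the gist does, and it will not give CPU parallelism on CRuby the way Ractors can. The helper name `each_in_pool` and the `max_workers:` keyword are hypothetical. Unlike the gist, this version waits for the workers and collects their results.

```ruby
# Hypothetical helper, not part of the gist: mirrors the pipe + worker pool
# layout of Enumerable::Parallel using Thread and Queue instead of Ractor.
def each_in_pool(collection, max_workers: 16)
  # Same sizing rule as the gist: one worker per element, capped at max_workers,
  # falling back to max_workers when the collection cannot report a size.
  pool_size = collection.size&.clamp(..max_workers) || max_workers

  pipe = Queue.new     # plays the role of the relay Ractor
  results = Queue.new  # collects what each worker produces

  workers = Array.new(pool_size) do
    Thread.new do
      # Queue#pop returns nil once the queue is closed and drained.
      # As in the gist, a nil or false element would also end the loop.
      while (value = pipe.pop)
        results << yield(value)
      end
    end
  end

  collection.each { |value| pipe << value }
  pipe.close
  workers.each(&:join)

  # Results arrive in completion order, not input order.
  Array.new(results.size) { results.pop }
end

squares = each_in_pool(1..5) { |n| n * n }
puts squares.sort.inspect # prints [1, 4, 9, 16, 25]
```

Closing the queue is what lets the workers finish. The Ractor version in the gist has no equivalent shutdown step, which is why its output appears after `each` has already returned.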