Skip to content

Instantly share code, notes, and snippets.

@sunaot
Created February 27, 2021 07:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sunaot/a987c33ad8af562fe0889ab33edce56b to your computer and use it in GitHub Desktop.
Save sunaot/a987c33ad8af562fe0889ab33edce56b to your computer and use it in GitHub Desktop.
Ractor 使って Java Stream API のようなお手軽並列処理。map や filter の単位での並列化 version.
# frozen_string_literal: true
require_relative "ractor_stream/version"
# [1,2,3,4].parallel.filter {|n| n.even?}.map {|n| n * 2}.each {|item| puts item}
class Enumerator
class RactorParallelStream
Consumer = Struct.new(:message, :processor, :args, :keyword_args)
def initialize(enumerator)
@enumerator = enumerator
@consumers = []
end
def map(&block)
@consumers << Consumer.new(:map, block)
self
end
def filter(&block)
@consumers << Consumer.new(:filter, block)
self
end
# termination
def each(&block)
enumerator = pipeline_call(@enumerator, @consumers)
enumerator.map do |item|
Ractor.new(item, &block)
end.each(&:take)
end
private
def pipeline_call(enumerator, consumers)
return enumerator if consumers.empty?
consumer, rest = consumers.first, consumers.drop(1)
e = case consumer.message
when :map
parallel_map(enumerator, consumer)
when :filter
parallel_filter(enumerator, consumer)
end
pipeline_call(e, rest)
end
def parallel_map(enumerator, consumer)
enumerator.map do |item|
Ractor.new(item, &consumer.processor)
end.map(&:take)
end
def parallel_filter(enumerator, consumer)
items = enumerator.map do |item|
ractor = Ractor.new(item, &consumer.processor)
[item, ractor]
end
items.inject([]) do |result, (item, ractor)|
if ractor.take
result << item
else
result
end
end
end
end
end
module ParallelStream
class Error < StandardError; end
def parallel_stream(&block)
Enumerator::RactorParallelStream.new(self)
end
alias_method :parallel, :parallel_stream
end
class Enumerator
include ParallelStream
end
class Array
include ParallelStream
end
def tarai(x, y, z) =
x <= y ? y : tarai(tarai(x-1, y, z),
tarai(y-1, z, x),
tarai(z-1, x, y))
require 'benchmark'
Benchmark.bm do |x|
# sequential version
x.report('seq'){ 4.times{ tarai(14, 7, 0) } }
# parallel version
x.report('par'){
4.times.map do
Ractor.new { tarai(14, 7, 0) }
end.each(&:take)
}
# stream version
x.report('str'){ 4.times.parallel.map{ tarai(14,7,0) }.each {} }
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment