Skip to content

Instantly share code, notes, and snippets.

@jancajthaml
Last active February 12, 2018 08:35
Show Gist options
  • Save jancajthaml/35fed49420117172eeba244dc660fe1c to your computer and use it in GitHub Desktop.
Save jancajthaml/35fed49420117172eeba244dc660fe1c to your computer and use it in GitHub Desktop.
Parallel support for ruby collections
# GENERAL INFORMATION
# As in any language, parallelism is not free: use it mainly for costly, blocking operations (I/O, sleeps,
# external calls). On MRI the global VM lock lets only one thread run Ruby code at a time, so CPU-bound blocks
# gain nothing, and on cheap per-element work these helpers are expected to be slower than the plain Enumerable
# methods because of the extra threads and allocations they need.
#
# The implementation takes no explicit locks (work is handed to the threads through a thread-safe Queue),
# which led to the following design decisions:
# - par_map does not replace values in place; it returns a new Array
# - par_reject and par_select make a second pass over the collected results to build the filtered Array
# - all methods require a block; called without one they return nil instead of an Enumerator
---
# parallel map: doubles each number; element 4 sleeps to simulate a slow call
doubled = (1..10).par_map do |n|
  sleep 2 if n == 4
  n * 2
end
puts doubled

# parallel each: the printing order depends on thread scheduling
(1..10).par_each { |n| puts n }

# parallel reject: drops the even numbers; multiples of 10 sleep to simulate slow work
odds = (1..20).par_reject do |n|
  sleep 2 if (n % 10).zero?
  n.even?
end
puts odds

# parallel select: keeps the even numbers; multiples of 10 sleep to simulate slow work
evens = (1..20).par_select do |n|
  sleep 2 if (n % 10).zero?
  n.even?
end
puts evens
# example of asynchronizing synchronized workflow
# Sequential version: every step waits for the previous one to finish.
synchronous_workflow = -> do
  a = do_thing_a()
  b = do_thing_b()
  c = do_thing_c()
  do_resolve(a, b, c)
  do_log(a, b, c)
end

# Concurrent version of the same steps.
asynchronous_workflow = -> do
  # a, b and c are independent of each other, so the three calls may overlap;
  # par_map waits for all of them before the destructuring assignment.
  a, b, c = [-> { do_thing_a() }, -> { do_thing_b() }, -> { do_thing_c() }].par_map(&:call)
  # Both follow-up steps only read a, b and c, so they may overlap as well.
  [-> { do_resolve(a, b, c) }, -> { do_log(a, b, c) }].par_each(&:call)
end
require 'etc'
require 'thread'
module Enumerable
  class << self
    # Upper bound on the number of worker threads a single par_* call starts.
    # Defaults to eight per online processor (8 when the processor count cannot
    # be determined). Assign an Integer to tune it; values below 1 are treated as 1.
    attr_accessor :parallelism
  end

  self.parallelism =
    begin
      Etc.nprocessors << 3
    rescue StandardError, NotImplementedError
      1 << 3
    end

  # Calls the block once for every element, spread across worker threads, and
  # waits until all elements are processed. Elements may be handled in any order.
  #
  # @yield [element] each element, including nil/false elements
  # @return [self, nil] the receiver (like #each), or nil when no block is given
  def par_each
    return unless block_given?

    par_dispatch { |element| yield element }
    self
  end

  # Parallel counterpart of #map.
  #
  # @yield [element] each element
  # @return [Array, nil] the block results in the receiver's enumeration order,
  #   or nil when no block is given
  def par_map
    return unless block_given?

    par_dispatch { |element| yield element }.map(&:last)
  end

  # Parallel counterpart of #reject.
  #
  # @yield [element] each element; a truthy result drops it
  # @return [Array, nil] the elements for which the block returned nil/false, in
  #   enumeration order (nil elements are kept when they pass), or nil without a block
  def par_reject
    return unless block_given?

    par_dispatch { |element| yield element }.reject(&:last).map(&:first)
  end

  # Parallel counterpart of #select.
  #
  # @yield [element] each element; a truthy result keeps it
  # @return [Array, nil] the elements for which the block returned a truthy value,
  #   in enumeration order (nil elements are kept when they pass), or nil without a block
  def par_select
    return unless block_given?

    par_dispatch { |element| yield element }.select(&:last).map(&:first)
  end

  private

  # Runs the block over every element on at most Enumerable.parallelism threads
  # (never more threads than elements) and returns [[element, block_result], ...]
  # in enumeration order. An exception raised by the block is re-raised here.
  def par_dispatch
    jobs = Queue.new
    count = 0
    each do |element|
      # Tag each element with its position: the pair is always truthy, so a
      # nil/false element can't be mistaken for the stop marker, and the index
      # restores the original order without any shared counter.
      jobs << [element, count]
      count += 1
    end
    return [] if count.zero?

    workers = [[Enumerable.parallelism.to_i, 1].max, count].min
    workers.times { jobs << nil } # one stop marker per worker, queued behind all jobs

    threads = Array.new(workers) do
      Thread.new do
        # Each worker fills its own buffer, so no shared structure is written concurrently.
        done = []
        while (job = jobs.deq)
          element, index = job
          done << [index, element, yield(element)]
        end
        done
      end
    end

    ordered = Array.new(count)
    threads.each do |thread|
      # Thread#value joins the worker and re-raises whatever the block raised in it.
      thread.value.each { |index, element, result| ordered[index] = [element, result] }
    end
    ordered
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment