Skip to content

Instantly share code, notes, and snippets.

@jmarrec
Created May 7, 2024 00:08
Show Gist options
  • Save jmarrec/b0234bdafbfe7425a60bb1840e093cc9 to your computer and use it in GitHub Desktop.
Save jmarrec/b0234bdafbfe7425a60bb1840e093cc9 to your computer and use it in GitHub Desktop.
parallel 1.24.0 single lib
# frozen_string_literal: true
require 'rbconfig'
module Parallel
# Library version string; `Version` is a second constant bound to the same value.
VERSION = Version = '1.24.0' # rubocop:disable Naming/ConstantName
# Frozen sentinel object: JobFactory compares each item returned by a producer
# lambda against it and stops asking for more items once they are equal.
Stop = Object.new.freeze
# Raised by Worker#work when writing to the worker pipe fails with EPIPE or
# reading the reply hits EOF.
class DeadWorker < StandardError; end
# Error that carries an optional payload. When the result of a run is a Break,
# `map` returns that payload instead of raising.
class Break < StandardError
  # value: payload to hand back to the caller (nil by default)
  def initialize(value = nil)
    @value = value
    super() # no message argument, so the message stays the class name
  end

  # Payload given at construction time.
  def value
    @value
  end
end
# Break variant raised by `any?` / `all?` to stop early; when a worker process
# reports it, work_in_processes also kills the other workers.
class Kill < Break; end
# Stand-in for an exception that could not be marshalled. It keeps the original
# class name and message (as text) plus the original backtrace.
class UndumpableException < StandardError
  # Backtrace copied from the original exception (overrides Exception#backtrace).
  attr_reader :backtrace

  # original: the exception that could not be dumped
  def initialize(original)
    description = "#{original.class}: #{original.message}"
    super(description)
    @backtrace = original.backtrace
  end
end
# Wraps an exception so it can be sent through Marshal: the wrapped object is
# the original exception when it can be dumped, otherwise an
# UndumpableException built from it.
class ExceptionWrapper
  attr_reader :exception

  # exception: the error to wrap; its better_errors bindings stack (if any)
  # is removed from the object as a side effect.
  def initialize(exception)
    drop_better_errors_bindings(exception)
    @exception = dumpable_version_of(exception)
  end

  private

  # Remove the bindings stack added by the better_errors gem,
  # because it cannot be marshalled.
  def drop_better_errors_bindings(exception)
    ivar = :@__better_errors_bindings_stack
    return unless exception.instance_variable_defined?(ivar)

    exception.send(:remove_instance_variable, ivar)
  end

  # Try a dump purely as a probe; the dumped string itself is thrown away.
  def dumpable_version_of(exception)
    Marshal.dump(exception)
    exception
  rescue StandardError
    UndumpableException.new(exception)
  end
end
# Parent-side handle for one worker process: the two pipe ends used to talk to
# it, its pid, and (optionally) the parent thread currently driving it.
class Worker
  attr_reader :pid, :read, :write
  attr_accessor :thread

  # read:  IO the parent reads replies from
  # write: IO the parent writes jobs to
  # pid:   process id of the worker
  def initialize(read, write, pid)
    @read, @write, @pid = read, write, pid
  end

  # Close both pipes, then wait for the process.
  def stop
    close_pipes
    wait # if it goes zombie, rather wait here to be able to debug
  end

  # might be passed to started_processes and simultaneously closed by another thread
  # when running in isolation mode, so we have to check if it is closed before closing
  def close_pipes
    [read, write].each { |pipe| pipe.close unless pipe.closed? }
  end

  # Send `data` to the worker and return its reply.
  # Raises DeadWorker when the pipe is broken or closed, and re-raises the
  # wrapped exception when the reply is an ExceptionWrapper.
  def work(data)
    send_job(data)
    reply = receive_reply
    raise reply.exception if reply.is_a?(ExceptionWrapper)

    reply
  end

  private

  # Marshal the job into the write pipe.
  def send_job(data)
    Marshal.dump(data, write)
  rescue Errno::EPIPE
    raise DeadWorker
  end

  # Unmarshal one reply from the read pipe.
  def receive_reply
    Marshal.load(read)
  rescue EOFError
    raise DeadWorker
  end

  def wait
    Process.wait(pid)
  rescue Interrupt
    # process died
  end
end
# Hands out [item, index] pairs to workers. The source is either a "producer"
# (anything responding to #call, or a queue-like object that is wrapped in a
# lambda) or something that is converted to an Array up front.
class JobFactory
  # source: callable, queue-like object, or anything responding to #to_a
  # mutex:  lock guarding the index counter and the producer call
  def initialize(source, mutex)
    @lambda = source.respond_to?(:call) ? source : queue_wrapper(source)
    @source = source.to_a unless @lambda # turn Range and other Enumerable-s into an Array
    @mutex = mutex
    @index = -1
    @stopped = false
  end

  # Next [item, index] pair, or nil when there is nothing left.
  def next
    producer? ? next_from_producer : next_from_source
  end

  # Number of jobs; infinite for producers because it cannot be known.
  def size
    return Float::INFINITY if producer?

    @source.size
  end

  # generate item that is sent to workers
  # just index is faster + less likely to blow up with unserializable errors
  def pack(item, index)
    return index unless producer?

    [item, index]
  end

  # unpack item that is sent to workers
  def unpack(data)
    return data if producer?

    [@source[data], data]
  end

  private

  # Truthy when items come from a lambda instead of an array.
  def producer?
    @lambda
  end

  # Under the lock so that:
  # - index and item stay in sync
  # - the lambda is not called again after it has returned Stop
  def next_from_producer
    @mutex.synchronize do
      unless @stopped
        item = @lambda.call
        @stopped = (item == Stop)
        [item, @index += 1] unless @stopped
      end
    end
  end

  # Only the counter bump needs the lock; the array is never modified here.
  def next_from_source
    position = @mutex.synchronize { @index += 1 }
    return if position >= size

    [@source[position], position]
  end

  # Lambda popping from a queue-like object, or false when `array` is not one.
  def queue_wrapper(array)
    return false unless array.respond_to?(:num_waiting) && array.respond_to?(:pop)

    -> { array.pop(false) }
  end
end
# Installs a temporary signal handler that kills a set of pids when the user
# interrupts the program, and restores the previous handler afterwards.
class UserInterruptHandler
  # Signal trapped when options do not name another one.
  INTERRUPT_SIGNAL = :SIGINT

  class << self
    # kill all these pids or threads if user presses Ctrl+c
    #
    # pids:    list of process ids to kill on interrupt
    # options: Hash; :interrupt_signal overrides INTERRUPT_SIGNAL
    # Yields with the handler installed and returns the block's value.
    # Calls may be nested: only the outermost call installs and restores the trap.
    def kill_on_ctrl_c(pids, options)
      @to_be_killed ||= []
      old_interrupt = nil
      signal = options.fetch(:interrupt_signal, INTERRUPT_SIGNAL)

      if @to_be_killed.empty?
        old_interrupt = trap_interrupt(signal) do
          warn 'Parallel execution interrupted, exiting ...'
          @to_be_killed.flatten.each { |pid| kill(pid) }
        end
      end

      @to_be_killed << pids

      yield
    ensure
      @to_be_killed.pop # do not kill pids that could be used for new processes
      restore_interrupt(old_interrupt, signal) if @to_be_killed.empty?
    end

    # Send SIGKILL to the given pid; a pid that no longer exists is ignored.
    def kill(thing)
      Process.kill(:KILL, thing)
    rescue Errno::ESRCH
      # some linux systems already automatically killed the children at this point
      # so we just ignore them not being there
    end

    private

    # Replace the handler for `signal` with one that runs the given block and
    # then hands over to the previous handler. Returns the previous handler as
    # reported by Signal.trap (a callable, a String such as "DEFAULT" or
    # "IGNORE", or nil).
    def trap_interrupt(signal)
      old = Signal.trap signal, 'IGNORE'

      Signal.trap signal do
        yield
        if old.respond_to?(:call)
          old.call
        elsif old != 'IGNORE'
          # nil, "DEFAULT" and other non-callable handlers: behave like an
          # unhandled interrupt
          raise Interrupt
        end
        # "IGNORE": the signal was ignored before, so there is nothing to hand
        # over to (calling the String used to raise NoMethodError here)
      end

      old
    end

    # Put the handler returned by trap_interrupt back in place.
    def restore_interrupt(old, signal)
      Signal.trap signal, old
    end
  end
end
class << self
# Run the block in `count` threads, passing each thread its index (0-based),
# and return the array of the threads' values in index order.
#
# options: an Integer count or a Hash with :count (defaults to { count: 2 });
#          other keys are ignored here.
# Thread#value re-raises an exception that terminated a thread, so an error in
# any block propagates to the caller.
def in_threads(options = { count: 2 })
threads = []
# trailing comma: only the count is needed, the normalized options are dropped
count, = extract_count_from_options(options)
# Outer block defers asynchronous exceptions and the inner block re-enables
# them while threads are started and joined; this mirrors the ensure-guarding
# pattern from the Thread.handle_interrupt documentation, presumably so the
# cleanup below cannot itself be interrupted.
Thread.handle_interrupt(Exception => :never) do
Thread.handle_interrupt(Exception => :immediate) do
count.times do |i|
threads << Thread.new { yield(i) }
end
threads.map(&:value)
end
ensure
# kill whatever was started, including threads still running after an error
threads.each(&:kill)
end
end
# Run the block once per worker process, passing each call its index.
#
# options: an Integer count or a Hash with :count; without a count,
#          processor_count is used.
# Returns whatever `map` returns for the range 0...count.
def in_processes(options = {}, &block)
  requested, options = extract_count_from_options(options)
  process_count = requested || processor_count
  map(0...process_count, options.merge(in_processes: process_count), &block)
end
# Like `map`, but with result collection switched off (preserve_results: false).
# Returns whatever `map` returns for those options.
def each(array, options = {}, &block)
  each_options = options.merge(preserve_results: false)
  map(array, each_options, &block)
end
# True when the block returns a truthy value for at least one item.
# The arguments are passed straight to `each`; the first truthy block result
# raises Kill to stop the run, which makes `each` return a falsy value.
# Raises RuntimeError when no block is given.
def any?(*args, &block)
  raise "You must provide a block when calling #any?" unless block

  outcome = each(*args) do |*item_args|
    raise Kill if block.call(*item_args)
  end
  !outcome
end
# True when the block returns a truthy value for every item.
# The arguments are passed straight to `each`; the first falsy block result
# raises Kill to stop the run, which makes `each` return a falsy value.
# Raises RuntimeError when no block is given.
def all?(*args, &block)
  raise "You must provide a block when calling #all?" unless block

  outcome = each(*args) do |*item_args|
    raise Kill unless block.call(*item_args)
  end
  outcome ? true : false
end
# `each`, with the item index passed to the block as a second argument
# (with_index: true).
def each_with_index(array, options = {}, &block)
  indexed_options = options.merge(with_index: true)
  each(array, indexed_options, &block)
end
# Run the block for every item of `source` using processes, threads or ractors
# and collect the results.
#
# source:  array-like object, callable producer, or queue-like object
#          (see JobFactory)
# options: Hash; :in_processes / :in_threads / :in_ractors select the strategy
#          and the number of workers, other keys are passed on to the workers
#
# Returns the value carried by a Break result, raises any other exception
# result, and otherwise returns the results (or `source` when results are not
# kept). Raises ArgumentError when both :in_processes and :in_threads are given.
def map(source, options = {}, &block)
  options = options.dup # never modify the caller's hash
  options[:mutex] = Mutex.new

  if options[:in_processes] && options[:in_threads]
    raise ArgumentError, "Please specify only one of `in_processes` or `in_threads`."
  end

  strategy, size =
    if RUBY_PLATFORM.match?(/java/) && !options[:in_processes]
      [:in_threads, options[:in_threads] || processor_count]
    elsif options[:in_threads]
      [:in_threads, options[:in_threads]]
    elsif options[:in_ractors]
      [:in_ractors, options[:in_ractors]]
    elsif Process.respond_to?(:fork)
      [:in_processes, options[:in_processes] || processor_count]
    else
      warn "Process.fork is not supported by this Ruby"
      [:in_processes, 0]
    end

  job_factory = JobFactory.new(source, options[:mutex])
  # never start more workers than there are jobs
  size = [job_factory.size, size].min

  # results are needed either for the return value or for the :finish hook
  options[:return_results] = (options[:preserve_results] != false || !!options[:finish])
  add_progress_bar!(job_factory, options)

  result =
    if size == 0
      # no workers: run everything in the calling thread
      work_direct(job_factory, options, &block)
    else
      sized_options = options.merge(count: size)
      case strategy
      when :in_threads then work_in_threads(job_factory, sized_options, &block)
      when :in_ractors then work_in_ractors(job_factory, sized_options, &block)
      else work_in_processes(job_factory, sized_options, &block)
      end
    end

  return result.value if result.is_a?(Break)
  raise result if result.is_a?(Exception)

  options[:return_results] ? result : source
end
# `map`, with the item index passed to the block as a second argument
# (with_index: true).
def map_with_index(array, options = {}, &block)
  indexed_options = options.merge(with_index: true)
  map(array, indexed_options, &block)
end
# `map`, with the results flattened by one level.
# All arguments and the block are forwarded to `map` unchanged.
def flat_map(...)
  mapped = map(...)
  mapped.flatten(1)
end
# `map`, with nil results removed.
# All arguments and the block are forwarded to `map` unchanged.
def filter_map(...)
  mapped = map(...)
  mapped.compact
end
# Number of physical processor cores on the current system.
#
# The value is computed once and memoized. How it is found depends on
# RbConfig::CONFIG["target_os"]:
# - darwin:  `sysctl -n hw.physicalcpu`
# - linux:   distinct "physical id" / "core id" pairs in /proc/cpuinfo
# - windows: sum of NumberOfCores over Win32_Processor (via WIN32OLE)
# - other:   processor_count
# Falls back to processor_count when the detected value is not positive.
def physical_processor_count
  @physical_processor_count ||= begin
    ppc =
      case RbConfig::CONFIG["target_os"]
      when /darwin[12]/
        # block form closes the pipe and waits for sysctl instead of leaving
        # the open IO to the garbage collector
        IO.popen("/usr/sbin/sysctl -n hw.physicalcpu", &:read).to_i
      when /linux/
        cores = {} # unique physical ID / core ID combinations
        phy = 0
        File.read("/proc/cpuinfo").scan(/^physical id.*|^core id.*/) do |ln|
          if ln.start_with?("physical")
            phy = ln[/\d+/]
          elsif ln.start_with?("core")
            cores["#{phy}:#{ln[/\d+/]}"] = true
          end
        end
        cores.count
      when /mswin|mingw/
        require 'win32ole'
        result_set = WIN32OLE.connect("winmgmts://").ExecQuery(
          "select NumberOfCores from Win32_Processor"
        )
        # sum is 0 for an empty result set, so the check below still works
        # (reduce(:+) returned nil there)
        result_set.to_enum.map(&:NumberOfCores).sum
      else
        processor_count
      end
    # fall back to logical count if physical info is invalid
    ppc > 0 ? ppc : processor_count
  end
end
# Number of processors seen by the OS, used for process scheduling.
# The PARALLEL_PROCESSOR_COUNT environment variable overrides the detected
# value; the result is memoized after the first call.
def processor_count
  require 'etc'
  return @processor_count if @processor_count

  configured = ENV['PARALLEL_PROCESSOR_COUNT']
  @processor_count = Integer(configured || Etc.nprocessors)
end
# Worker index stored on the current thread under :parallel_worker_number,
# or nil when none has been set.
def worker_number
  current = Thread.current
  current[:parallel_worker_number]
end
# Store the worker index on the current thread under :parallel_worker_number.
# TODO: this does not work when doing threads in forks, so should remove and yield the number instead if needed
def worker_number=(worker_num)
  current = Thread.current
  current[:parallel_worker_number] = worker_num
end
private
# When options[:progress] is set, create a ruby-progressbar and wrap
# options[:finish] so the bar advances after every finished item.
#
# options[:progress] may be `true` (title "Progress"), something string-like
# (used as title) or a Hash of ProgressBar options.
# Does nothing when the option is not set. Raises RuntimeError for sources of
# unknown size.
def add_progress_bar!(job_factory, options)
  progress_options = options[:progress]
  return unless progress_options

  raise "Progressbar can only be used with array like items" if job_factory.size == Float::INFINITY

  require 'ruby-progressbar'

  progress_options =
    if progress_options == true
      { title: "Progress" }
    elsif progress_options.respond_to?(:to_str)
      { title: progress_options.to_str }
    else
      progress_options
    end

  # user-supplied options win over these defaults
  defaults = { total: job_factory.size, format: '%t |%E | %B | %a' }
  progress = ProgressBar.create(defaults.merge(progress_options))

  # keep an existing :finish hook working and advance the bar after it
  old_finish = options[:finish]
  options[:finish] = lambda do |item, i, result|
    old_finish.call(item, i, result) if old_finish
    progress.increment
  end
end
# Run all jobs in the calling thread, one after another.
#
# Returns the array of results, or the StandardError raised while working
# (returned, not raised). The worker number is 0 while running and reset to
# nil afterwards.
def work_direct(job_factory, options, &block)
  self.worker_number = 0
  collected = []

  begin
    while (job = job_factory.next)
      item, index = job
      collected << with_instrumentation(item, index, options) { call_with_index(item, index, options, &block) }
    end
    collected
  rescue StandardError => e
    e
  end
ensure
  self.worker_number = nil
end
# Run the jobs in options[:count] threads.
#
# Each thread keeps fetching jobs until there are none left or some thread has
# failed. Results are stored by job index. Returns the results array, or the
# StandardError raised by a job (returned, not raised).
# Raises RuntimeError when options[:interrupt_signal] is set.
def work_in_threads(job_factory, options, &block)
  raise "interrupt_signal is no longer supported for threads" if options[:interrupt_signal]

  collected = []
  collected_mutex = Mutex.new # arrays are not thread-safe on jRuby
  failure = nil

  in_threads(options) do |worker_num|
    self.worker_number = worker_num

    # as long as nothing failed and there are more jobs, work on one of them
    until failure
      job = job_factory.next
      break unless job

      begin
        item, index = job
        outcome = with_instrumentation(item, index, options) { call_with_index(item, index, options, &block) }
        collected_mutex.synchronize { collected[index] = outcome }
      rescue StandardError => e
        failure = e
      end
    end
  end

  failure || collected
end
# Run the jobs in options[:count] ractors.
#
# The work is not given as a block but as options[:ractor] =
# [ClassName, :method_name]; each ractor calls ClassName.method_name(item).
# Raises ArgumentError when a block is given or options[:ractor] is missing.
# Returns the results array (indexed by job index), or the first exception
# reported by a ractor (returned, not raised).
def work_in_ractors(job_factory, options)
exception = nil
results = []
results_mutex = Mutex.new # arrays are not thread-safe on jRuby
callback = options[:ractor]
if block_given? || !callback
raise ArgumentError, "pass the code you want to execute as `ractor: [ClassName, :method_name]`"
end
# build
# Each ractor loops over incoming messages of the form
# [[klass, method_name], item, index] and answers every one with
# [exception_or_nil, result_or_nil, item, index]; index == :break ends the loop.
ractors = Array.new(options.fetch(:count)) do
Ractor.new do
loop do
got = receive
(klass, method_name), item, index = got
break if index == :break
begin
Ractor.yield [nil, klass.send(method_name, item), item, index]
rescue StandardError => e
Ractor.yield [e, nil, item, index]
end
end
end
end
# start
# Give every ractor one initial job. Iterates over a copy because ractors
# that get no job are stopped and removed from the list inside the loop.
ractors.dup.each do |ractor|
if (set = job_factory.next)
item, index = set
instrument_start item, index, options
ractor.send [callback, item, index]
else
ractor.send([[nil, nil], nil, :break]) # stop the ractor
ractors.delete ractor
end
end
# replace with new items
# For every further job: wait for any ractor to report, record its result and
# hand it the new job. On a reported exception the ractor is dropped and the
# loop ends; the job fetched for this iteration is then not dispatched.
while (set = job_factory.next)
item_next, index_next = set
done, (exception, result, item, index) = Ractor.select(*ractors)
if exception
ractors.delete done
break
end
instrument_finish item, index, result, options
results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) }
instrument_start item_next, index_next, options
done.send([callback, item_next, index_next])
end
# finish
# Collect the outstanding answer from each remaining ractor, then stop it.
# The first exception seen is kept; results of failed jobs are skipped
# (those ractors are not sent the stop message here).
ractors.each do |ractor|
(new_exception, result, item, index) = ractor.take
exception ||= new_exception
next if new_exception
instrument_finish item, index, result, options
results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) }
ractor.send([[nil, nil], nil, :break]) # stop the ractor
end
exception || results
end
# Run the jobs in options[:count] worker processes.
#
# One parent thread per worker fetches jobs and sends them to its process via
# Worker#work; results are stored by job index. While running, the worker pids
# are registered with UserInterruptHandler so an interrupt kills them.
# Returns the results array, or the first StandardError seen by a parent
# thread (returned, not raised).
def work_in_processes(job_factory, options, &blk)
workers = create_workers(job_factory, options, &blk)
results = []
results_mutex = Mutex.new # arrays are not thread-safe
exception = nil
UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do
in_threads(options) do |i|
worker = workers[i]
# remember the driving thread so it can be killed when another worker raises Kill
worker.thread = Thread.current
worked = false
begin
loop do
# stop handing out jobs as soon as any thread has recorded an exception
break if exception
item, index = job_factory.next
# job_factory.next returned nothing: no jobs left
break unless index
# isolation mode: every job after the first gets a freshly forked worker
if options[:isolation]
worker = replace_worker(job_factory, workers, i, options, blk) if worked
worked = true
worker.thread = Thread.current
end
begin
result = with_instrumentation item, index, options do
worker.work(job_factory.pack(item, index))
end
results_mutex.synchronize { results[index] = result } # arrays are not threads safe on jRuby
rescue StandardError => e
exception = e
# Kill: stop every other worker right away (its parent thread and its process)
if exception.is_a?(Kill)
(workers - [worker]).each do |w|
w.thread&.kill
UserInterruptHandler.kill(w.pid)
end
end
end
end
ensure
# always close the pipes and wait for this thread's current worker
worker.stop
end
end
end
exception || results
end
# Stop the worker at `index` and put a freshly created one in its place.
#
# Runs under options[:mutex]. The new worker is told about all other workers
# (started_workers) and gets `index` as its worker number.
# Returns the new worker.
def replace_worker(job_factory, workers, index, options, blk)
  options[:mutex].synchronize do
    # old worker is no longer used ... stop it
    retired = workers[index]
    retired.stop

    # create a new replacement worker
    still_running = workers - [retired]
    replacement_options = options.merge(started_workers: still_running, worker_number: index)
    workers[index] = worker(job_factory, replacement_options, &blk)
  end
end
# Create options[:count] workers, numbered from 0.
#
# Every new worker is passed the list of workers started before it
# (started_workers); that list is the same array that is returned at the end.
def create_workers(job_factory, options, &block)
  Array.new(options[:count]).each_index.with_object([]) do |i, started|
    started << worker(job_factory, options.merge(started_workers: started, worker_number: i), &block)
  end
end
# Fork one worker process and return the parent-side Worker handle for it.
#
# Two pipes are created: parent_write -> child_read carries jobs,
# child_write -> parent_read carries results.
# options[:worker_number] becomes the child's worker number and
# options[:started_workers] lists the workers whose pipes the child must close.
def worker(job_factory, options, &block)
child_read, parent_write = IO.pipe
parent_read, child_write = IO.pipe
pid = Process.fork do
self.worker_number = options[:worker_number]
begin
# the child got copies of the pipes of previously started workers; close them here
options.delete(:started_workers).each(&:close_pipes)
# the child only uses its own ends of the two pipes
parent_write.close
parent_read.close
process_incoming_jobs(child_read, child_write, job_factory, options, &block)
ensure
child_read.close
child_write.close
end
end
# the parent only uses its own ends of the two pipes
child_read.close
child_write.close
Worker.new(parent_read, parent_write, pid)
end
# Worker-side loop: read marshalled jobs from `read` until EOF, run each one
# and write the marshalled outcome to `write`.
#
# An exception raised by a job is sent back wrapped in an ExceptionWrapper,
# except NoMemoryError, SignalException, Interrupt and SystemExit, which are
# re-raised. Returns early when the result pipe is broken.
def process_incoming_jobs(read, write, job_factory, options, &block)
  until read.eof?
    item, index = job_factory.unpack(Marshal.load(read))

    outcome =
      begin
        call_with_index(item, index, options, &block)
      # https://github.com/rspec/rspec-support/blob/673133cdd13b17077b3d88ece8d7380821f8d7dc/lib/rspec/support.rb#L132-L140
      rescue NoMemoryError, SignalException, Interrupt, SystemExit => e # rubocop:disable Lint/ShadowedException
        raise e
      rescue Exception => e # rubocop:disable Lint/RescueException
        ExceptionWrapper.new(e)
      end

    begin
      Marshal.dump(outcome, write)
    rescue Errno::EPIPE
      return # parent thread already dead
    end
  end
end
# options is either a Integer or a Hash with :count
# Returns [count, options_hash]; a non-Hash argument is taken as the count
# itself and paired with an empty Hash.
def extract_count_from_options(options)
  return [options[:count], options] if options.is_a?(Hash)

  [options, {}]
end
# Call the block with the item (plus its index when options[:with_index] is
# set). Returns the block's value only when options[:return_results] is set,
# otherwise nil.
def call_with_index(item, index, options, &block)
  args = options[:with_index] ? [item, index] : [item]
  value = block.call(*args)

  # nil avoids the GC overhead of passing large results around
  options[:return_results] ? value : nil
end
# Run the given block between the start and finish hooks for one item.
# Returns the block's value, or nil when options[:preserve_results] is false.
def with_instrumentation(item, index, options)
  instrument_start(item, index, options)
  outcome = yield
  instrument_finish(item, index, outcome, options)

  options[:preserve_results] == false ? nil : outcome
end
# Call the options[:finish] hook (if any) for a finished item.
# With options[:finish_in_order] the call is routed through
# instrument_finish_in_order; otherwise the hook runs under options[:mutex].
def instrument_finish(item, index, result, options)
  on_finish = options[:finish]
  return unless on_finish

  if options[:finish_in_order]
    instrument_finish_in_order(item, index, result, options)
  else
    options[:mutex].synchronize { on_finish.call(item, index, result) }
  end
end
# yield results in the order of the input items
# needs to use `options` to store state between executions
# needs to use `done` index since a nil result would also be valid
#
# State kept in `options`: :finish_done (finished [item, result] pairs by
# index) and :finish_expecting (next index whose hook has to run).
def instrument_finish_in_order(item, index, result, options)
  options[:mutex].synchronize do
    # initialize our state
    finished = (options[:finish_done] ||= [])
    options[:finish_expecting] ||= 0 # we wait for item at index 0

    # store current result
    finished[index] = [item, result]

    # yield all results that are now in order
    next unless index == options[:finish_expecting]

    cursor = index
    while (done = finished[cursor])
      finished[cursor] = nil # allow GC to free this item and result
      options[:finish].call(done[0], cursor, done[1])
      options[:finish_expecting] += 1
      cursor += 1
    end
  end
end
# Call the options[:start] hook (if any) for an item that is about to be
# processed; the hook runs under options[:mutex].
def instrument_start(item, index, options)
  on_start = options[:start]
  return unless on_start

  options[:mutex].synchronize { on_start.call(item, index) }
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment