Skip to content

Instantly share code, notes, and snippets.

@drbobbeaty
Created November 10, 2012 21:45
Show Gist options
  • Save drbobbeaty/4052640 to your computer and use it in GitHub Desktop.
Save drbobbeaty/4052640 to your computer and use it in GitHub Desktop.
JRuby extensions to Enumerable for parallel processing and more
require 'java'
java_import 'java.util.concurrent.Executors'
java_import 'java.util.concurrent.TimeUnit'
module Enumerable
# From activesupport/lib/active_support/core_ext/enumerable.rb
def sum(identity = 0, &block)
if block_given?
map(&block).sum(identity)
else
inject(:+) || identity
end
end
def avg(&block)
sum(&block).to_f / length
end
def parallel_each(options = {})
thread_count = options[:parallelism] || 4
if thread_count > 1
executor = Executors.new_fixed_thread_pool(thread_count)
self.each do |obj|
executor.execute do
begin
yield obj
rescue Exception => e
if error_handler = options[:on_error]
error_handler.call(e) if error_handler.respond_to?(:call)
end
end
end
end
executor.shutdown
if (final_timeout = options[:shutdown_timeout].to_i) > 0
total_time = 0
begin
return if executor.await_termination(5, TimeUnit::SECONDS)
if executor.get_queue.size > 0
total_time = 0
else
if (total_time += 5) > final_timeout
if error_handler = options[:on_error]
error_handler.call(Timeout::Error.new("[Enumerable::parallel_each] final #{executor.get_active_count} tasks could not complete in the provided #{final_timeout} sec timeout.")) if error_handler.respond_to?(:call)
end
# force a shutdown and return to the caller
executor.shutdown_now
return
end
end
rescue Exception => e
if error_handler = options[:on_error]
error_handler.call(e) if error_handler.respond_to?(:call)
end
end until executor.is_terminated
else
sleep(0.01) until executor.is_terminated
end
else
# don't use executors if we don't need to
self.each do |obj|
begin
yield obj
rescue Exception => e
if error_handler = options[:on_error]
error_handler.call(e) if error_handler.respond_to?(:call)
end
end
end
end
end
def frequencies
self.reduce(Hash.new(0)) { |h, v| h[v] += 1; h }
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment