Skip to content

Instantly share code, notes, and snippets.

@jazzytomato
Created January 25, 2017 14:25
Show Gist options
  • Save jazzytomato/f919c8db3570b4d4ad9526c0f14a9f94 to your computer and use it in GitHub Desktop.
Save jazzytomato/f919c8db3570b4d4ad9526c0f14a9f94 to your computer and use it in GitHub Desktop.
# Enumerable extensions
module Enumerable
# Celluloid actor that serves as the unit of work for the parallel helpers.
# It is meant to be run through a pool of workers so that the amount of
# concurrent work is capped by the pool size.
class ParallelWorker
  include Celluloid

  # Default pool size: the CELLULOID_POOL_SIZE environment variable when it is
  # set, otherwise the larger of Celluloid.cores and 2. Either value is
  # coerced with to_i.
  fallback_size = [Celluloid.cores, 2].max
  POOL_SIZE = ENV.fetch('CELLULOID_POOL_SIZE', fallback_size).to_i

  # Calls +block+ with +element+ and returns whatever the block returns.
  def yielder(element, block)
    block.call(element)
  end
end
# Similar to clojure.core/pmap: like map, but every element is handed to a
# pool of ParallelWorker actors as a future, which is meant to optimise for
# blocking I/O.
#
# size  - number of workers in the pool (defaults to ParallelWorker::POOL_SIZE)
# block - called once per element; its return value becomes the result entry
#
# Returns an Array of the block results, in iteration order. The pool is
# terminated before returning, whether the work succeeded or raised.
def pmap(size = ParallelWorker::POOL_SIZE, &block)
  # Must be declared at method scope: a variable first assigned inside the
  # with_connection block would be block-local and invisible to `ensure`.
  pool = nil
  ActiveRecord::Base.connection_pool.with_connection do
    pool = ParallelWorker.pool(size: size)
    # Queue every element first, then wait for the results in order.
    futures = map { |elem| pool.future(:yielder, elem, block) }
    futures.map(&:value)
  end
ensure
  # pool is still nil when the connection checkout or pool creation raised.
  pool.terminate if pool && pool.alive?
end
# Like select, but the predicate for every element is evaluated through a
# pool of ParallelWorker actors as a future.
#
# size  - number of workers in the pool (defaults to ParallelWorker::POOL_SIZE)
# block - predicate called once per element
#
# Returns an Array of the elements for which the block returned a truthy
# value, in iteration order. Elements that are themselves nil or false are
# kept when selected. The pool is terminated before returning, whether the
# work succeeded or raised.
def pselect(size = ParallelWorker::POOL_SIZE, &block)
  # Must be declared at method scope: a variable first assigned inside the
  # with_connection block would be block-local and invisible to `ensure`.
  pool = nil
  ActiveRecord::Base.connection_pool.with_connection do
    pool = ParallelWorker.pool(size: size)
    # Queue every predicate first, keeping each future next to its element.
    pairs = map { |elem| [pool.future(:yielder, elem, block), elem] }
    # Filter on the future's value rather than mapping to elem-or-nil and
    # compacting, which would silently drop selected nil elements.
    pairs.select { |future, _elem| future.value }.map(&:last)
  end
ensure
  # pool is still nil when the connection checkout or pool creation raised.
  pool.terminate if pool && pool.alive?
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment