@lwoodson
Created April 19, 2018 21:13
require 'benchmark'
require 'ostruct'
module Platappus
  ##
  # Class capable of carrying out multi-threaded map/reduce
  # operations. The developer specifies 1-N map blocks to produce
  # interim values, and a reduce block to aggregate results.
  # Each map block is passed the name of the thread running it
  # (e.g. "thread-1"); the reduce block is passed the array of
  # interim results and the array of mapping errors.
  # For example:
  #
  #   mr = MapReduce.new
  #
  #   10.times do
  #     mr.map do
  #       sleep_time = (1..30).to_a.sample / 10.0
  #       sleep sleep_time
  #       sleep_time
  #     end
  #   end
  #
  #   mr.reduce do |results|
  #     results.reduce(&:+)
  #   end
  #
  #   mr_result = mr.exec!
  #
  #   mr_result.value
  #   # => 14.3999999999 (varies from run to run)
  #
  # The result of a MapReduce#exec! call has the following
  # attributes:
  #
  # - value        the final value of the reduce aggregation
  # - errors       an array of exceptions raised while executing
  #                map blocks
  # - has_errors   true if there were any errors
  # - map_time     the wall-clock time, in seconds, to carry out
  #                all parallelized map operations
  # - reduce_time  the wall-clock time, in seconds, to carry out
  #                the reduction operation
  # - total_time   the total wall-clock time, in seconds, to carry
  #                out the exec! operation
  #
  # The map operations are carried out in parallel by different
  # threads. Each thread carries out up to batch_size map
  # operations (a constructor arg), so N map blocks run on
  # ceil(N / batch_size) threads. batch_size defaults to 1, so
  # that each map operation is carried out by a separate thread.
  # Interim results are collected in completion order, not in the
  # order the map blocks were registered.
  #
  # If an exception is raised during reduction, a
  # MapReduce::ReductionError is raised containing the interim
  # results and any mapping errors collected before the
  # reduction. The reduction error's cause attribute contains the
  # underlying exception.
  #
  class MapReduce
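    ##
    # Create a new MapReduce operation. batch_size is the number
    # of map operations run by each thread; check_interval is how
    # often, in seconds, exec! checks whether all map operations
    # have finished.
    def initialize(batch_size: 1, check_interval: 0.05)
      @batch_size = batch_size
      @check_interval = check_interval
      @mappers = []
      # The default reducer returns the interim results unchanged.
      # It must accept both arguments exec! passes (interim results
      # and errors), because lambdas enforce their arity.
      @reducer = lambda do |interim_results, _errors|
        interim_results
      end
    end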
    ##
    # Specify a map operation for the MapReduce
    def map(&block)
      @mappers << block
    end

    ##
    # Specify the reduction operation for the MapReduce
    def reduce(&block)
      @reducer = block
    end
    ##
    # Execute the parallelized map/reduce op
    def exec!
      result = nil
      exec_benchmark = Benchmark.measure do
        count = @mappers.size
        batch_size = @batch_size
        interim_results = []
        errors = []
        # count, interim_results and errors are shared by all map
        # threads; the mutex keeps their updates from racing.
        lock = Mutex.new
        thread_count = 0
        threads = []
        map_benchmark = Benchmark.measure do
          @mappers.each_slice(batch_size) do |mappers|
            thread = "thread-#{thread_count += 1}"
            threads << Thread.new do
              mappers.each do |mapper|
                begin
                  value = mapper.call(thread)
                  lock.synchronize { interim_results << value }
                rescue StandardError => e
                  lock.synchronize { errors << e }
                ensure
                  lock.synchronize { count -= 1 }
                end
              end
            end
          end
          # Poll until every map operation has finished, then reap
          # the threads.
          sleep @check_interval while lock.synchronize { count } > 0
          threads.each(&:join)
        end
        reduction_result = nil
        begin
          reduce_benchmark = Benchmark.measure do
            reduction_result = @reducer.call(interim_results, errors)
          end
        rescue StandardError => e
          # Raising inside rescue sets ReductionError#cause to e.
          raise ReductionError.new(e, interim_results, errors)
        end
        result = OpenStruct.new value: reduction_result,
                                errors: errors,
                                has_errors: errors.any?,
                                map_time: map_benchmark.real,
                                reduce_time: reduce_benchmark.real
      end
      result.total_time = exec_benchmark.real
      result
    end
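    ##
    # Raised by exec! when the reduce block fails. Because it is
    # raised from a rescue clause, #cause holds the original
    # exception (the error argument).
    class ReductionError < RuntimeError
      attr_reader :interim_results, :mapping_errors

      def initialize(error, interim_results, mapping_errors)
        super("Reduction error: #{error.message}")
        @interim_results = interim_results
        @mapping_errors = mapping_errors
      end
    end
  end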
end
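In the original exec!, the map threads decrement a shared count and append to shared arrays without synchronization. Ruby does not guarantee that `count -= 1` is atomic across threads. The following is a minimal sketch of the same batching scheme (`each_slice(batch_size)`, one thread per slice), with the shared state guarded by a `Mutex`. It assumes it is acceptable to wait with `Thread#join` instead of polling on a check interval. `run_batched` is a hypothetical name, not part of Platappus.

```ruby
# Hypothetical helper (not part of Platappus): runs the callables in
# `jobs`, batch_size per thread, and collects return values and
# StandardErrors under a Mutex.
def run_batched(jobs, batch_size: 1)
  results = []
  errors = []
  lock = Mutex.new # guards results and errors across threads

  threads = jobs.each_slice(batch_size).map do |batch|
    Thread.new do
      batch.each do |job|
        begin
          value = job.call
          lock.synchronize { results << value }
        rescue StandardError => e
          lock.synchronize { errors << e }
        end
      end
    end
  end

  # Thread#join blocks until each thread has finished, so no shared
  # counter or polling loop is needed.
  threads.each(&:join)
  { results: results, errors: errors, thread_count: threads.size }
end

# 7 jobs that succeed plus 1 that raises, 3 jobs per thread.
jobs = Array.new(7) { |i| -> { i * 2 } }
jobs << -> { raise ArgumentError, "bad input" }

outcome = run_batched(jobs, batch_size: 3)
p outcome[:results].sort          # => [0, 2, 4, 6, 8, 10, 12]
p outcome[:errors].map(&:message) # => ["bad input"]
p outcome[:thread_count]          # => 3
```

Eight jobs in slices of three give slices of 3, 3 and 2, hence three threads. The results are sorted before printing because, as with MapReduce, they arrive in completion order.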
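exec! calls the reducer as `@reducer.call(interim_results, errors)`. Whether a reducer that declares only one parameter tolerates that second argument depends on how it was created. A block stored via `&block` (as `reduce` does) is a non-lambda proc and drops extra arguments, while a lambda enforces its arity and raises `ArgumentError`. The small demonstration below makes this concrete; `capture` is a hypothetical helper that stands in for `reduce`.

```ruby
# Hypothetical helper: returns the block it was given, the same way
# MapReduce#reduce stores its &block.
def capture(&block)
  block
end

block_reducer  = capture { |results| results.sum }
lambda_reducer = lambda { |results| results.sum }
safe_lambda    = lambda { |results, _errors| results.sum }

# A captured block is a non-lambda proc: the extra argument is dropped.
p block_reducer.call([1, 2, 3], [])  # => 6
# A lambda that declares both parameters also works.
p safe_lambda.call([1, 2, 3], [])    # => 6

# A one-parameter lambda enforces its arity and rejects two arguments.
begin
  lambda_reducer.call([1, 2, 3], [])
rescue ArgumentError => e
  puts "ArgumentError: #{e.message}"
end
```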
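The doc comment describes the map operations as running in parallel, and map_time and reduce_time report `Benchmark::Tms#real`, the elapsed wall-clock time. On MRI (CRuby) the global VM lock lets only one thread run Ruby code at a time, so the speed-up comes from blocks that wait, such as the `sleep` in the doc example or I/O. The following is a quick, self-contained check; the 0.2 s sleeps are arbitrary.

```ruby
require 'benchmark'

# Five callables that each sleep 0.2 s. sleep releases MRI's global VM
# lock, so when each runs on its own thread the waits overlap.
sleepers = Array.new(5) { -> { sleep 0.2 } }

sequential = Benchmark.measure { sleepers.each(&:call) }
threaded = Benchmark.measure do
  sleepers.map { |s| Thread.new { s.call } }.each(&:join)
end

# Benchmark::Tms#real is elapsed wall-clock time in seconds.
printf("sequential: %.2fs  threaded: %.2fs\n", sequential.real, threaded.real)
```

On a typical run the sequential figure is close to 1.0 s and the threaded one close to 0.2 s. A CPU-bound block, such as a tight arithmetic loop, would show little or no difference on MRI.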
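The doc comment says ReductionError's cause attribute holds the underlying exception, even though the constructor never stores it. This works because Ruby sets `Exception#cause` automatically when a new exception is raised while another is being handled. The self-contained sketch below mirrors the shape of ReductionError; `WrappedError` and `reduce_or_wrap` are hypothetical names.

```ruby
# Hypothetical error type mirroring the shape of MapReduce::ReductionError.
class WrappedError < RuntimeError
  attr_reader :partial_results

  def initialize(partial_results)
    super("Reduction error")
    @partial_results = partial_results
  end
end

def reduce_or_wrap(results)
  results.reduce(:+)
rescue StandardError
  # Raising while handling an exception makes Ruby set the new
  # exception's #cause to the one being handled.
  raise WrappedError.new(results)
end

begin
  reduce_or_wrap([1, "two"])   # 1 + "two" raises TypeError
rescue WrappedError => e
  p e.partial_results          # => [1, "two"]
  p e.cause.class              # => TypeError
end
```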