Skip to content

Instantly share code, notes, and snippets.

@nz
Last active September 3, 2019 17:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nz/adf2c84e0e8bb958e2bf4a6b65eb4b1a to your computer and use it in GitHub Desktop.
Save nz/adf2c84e0e8bb958e2bf4a6b65eb4b1a to your computer and use it in GitHub Desktop.
Dynamic time-based batch sizing
# Example usage: stream every row of data/books.csv through an Importer,
# which hands the rows to Elasticsearch in bulk requests.
elasticsearch_url = ENV.fetch('ELASTICSEARCH_URL', 'http://localhost:9200')
elasticsearch = Elasticsearch::Client.new(url: elasticsearch_url, trace: true)

importer = Importer.new
# Each batch collected by the importer is sent as a single bulk call.
importer.batch_handler = lambda do |actions|
  elasticsearch.bulk(body: actions)
end
importer.start

# CSV.foreach opens the file, yields each row and closes the file afterwards,
# so no file handle is left open once the import is done.
CSV.foreach('data/books.csv', headers: true) do |row|
  importer << {
    index: {
      _index: 'books',
      _type: 'doc',
      data: row.to_hash
    }
  }
end

# Signal the end of the input and wait for the worker thread to drain the queue.
importer.finish
# Dynamically import batches of objects.
#
# Items pushed with #<< are queued and drained by a background worker thread
# (see #start). The worker groups items into batches and passes each batch to
# +batch_handler+. After every full batch the time spent in the handler is
# compared with +duration_range+ and the next batch size grows or shrinks.
class Importer
  # Callable invoked once per batch with an Array of the queued items.
  attr_accessor :batch_handler

  # batch_range    - batch_range.first is the base size that +ratio+ scales;
  #                  batch_range.last is the hard upper limit of a batch
  # duration_range - when a full batch is handled faster than this range,
  #                  increase the batch; when slower, decrease it
  # ratio          - the expansion/contraction ratio relative to the minimum
  #                  batch size
  #
  # The amount of expansion is tracked in @ratio_factor, the exponent applied
  # to +ratio+; it is updated dynamically in response to the timing.
  def initialize(batch_range = (1_000..10_000), duration_range = (0.5..1.0), ratio = 1.6)
    @batch_range = batch_range
    @duration_range = duration_range
    @ratio = ratio # honour the caller's ratio instead of a hard-coded 1.6
    @ratio_factor = 1
    @queue = Queue.new
  end

  # Exponent actually used for sizing; never lower than 1.
  # NOTE(review): with a floor of 1 the smallest batch is
  # batch_range.first * ratio rather than batch_range.first -- confirm that
  # this is intended.
  def capped_ratio_factor
    [@ratio_factor, 1].max
  end

  # Number of items the next batch should hold: the base size scaled by
  # ratio**capped_ratio_factor, rounded to a whole number of items and limited
  # to batch_range.last.
  def current_batch_size
    # Round so that floating point noise (1000 * 1.6**2 is
    # 2560.0000000000005) cannot produce a size no batch length ever equals.
    size = (@batch_range.first * @ratio**capped_ratio_factor).round
    [size, @batch_range.last].min
  end

  # Accumulate: queue one item for the worker thread.
  # nil is reserved as the termination signal (see #finish).
  def <<(thing)
    @queue << thing
  end

  # Signal the end of the input and block until the worker thread has exited.
  def finish
    @queue << nil
    @worker_thread.join
  end

  # Pop items off the queue until the current batch size is reached or the
  # termination signal (nil) is seen.
  #
  # Returns [last_popped, items]; last_popped is nil when the termination
  # signal was received, which lets the caller tell a final partial batch
  # from a full one.
  def collect_batch
    limit = current_batch_size # does not change while a batch is collected
    thing = nil
    things = []
    loop do
      thing = @queue.pop
      break if thing.nil? # signal termination

      things << thing
      break if things.length >= limit
    end
    [thing, things] # include the last returned thing as a sentinel
  end

  # Start the worker thread that collects batches, hands them to
  # +batch_handler+ and tunes the batch size after each full batch.
  #
  # Raises ArgumentError when no batch_handler has been assigned.
  def start
    raise ArgumentError, 'batch_handler is not defined' unless @batch_handler

    @worker_thread = Thread.new do
      loop do
        sentinel, things = collect_batch
        break if sentinel.nil? && things.empty? # nothing to do

        # Time the handler with the monotonic clock so wall-clock adjustments
        # cannot distort the measurement.
        started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        @batch_handler.call(things)
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

        adjust_ratio_factor(things.length, elapsed)
        break if sentinel.nil?
      end
    end
  end

  private

  # Grow or shrink the expansion exponent based on how long a batch took.
  # Only full batches are considered; a short final batch says nothing about
  # the timing of a full one.
  def adjust_ratio_factor(batch_length, elapsed)
    return unless batch_length == current_batch_size

    if elapsed < @duration_range.first
      # Stop growing once the size limit is reached so the exponent cannot
      # run far ahead of the size that is actually in effect.
      @ratio_factor += 1 if current_batch_size < @batch_range.last
    elsif elapsed > @duration_range.last
      # Likewise never sink below the floor applied by capped_ratio_factor.
      @ratio_factor -= 1 if @ratio_factor > 1
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment