@szajbus
Created November 10, 2015 14:50
Committable buffer
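class Buffer
  # size: number of buffered objects that triggers an automatic commit
  # blk:  callback that receives the array of buffered objects on commit
  def initialize(size = 1_000, &blk)
    @size = size
    @callback = blk
    @queue = []
    @lock = Mutex.new
  end

  # Append an object; commits automatically once the buffer holds @size objects.
  def <<(object)
    @lock.synchronize do
      @queue << object
      flush if @queue.size >= @size
    end
    self
  end

  # Commit whatever is currently buffered (no-op when the buffer is empty).
  def commit!
    @lock.synchronize { flush }
  end

  private

  # Caller must already hold @lock; Mutex is not reentrant.
  def flush
    return if @queue.empty?
    @callback.call(@queue)
    @queue = []
  end
end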
# Set up buffer with commit callback as block
buffer = Buffer.new do |docs|
  ElasticSearch.bulk(docs, :local)
  logger.increase_current_step
end

# Process products, keep adding results to buffer
products.each do |p|
  ...
  buffer << p
end

# Commit what is left in buffer
buffer.commit!
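
The usage above depends on ElasticSearch, a logger and a product list that are not part of this gist. As a rough, standalone sketch of the same flow, the snippet below swaps the bulk call for a plain array so the batching can be observed. The `batches` array, the buffer size of 3 and the integer payloads are assumptions made for illustration only; the class is repeated in condensed form just so the snippet runs on its own.

```ruby
# Condensed Buffer with the same interface as the class above,
# repeated here only so this snippet is self-contained.
class Buffer
  def initialize(size = 1_000, &blk)
    @size = size
    @callback = blk
    @queue = []
    @lock = Mutex.new
  end

  def <<(object)
    @lock.synchronize do
      @queue << object
      flush if @queue.size >= @size
    end
    self
  end

  def commit!
    @lock.synchronize { flush }
  end

  private

  # Caller must hold @lock.
  def flush
    return if @queue.empty?
    @callback.call(@queue)
    @queue = []
  end
end

# Hypothetical sink standing in for ElasticSearch.bulk: record each batch.
batches = []
buffer = Buffer.new(3) { |docs| batches << docs }

# Seven items with a size of 3: two automatic commits, one item left over.
(1..7).each { |n| buffer << n }

# Commit what is left in the buffer.
buffer.commit!

p batches
```

With a size of 3 and seven items, the callback should fire twice during the loop and once more on the final `commit!`. Because the buffer replaces its internal array after each commit rather than clearing it, the batches handed to the callback stay intact afterwards.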