@fxn
Last active March 16, 2021 10:17
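require "concurrent"
# cattr_accessor is an Active Support extension; outside Rails it must be loaded explicitly.
require "active_support/core_ext/module/attribute_accessors"

class BatchWriter
  MAX_BATCH_SIZE = 100
  TIMER_OPTS = { execution_interval: 1, timeout_interval: 1 }.freeze

  cattr_accessor :buffer
  self.buffer = []

  cattr_accessor :mutex
  self.mutex = Mutex.new

  class << self
    # Called by producers; holds the lock only for the append.
    def push(location)
      mutex.synchronize { buffer << location }
    end

    # Flushes the buffer in the background once per execution_interval (seconds).
    def start_timer
      @timer = Concurrent::TimerTask.execute(TIMER_OPTS) { flush }
    end

    # Safe to call even if the timer was never started.
    def stop_timer
      @timer&.shutdown
    end

    def flush
      # It's important that we hold the lock for as short a time as possible:
      # copy the buffer to a local variable, clear it, and release the lock
      # before inserting.
      locations = []
      mutex.synchronize do
        locations.concat(buffer)
        buffer.clear
      end

      insert(locations) unless locations.empty?
    end

    private

    def insert(locations)
      locations.each_slice(MAX_BATCH_SIZE) do |batch|
        # insert batch
      end
    end
  end
end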
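
The drain-under-lock idea in `flush` can be tried out without Rails or concurrent-ruby. What follows is only a minimal sketch under stated assumptions: `BufferedSink` and its `batches` recorder are hypothetical names that do not appear in the gist, the timer is left out, and the "insert" merely records each batch in memory. It also swaps the array reference instead of using `concat` plus `clear`, which is a variation on the gist's approach, not the author's code.

```ruby
# Hypothetical stand-in for BatchWriter that uses only the standard library.
class BufferedSink
  MAX_BATCH_SIZE = 100

  attr_reader :batches

  def initialize
    @buffer  = []
    @mutex   = Mutex.new
    @batches = [] # records what a real insert would have written
  end

  # Producers hold the lock only for the append.
  def push(item)
    @mutex.synchronize { @buffer << item }
  end

  # Returns the number of items drained from the buffer.
  def flush
    # Swap the buffer out under the lock; do the slow work outside it.
    items = @mutex.synchronize do
      drained = @buffer
      @buffer = []
      drained
    end

    items.each_slice(MAX_BATCH_SIZE) { |batch| @batches << batch }
    items.size
  end
end

sink = BufferedSink.new

# Five producer threads push 50 items each while sharing one sink.
producers = 5.times.map do |i|
  Thread.new { 50.times { |j| sink.push([i, j]) } }
end
producers.each(&:join)

flushed     = sink.flush
batch_sizes = sink.batches.map(&:size)
```

Because the slow part (here, recording batches) runs after the lock is released, producers calling `push` are blocked only for the duration of the swap, which is the property the comment in `flush` is after.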