Skip to content

Instantly share code, notes, and snippets.

@drbobbeaty
Created November 9, 2012 21:02
Show Gist options
  • Save drbobbeaty/4048211 to your computer and use it in GitHub Desktop.
Save drbobbeaty/4048211 to your computer and use it in GitHub Desktop.
Optionally queueing Ruby CouchDB/CouchRest client
# encoding: utf-8
require 'quantum_lead/application'
require 'singleton'
require 'logger'
require 'version'
require 'couchrest'
require 'java'
java_import 'java.util.concurrent.ConcurrentLinkedQueue'
# Process-wide gateway to the application's CouchDB database via CouchRest.
#
# Documents handed to +store+ are sent synchronously unless background
# workers have been started with +start+; while workers exist, documents are
# placed on a shared queue and sent by those threads until +flush+ shuts them
# down. +store+, +view+, +get+ and +save_doc+ all return nil without touching
# the database unless +use_db?+ is true.
class Database
  include Singleton

  # On large bulk store operations, we still need to keep some kind of
  # limit on how many documents we'll send up to CouchDB at once. This
  # will be that limit.
  MAX_DOCS_PER_SEND = 2000

  # This will be sent to the unspoolers to tell them to stop what they
  # are doing and shut down.
  QUEUE_SHUTDOWN = '__queue_shutdown__'.freeze

  # Store the document(s) produced by the given block, tagged with a :meta
  # hash describing this save (label, message, timestamp, environment, pid,
  # version, ...).
  #
  # label   - value recorded under :label in the metadata.
  # message - value recorded under :message in the metadata (default '').
  # block   - returns the payload: an Array (each element is converted with
  #           to_hash and the batch is bulk-saved), a Hash (saved as one
  #           document), or any other value (wrapped as { :data => value }).
  #           Without a block an empty document is stored.
  #
  # Returns nil when the database is disabled, otherwise the result of
  # spooling the document(s).
  def self.store(label, message = '')
    return nil unless use_db?
    doc = block_given? ? yield : {}
    # make the tagging metadata for this save operation
    metadata = {
      :created       => Time.now,
      :label         => label,
      :message       => message,
      :environment   => QuantumLead::Application.environment,
      :division      => QuantumLead::Application.division,
      :execution_tag => QuantumLead::Application.execution_tag,
      :jar_name      => QuantumLead::Application.jar_name,
      :pid           => Process.pid,
      :version       => { :version => Version.version,
                          :branch  => Version.branch }
    }
    if doc.is_a?(Array)
      # every element of the batch shares the same metadata hash
      spool(doc.map { |elem| elem.to_hash.merge(:meta => metadata) })
    else
      # make it a hash - a document, if it's not already
      doc = { :data => doc } unless doc.is_a?(Hash)
      spool(doc.to_hash.merge(:meta => metadata))
    end
  end

  # Query the view at +path+ with +opts+, retrying on failure.
  # Returns nil when the database is disabled.
  def self.view(path, opts = {})
    return nil unless use_db?
    retryable { database.view(path, opts) }
  end

  # Fetch the document with the given +id+, retrying on failure.
  # Returns nil when the database is disabled.
  def self.get(id, opts = {})
    return nil unless use_db?
    retryable { database.get(id, opts) }
  end

  # Save +doc+ immediately (never queued), stamping "meta"/"updated" with the
  # current time. The caller's hash is not modified; a merged copy is sent.
  # Note the string keys here, as opposed to the symbol keys used by +store+.
  # Returns nil when the database is disabled.
  def self.save_doc(doc, opts = {})
    return nil unless use_db?
    meta = doc["meta"] || {}
    payload = doc.merge("meta" => meta.merge("updated" => Time.now))
    retryable { database.save_doc(payload, opts) }
  end

  # True when the application config directs data to the database.
  def self.use_db?
    QuantumLead::Application.config.application.data_to == 'database'
  end

  # Start +thread_count+ background workers that drain the queue. From this
  # point until +flush+, +store+ queues documents instead of sending them
  # inline.
  def self.start(thread_count = 2)
    thread_count.times do
      workers << Thread.new { unspool }
    end
  end

  # Ask every worker to shut down, wait for them to finish, and return to
  # synchronous sending. Does nothing when no workers are running.
  def self.flush
    return if workers.empty?
    # One sentinel per worker, offered straight to the queue: going through
    # spool would silently drop it whenever use_db? is false, leaving the
    # workers running and the joins below blocked forever.
    workers.size.times { queue.offer(QUEUE_SHUTDOWN) }
    QuantumLead::Application.logger.info("Database.flush[#{Thread.current.object_id}]") { "waiting for queue to empty..." }
    begin
      workers.each { |t| t.join }
    ensure
      # Forget the finished threads so later spool calls send inline rather
      # than queueing for workers that no longer exist.
      workers.clear
    end
  end

  # NOTE: a bare `private` does not apply to methods defined with `def self.`,
  # so the helpers below are technically public. They are intended for
  # internal use only and are left callable so existing callers keep working.

  # The shared queue feeding the worker threads (created on first use).
  def self.queue
    @queue ||= ConcurrentLinkedQueue.new
  end

  # The list of running worker threads (empty until +start+ is called).
  def self.workers
    @workers ||= []
  end

  # Send +doc+ now when there are no workers, otherwise queue it for them.
  # Returns nil when the database is disabled or +doc+ is nil.
  def self.spool(doc)
    return nil if !use_db? || doc.nil?
    workers.empty? ? shoot(doc) : queue.offer(doc)
  end

  # Worker loop: poll the queue and send each document until the shutdown
  # sentinel arrives. Any other String on the queue is ignored.
  def self.unspool
    tag = "Database.unspool[#{Thread.current.object_id}]"
    QuantumLead::Application.logger.info(tag) { "starting unspooler" }
    loop do
      doc = queue.poll
      if doc.nil?
        # nothing queued right now - back off before polling again
        sleep(0.5)
      elsif doc.is_a?(String)
        # this is a control message to this processing code
        break if doc == QUEUE_SHUTDOWN
      else
        begin
          shoot(doc)
        rescue StandardError => e
          # retryable has already given up on this document; drop it and
          # keep draining so one bad document can't kill the worker and
          # strand everything queued behind it.
          QuantumLead::Application.logger.error(tag) { e }
          QuantumLead::Application.logger.error(tag) { "unable to send document... skipping it..." }
        end
      end
    end
    QuantumLead::Application.logger.info(tag) { "shutting down unspooler" }
  end

  # Send +doc+ to the database right now: an Array is bulk-saved in slices
  # of at most MAX_DOCS_PER_SEND, anything else is saved as one document.
  def self.shoot(doc)
    return nil if doc.nil?
    if doc.is_a?(Array)
      # if it's an array - handle it as a bulk_save
      doc.each_slice(MAX_DOCS_PER_SEND) do |bunch|
        retryable { database.bulk_save(bunch) }
      end
    else
      retryable { database.save_doc(doc) }
    end
  end

  # The CouchRest database handle for the current thread, created on first
  # use and cached in Thread.current[:couch_db].
  # NOTE(review): the proxy setting is re-applied on every call, not just
  # when the handle is first created - presumably harmless; confirm.
  def self.database(config = QuantumLead::Application.config)
    CouchRest.proxy(config.database.proxy_uri) if config.database.use_proxy?
    Thread.current[:couch_db] ||= CouchRest.database(config.database.uri)
  end

  # We need to have a general retry processor for all the database calls,
  # and this is it. We have a block, and this will retry it some number of
  # times - logging problems, and giving up if it can't eventually get it
  # right. This makes it a lot easier to deal with intermittent issues with
  # the database.
  #
  # retries - how many additional attempts to make after the first failure.
  #
  # Returns the block's value (nil without a block). Re-raises the last
  # error once the retries are used up. Interrupts, exit requests and
  # out-of-memory errors are never retried.
  def self.retryable(retries = 3)
    block_given? ? yield : nil
  rescue SystemExit, SignalException, NoMemoryError
    # process-level events, not database hiccups - pass them straight up
    raise
  rescue Errno::EADDRNOTAVAIL => se
    log_retry_failure(se, "problem sending data to CouchDB", retries)
    raise unless retries > 0
    # give the local network stack a moment before trying again
    sleep(0.5)
    retries -= 1
    retry
  rescue Exception => e
    # Deliberately broader than StandardError so every other failure is
    # still retried; the fatal classes were filtered out above.
    log_retry_failure(e, "problem dealing with CouchDB", retries)
    raise unless retries > 0
    retries -= 1
    retry
  end

  # Log a failed attempt for +retryable+: a warning while retries remain,
  # an error once they are exhausted.
  def self.log_retry_failure(error, summary, retries_left)
    level, outcome = retries_left > 0 ? [:warn, "retrying"] : [:error, "giving up"]
    logger = QuantumLead::Application.logger
    logger.send(level, 'Database.retryable') { error }
    logger.send(level, 'Database.retryable') { "#{summary}... #{outcome}..." }
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment