Skip to content

Instantly share code, notes, and snippets.

@ericbuehl
Forked from alexdean/gist:4674639
Last active August 29, 2015 14:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ericbuehl/d44f0b9a01a1e120fac4 to your computer and use it in GitHub Desktop.
Save ericbuehl/d44f0b9a01a1e120fac4 to your computer and use it in GitHub Desktop.
# TOTALLY UNTESTED. Just a sketch.
# Mixin that accumulates events in memory and delivers them in batches.
#
# The host object must provide:
#   * flush(events)         - deliver a batch that was received without a group
#   * flush(events, group)  - deliver a batch that was received with a group
#   * @logger               - an object responding to debug and warn
#
# The host object may provide:
#   * on_flush_error(exception) - called after each failed flush attempt
#   * identity                  - a string included in failure log messages
#   * @batch_events             - batch size (defaults to 50)
#   * @batch_timeout            - seconds between timed flushes (defaults to 5)
#
# A batch is delivered when @batch_events events are pending, when
# @batch_timeout seconds have passed since the last flush, or when a flush is
# forced (by the background timer thread or by a final flush).
module BatchAccumulator
  # Set up buffering state and start the background timer thread.
  # Called lazily by batch_receive; calling it again resets the buffer.
  #
  # options - currently unused; kept for interface compatibility.
  #
  # Raises ArgumentError if the host object has no flush method.
  def batch_initialize(options={})
    # respond_to? (including private methods) also covers hosts that gained
    # this module through extend or that define flush as a singleton method.
    unless respond_to?(:flush, true)
      raise ArgumentError, "Any class including BatchAccumulator must define a flush() method."
    end

    @batch_events ||= 50
    @batch_timeout ||= 5

    # Both mutexes must exist before the timer thread can possibly run.
    @pending_mutex = Mutex.new
    @flush_mutex = Mutex.new

    # events we've accumulated
    batch_clear_pending

    # count of events which are being flushed
    # see batch_flush
    @outgoing_count = 0
    @last_flush = Time.now.to_i
    @batch_initialized = true

    # Periodically force out whatever is pending so that a quiet input does
    # not leave events sitting in the buffer indefinitely.
    @flush_thread = Thread.new do
      loop do
        sleep(@batch_timeout)
        batch_flush(:force => true)
      end
    end
  end

  # Replace the pending buffer with an empty one and reset its counter.
  # The buffer maps group => array of events.
  def batch_clear_pending
    @pending = Hash.new { |h, k| h[k] = [] }
    @pending_count = 0
  end

  # Save an event for later delivery.
  #
  # event - the event to buffer.
  # group - optional grouping key; events are batched per group and the group
  #         is passed to flush() together with its events.
  #
  # Blocks while more than @batch_events events are buffered or in flight.
  def batch_receive(event, group=nil)
    batch_initialize unless @batch_initialized

    # Back-pressure: wait (yielding the CPU rather than spinning) until a
    # flush has drained the backlog.
    while (@pending_count + @outgoing_count) > @batch_events
      sleep(0.01)
    end

    @pending_mutex.synchronize do
      @pending[group] << event
      @pending_count += 1
    end

    batch_flush
  end

  # Deliver pending events if a flush is due.
  #
  # options - :force => true  flush whenever anything is pending
  #           :final => true  like :force, but waits for any flush already in
  #                           progress instead of returning immediately
  #
  # A failed flush of a group is logged, reported to on_flush_error (when the
  # host defines it) and retried after one second until it succeeds.
  def batch_flush(options={})
    # Nothing was ever buffered (e.g. teardown before the first event).
    return unless @batch_initialized

    force = options[:force] || options[:final]
    final = options[:final]

    # final flush will wait for lock, so we are sure to flush out all buffered events
    if final
      @flush_mutex.lock
    elsif !@flush_mutex.try_lock # failed to get lock, another flush already in progress
      return
    end

    begin
      time_since_last_flush = Time.now.to_i - @last_flush

      if (force && @pending_count > 0) ||
         (@pending_count >= @batch_events) ||
         (time_since_last_flush >= @batch_timeout && @pending_count > 0)

        # Declared outside the block so the swapped-out buffer is still
        # visible after synchronize returns.
        outgoing = nil
        @pending_mutex.synchronize do
          outgoing = @pending
          @outgoing_count = @pending_count
          batch_clear_pending
        end

        @logger.debug("Flushing output",
                      :outgoing_count => @outgoing_count,
                      :time_since_last_flush => time_since_last_flush,
                      :outgoing_events => outgoing,
                      :batch_timeout => @batch_timeout,
                      :force => force,
                      :final => final)

        # Iterate over a snapshot; the batch itself is discarded afterwards.
        outgoing.to_a.each do |group, events|
          begin
            group.nil? ? flush(events) : flush(events, group)
            @outgoing_count -= events.size
          rescue => e
            @logger.warn("Failed to flush backlog of events",
                         :outgoing_count => @outgoing_count,
                         :identity => (respond_to?(:identity, true) ? identity : self.class.name),
                         :exception => e,
                         :backtrace => e.backtrace)
            on_flush_error(e) if respond_to?(:on_flush_error, true)
            sleep 1
            retry
          end
          @last_flush = Time.now.to_i
        end
      end
    ensure
      @flush_mutex.unlock
    end
  end
end
require "logstash/outputs/base"
require "logstash/namespace"
# Send events to a redis server, either by RPUSH onto a list or by PUBLISH to
# a channel (see the data_type setting).
#
# With batch enabled (list mode only), events are buffered through
# BatchAccumulator and sent with one RPUSH per key per batch.
#
# For more information about redis, see <http://redis.io/>
class LogStash::Outputs::Redis < LogStash::Outputs::Base
  config_name "redis"
  plugin_status "beta"

  # Name is used for logging in case there are multiple instances.
  # TODO: delete
  config :name, :validate => :string, :default => 'default',
    :deprecated => true

  # The hostname(s) of your redis server(s). Ports may be specified on any
  # hostname, which will override the global port config.
  #
  # For example:
  #
  #     "127.0.0.1"
  #     ["127.0.0.1", "127.0.0.2"]
  #     ["127.0.0.1:6380", "127.0.0.1"]
  config :host, :validate => :array, :default => ["127.0.0.1"]

  # Shuffle the host list during logstash startup.
  config :shuffle_hosts, :validate => :boolean, :default => true

  # The default port to connect on. Can be overridden on any hostname.
  config :port, :validate => :number, :default => 6379

  # The redis database number.
  config :db, :validate => :number, :default => 0

  # Redis initial connection timeout in seconds.
  config :timeout, :validate => :number, :default => 5

  # Password to authenticate with. There is no authentication by default.
  config :password, :validate => :password

  # The name of the redis queue (we'll use RPUSH on this). Dynamic names are
  # valid here, for example "logstash-%{@type}"
  # TODO: delete
  config :queue, :validate => :string, :deprecated => true

  # The name of a redis list or channel. Dynamic names are
  # valid here, for example "logstash-%{@type}".
  # TODO set required true
  config :key, :validate => :string, :required => false

  # Either list or channel. If redis_type is list, then we will RPUSH to key.
  # If redis_type is channel, then we will PUBLISH to key.
  # TODO set required true
  config :data_type, :validate => [ "list", "channel" ], :required => false

  # Set to true if you want redis to batch up values and send 1 RPUSH command
  # instead of one command per value to push on the list. Note that this only
  # works with data_type="list" mode right now.
  #
  # If true, we send an RPUSH every "batch_events" events or
  # "batch_timeout" seconds (whichever comes first).
  config :batch, :validate => :boolean, :default => false

  # If batch is set to true, the number of events we queue up for an RPUSH.
  config :batch_events, :validate => :number, :default => 50

  # If batch is set to true, the maximum amount of time between RPUSH commands
  # when there are pending events to flush.
  config :batch_timeout, :validate => :number, :default => 5

  # Validate the configuration and prepare connection state. No connection is
  # opened here; that happens lazily on the first send.
  #
  # Raises RuntimeError when queue is combined with key/data_type, when
  # neither queue nor key+data_type is given, or when batch is requested for
  # a data_type other than "list".
  def register
    # Loaded here rather than at file load time, as in the original code.
    require 'redis'

    # TODO remove after setting key and data_type to true
    if @queue
      if @key || @data_type
        raise RuntimeError.new(
          "Cannot specify queue parameter and key or data_type"
        )
      end
      @key = @queue
      @data_type = 'list'
    end

    unless @key && @data_type
      raise RuntimeError.new(
        "Must define queue, or key and data_type parameters"
      )
    end
    # end TODO

    if @batch
      if @data_type != "list"
        raise RuntimeError.new(
          "batch is not supported with data_type #{@data_type}"
        )
      end
      # `include` is a Module method and is not callable from an instance
      # method; `extend` mixes the batching methods into this instance only.
      extend BatchAccumulator
    end

    @redis = nil
    @host.shuffle! if @shuffle_hosts
    @host_idx = 0
  end # def register

  # Send one event to redis, or hand it to the batch buffer when batching.
  #
  # In non-batch mode a failed send is logged and retried once per second,
  # reconnecting (to the next configured host) before each retry.
  def receive(event)
    return unless output?(event)

    if @batch
      # Events are grouped by their resolved key so each batch is one RPUSH.
      batch_receive(event.to_json, event.sprintf(@key))
      return
    end

    event_key_and_payload = [event.sprintf(@key), event.to_json]

    begin
      @redis ||= connect
      if @data_type == 'list'
        @redis.rpush(*event_key_and_payload)
      else
        @redis.publish(*event_key_and_payload)
      end
    rescue => e
      @logger.warn("Failed to send event to redis", :event => event,
                   :identity => identity, :exception => e,
                   :backtrace => e.backtrace)
      sleep 1
      @redis = nil
      retry
    end
  end # def receive

  # Batch delivery hook called by BatchAccumulator: push all events for one
  # key with a single RPUSH.
  #
  # events - array of serialized events.
  # key    - the redis list to push onto.
  def flush(events, key)
    # Batch mode never passes through the connect call in receive, so the
    # connection has to be established (or re-established) here.
    @redis ||= connect
    @redis.rpush(key, events)
  end

  # Batch failure hook called by BatchAccumulator after a failed flush.
  # Drops the current connection so that the retried flush reconnects, which
  # also advances to the next configured host.
  def on_flush_error(e)
    @redis = nil
  end

  # Flush any buffered events and close the connection used for publishing.
  def teardown
    batch_flush(:final => true) if @batch

    # Only channel-mode connections are closed here, as in the original code.
    if @data_type == 'channel' && @redis
      @redis.quit
      @redis = nil
    end
  end

  private

  # Build a client for the next host in the list, cycling back to the first
  # host after the last one. A "host:port" entry overrides the default port.
  #
  # Returns the new Redis client.
  def connect
    @current_host, @current_port = @host[@host_idx].split(':')
    @host_idx = @host_idx + 1 >= @host.length ? 0 : @host_idx + 1

    @current_port ||= @port

    params = {
      :host => @current_host,
      :port => @current_port,
      :timeout => @timeout,
      :db => @db
    }
    # Logged before the password is added so the secret stays out of the log.
    @logger.debug(params)

    params[:password] = @password.value if @password

    Redis.new(params)
  end # def connect

  # A string used to identify a redis instance in log messages
  # NOTE(review): @password is interpolated through to_s here; confirm that
  # the password config type masks its value when converted to a string.
  def identity
    @name || "redis://#{@password}@#{@current_host}:#{@current_port}/#{@db} #{@data_type}:#{@key}"
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment