@mpilone
Last active January 3, 2016 02:09
MOVED: This plugin is now merged into Logstash core. A throttle filter for Logstash: it throttles the number of events received, which is especially useful when sending alerts via email.
require "logstash/filters/base"
require "logstash/namespace"
# The throttle filter is for throttling the number of events received. The filter
# is configured with a lower bound (before_count), an upper bound (after_count),
# and a period of time. All events passing through the filter are counted based on
# a key_field. As long as the count is less than the before_count or greater than the
# after_count, the event will be "throttled", which means the filter will be considered
# successful and any tags or fields will be added.
#
# For example, to throttle events so that you receive an event only after 2
# occurrences, and no more than 3 events in 10 minutes, you would use the
# configuration:
#     period => 600
#     before_count => 3
#     after_count => 5
#
# Which would result in:
# event 1 - throttled (successful filter, period start)
# event 2 - throttled (successful filter)
# event 3 - not throttled
# event 4 - not throttled
# event 5 - not throttled
# event 6 - throttled (successful filter)
# event 7 - throttled (successful filter)
# event x - throttled (successful filter)
# period end
# event 1 - throttled (successful filter, period start)
# event 2 - throttled (successful filter)
# event 3 - not throttled
# event 4 - not throttled
# event 5 - not throttled
# event 6 - throttled (successful filter)
# ...
#
# As another example, to receive only 1 event per hour, you would use the
# configuration:
#     period => 3600
#     before_count => -1
#     after_count => 1
#
# Which would result in:
# event 1 - not throttled (period start)
# event 2 - throttled (successful filter)
# event 3 - throttled (successful filter)
# event 4 - throttled (successful filter)
# event x - throttled (successful filter)
# period end
# event 1 - not throttled (period start)
# event 2 - throttled (successful filter)
# event 3 - throttled (successful filter)
# event 4 - throttled (successful filter)
# ...
#
# A common use case would be to use the checksum filter to generate a key_field
# from multiple fields, use the throttle filter to throttle events before 3 and after 5,
# and then use the drop filter to remove throttled events. This configuration might
# appear as:
#
#     filter {
#       checksum {
#         keys => [ "host", "message" ]
#       }
#       throttle {
#         before_count => 3
#         after_count => 5
#         period => 3600
#         key_field => "logstash_checksum"
#         add_tag => "throttled"
#       }
#       if "throttled" in [tags] {
#         drop { }
#       }
#     }
#
# Another case would be to store all events, but only email non-throttled
# events so the ops team's inbox isn't flooded with emails in the event of a
# system error. This configuration might appear as:
#
#     filter {
#       throttle {
#         before_count => 3
#         after_count => 5
#         period => 3600
#         key_field => "message"
#         add_tag => "throttled"
#       }
#     }
#     output {
#       if "throttled" not in [tags] {
#         email {
#           from => "logstash@mycompany.com"
#           subject => "Production System Alert"
#           to => "ops@mycompany.com"
#           via => "sendmail"
#           body => "Alert on %{host} from path %{path}:\n\n%{message}"
#           options => { "location" => "/usr/sbin/sendmail" }
#         }
#       }
#       elasticsearch_http {
#         host => "localhost"
#         port => "19200"
#       }
#     }
#
# The event counts are cleared once the configured period has elapsed since the
# first occurrence of the event. That is, the counts do not all reset at the same
# time; the throttle period is tracked separately for each unique key_field value.
#
# Mike Pilone (@mikepilone)
#
class LogStash::Filters::Throttle < LogStash::Filters::Base
  # The name to use in configuration files.
  config_name "throttle"

  # New plugins should start life at milestone 1.
  milestone 1

  # The key field used to identify events. Two events with the same value in the
  # key field will be considered the same event and will increment the count for
  # that event. Refer to the checksum filter if support for multiple fields is needed.
  config :key_field, :validate => :string, :required => true

  # Events whose count is less than this value will be throttled. Setting this
  # value to -1, the default, disables throttling based on the lower bound.
  config :before_count, :validate => :number, :default => -1, :required => false

  # Events whose count is greater than this value will be throttled. Setting this
  # value to -1, the default, disables throttling based on the upper bound.
  config :after_count, :validate => :number, :default => -1, :required => false

  # The period in seconds after the first occurrence of an event until the count is
  # reset for the event. This period is tracked per unique key_field value.
  config :period, :validate => :number, :default => 3600, :required => false

  # The maximum number of counters to store before the oldest counter is purged.
  # Setting this value to -1 removes the upper bound on the number of counters, so
  # counters are only purged after they expire. This setting should only be used
  # as a memory control mechanism; once the limit is reached, counters can expire
  # early. It is recommended to leave the default value and choose a key_field
  # that limits the number of counters required (e.g. don't use a UUID as the
  # key_field!).
  config :max_counters, :validate => :number, :default => 100000, :required => false
  # Performs initialization of the filter.
  public
  def register
    @threadsafe = false
    @eventCounters = Hash.new
    @nextExpiration = nil
  end # def register
  # Filters the event. The filter is successful if the event should be throttled.
  public
  def filter(event)
    # Return unless there's an actual filter event
    return unless filter?(event)

    # Return unless the event has the key field
    return unless event.include?(@key_field)

    now = Time.now
    key = event[@key_field]

    # Purge counters if too large to prevent OOM.
    if @max_counters != -1 && @eventCounters.size > @max_counters
      purgeOldestEventCounter()
    end

    # Expire existing counters if needed
    if @nextExpiration.nil? || now >= @nextExpiration
      expireEventCounters(now)
    end

    @logger.debug? and @logger.debug(
      "filters/#{self.class.name}: next expiration",
      { "nextExpiration" => @nextExpiration })

    # Create a new counter for this event if this is the first occurrence
    if !@eventCounters.include?(key)
      expiration = now + @period
      @eventCounters[key] = { :count => 0, :expiration => expiration }
      @logger.debug? and @logger.debug("filters/#{self.class.name}: new event",
        { :key => key, :expiration => expiration })
    end

    # Fetch the counter
    counter = @eventCounters[key]

    # Count this event
    counter[:count] += 1

    @logger.debug? and @logger.debug("filters/#{self.class.name}: current count",
      { :key => key, :count => counter[:count] })

    # Throttle if count is < before_count or > after_count
    if ((@before_count != -1 && counter[:count] < @before_count) ||
        (@after_count != -1 && counter[:count] > @after_count))
      @logger.debug? and @logger.debug(
        "filters/#{self.class.name}: throttling event", { :key => key })
      filter_matched(event)
    end
  end # def filter
  # Expires any counts where the period has elapsed. Sets the next expiration time
  # for when this method should be called again.
  private
  def expireEventCounters(now)
    @nextExpiration = nil

    @eventCounters.delete_if do |key, counter|
      expiration = counter[:expiration]
      expired = expiration <= now

      if expired
        @logger.debug? and @logger.debug(
          "filters/#{self.class.name}: deleting expired counter",
          { :key => key })
      elsif @nextExpiration.nil? || (expiration < @nextExpiration)
        @nextExpiration = expiration
      end

      expired
    end
  end # def expireEventCounters
  # Purges the oldest event counter. This operation is for memory control only
  # and can cause early period expiration and thrashing if invoked.
  private
  def purgeOldestEventCounter()
    # Return unless we have something to purge
    return unless @eventCounters.size > 0

    oldestCounter = nil
    oldestKey = nil
    @eventCounters.each do |key, counter|
      if oldestCounter.nil? || counter[:expiration] < oldestCounter[:expiration]
        oldestKey = key
        oldestCounter = counter
      end
    end

    @logger.warn? and @logger.warn(
      "filters/#{self.class.name}: Purging oldest counter because max_counters " +
      "exceeded. Use a better key_field to prevent too many unique event counters.",
      { :key => oldestKey, :expiration => oldestCounter[:expiration] })

    @eventCounters.delete(oldestKey)
  end
end # class LogStash::Filters::Throttle
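To check the counting rule from the header comment in isolation, here is a minimal, Logstash-free sketch in Ruby. `ThrottleSketch`, `throttled?` and `size` are hypothetical names, not part of the plugin or of Logstash; time is passed in as plain seconds instead of `Time.now` so the behaviour is deterministic, and keyword arguments assume Ruby 2.0 or later. It deliberately simplifies two details of the plugin: it scans for expired counters on every call instead of caching `@nextExpiration`, and it evicts the soonest-expiring counter before inserting a new key, so it never holds more than `max_counters` entries (the plugin checks `size > max_counters` before counting, so it can briefly hold one extra).

```ruby
# Minimal, Logstash-free sketch of the throttle counting rule.
# ThrottleSketch is a hypothetical illustration, not part of the plugin or Logstash.
# Time is passed in as plain seconds so the behaviour is deterministic.
class ThrottleSketch
  def initialize(before_count: -1, after_count: -1, period: 3600, max_counters: 100_000)
    @before_count = before_count
    @after_count = after_count
    @period = period
    @max_counters = max_counters
    @counters = {} # key => { count: Integer, expiration: Numeric }
  end

  # Counts one event for +key+ at time +now+ and returns true if it is throttled.
  def throttled?(key, now)
    # Expire every counter whose period has elapsed (the plugin caches the
    # next expiration time instead of scanning on every call).
    @counters.delete_if { |_key, counter| counter[:expiration] <= now }

    # Memory control: before adding a new key at the limit, evict the counter
    # that expires soonest, similar to purgeOldestEventCounter.
    if @max_counters != -1 && @counters.size >= @max_counters && !@counters.key?(key)
      oldest_key, _oldest = @counters.min_by { |_key, counter| counter[:expiration] }
      @counters.delete(oldest_key)
    end

    # The first occurrence of a key starts that key's own period.
    counter = (@counters[key] ||= { count: 0, expiration: now + @period })
    counter[:count] += 1

    # Throttle below the lower bound or above the upper bound; -1 disables a bound.
    (@before_count != -1 && counter[:count] < @before_count) ||
      (@after_count != -1 && counter[:count] > @after_count)
  end

  # Number of live counters.
  def size
    @counters.size
  end
end

if __FILE__ == $PROGRAM_NAME
  # First example from the header comment: period 600, before_count 3, after_count 5.
  t = ThrottleSketch.new(before_count: 3, after_count: 5, period: 600)
  results = (0...7).map { |i| t.throttled?("disk full", i) }
  p results # prints [true, true, false, false, false, true, true]
end
```

Replaying seven events for one key reproduces the documented pattern (events 1, 2, 6 and 7 throttled), and an event arriving at or after the key's expiration starts a fresh period with a count of 1, matching the "period end / event 1 - throttled" lines in the header.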