Logstash: tag when field count over threshold
###############################################################################
# tag-when-fields-over-threshold.logstash-filter-ruby.rb
# ---------------------------------
# A script for a Logstash Ruby Filter to tag an event when it has "too many"
# fields, with a configurable threshold.
###############################################################################
#
# Copyright 2020 Ry Biesemeyer
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
def register(params)
  params = params.dup

  # source: if provided, only the hash at the provided field reference will be walked
  #         (default: entire event)
  @source = params.delete('source')

  # tag: when the leaf-node field count is over the threshold, this tag will be
  #      applied (default: `fields_over_threshold`)
  @tag = params.delete('tag') || 'fields_over_threshold'

  # threshold: the minimum number of leaf-node fields to detect before applying
  #            the `tag` (default: 1000)
  @threshold = Integer(params.delete('threshold') || 1000)

  # recursive: set to `false` to avoid recursive walking of deeply-nested hashes and arrays
  #            (default: true)
  # Only string values are downcased; a boolean does not respond to `downcase`.
  @recursive = params.delete('recursive')
  @recursive = case @recursive.respond_to?(:downcase) ? @recursive.downcase : @recursive
               when nil, true, 'true' then true
               when false, 'false' then false
               else report_configuration_error("script parameter `recursive` must be either `true` or `false`; got `#{@recursive}`.")
               end

  params.empty? || report_configuration_error("unknown script parameter(s): #{params.keys}.")
end
def report_configuration_error(message)
  raise LogStash::ConfigurationError, message
end

def filter(event)
  source_map = @source.nil? ? event.to_hash : event.get(@source)
  return [event] unless source_map
  fail('source not a key/value map') unless source_map.kind_of?(Hash)

  field_count = 0
  _walk(source_map, [@source].compact) do |field_reference, value|
    logger.trace("YIELDING(#{field_reference}) => `#{value.inspect}`") if logger.trace?
    field_count += 1 unless value.nil?
  end

  event.set('[@metadata][field_count]', field_count)
  if field_count >= @threshold
    event.tag(@tag)
  end
rescue => e
  logger.error('failed to count fields', exception: e.message)
  event.tag('_tagwhenfieldsoverthresholderror')
ensure
  return [event]
end
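
##
# walks the provided hash, yielding the field reference and value for each leaf node
#
# When `@recursive` is false, only the top-level element is descended into, so
# each of its direct children is yielded as a single leaf (without this, the
# whole source hash would be yielded as one leaf and the count would always be 1).
#
# @param element [Hash{String=>Object},Array[Object],Object]
# @param keypath [Array[String]]
# @param top_level [Boolean] true only for the initial call
#
# @yieldparam fieldreference [String]
# @yieldparam value [Object]
#
# @return [void]
def _walk(element, keypath=[], top_level=true, &blk)
  descend = @recursive || top_level
  case
  when descend && element.kind_of?(Array) && !element.empty?
    element.each_with_index do |sub_element, sub_index|
      _walk(sub_element, keypath + [sub_index], false, &blk)
    end
  when descend && element.kind_of?(Hash) && !element.empty?
    element.each do |sub_key, sub_element|
      _walk(sub_element, keypath + [sub_key], false, &blk)
    end
  else
    yield(_build_field_reference(keypath), element)
  end
end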

##
# builds a valid field reference from the provided components
def _build_field_reference(fragments)
  return fragments[0] if fragments.size == 1
  return "[#{fragments.join('][')}]"
end
filter {
  ruby {
    path => "${PWD}/tag-when-fields-over-threshold.logstash-filter-ruby.rb"
    script_params => {
      recursive => true
      threshold => 100
    }
  }
  if 'fields_over_threshold' in [tags] {
    drop {}
  }
}

Available script_params:

  • source: if provided, only the hash at the provided field reference will be walked (default: entire event)
  • tag: when the leaf-node field count is over the threshold, this tag will be applied (default: fields_over_threshold)
  • threshold: the minimum number of leaf-node fields to detect before applying the tag (default: 1000)
  • recursive: set to false to avoid recursive walking of deeply-nested hashes and arrays (default: true)
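
To make "leaf-node field count" concrete, here is a minimal standalone sketch in plain Ruby (no Logstash needed) that mirrors the recursive walking rules of the script. The `leaf_references` helper and the sample data are hypothetical and exist only for illustration. It assumes the same rules as the script: array elements are counted individually, an empty hash or array counts as one leaf, and `nil` values are not counted.

```ruby
# Hypothetical helper (not part of the script): returns the field reference
# of every non-nil leaf node found under `element`.
def leaf_references(element, keypath = [])
  if (element.is_a?(Hash) || element.is_a?(Array)) && !element.empty?
    # Hash children are keyed by name, Array children by index.
    pairs = element.is_a?(Hash) ? element.to_a : element.each_with_index.map { |v, i| [i, v] }
    pairs.flat_map { |key, child| leaf_references(child, keypath + [key]) }
  elsif element.nil?
    []  # nil leaves are skipped, as in the script's counting block
  elsif keypath.size == 1
    [keypath[0].to_s]  # a single fragment is returned bare, as in the script
  else
    ["[#{keypath.join('][')}]"]
  end
end

# Illustrative sample data only.
sample = {
  'message' => 'hello',
  'host'    => { 'name' => 'web-1', 'ip' => ['10.0.0.1', '10.0.0.2'] },
  'empty'   => {},
  'missing' => nil
}

refs = leaf_references(sample)
puts refs.inspect
puts "field count: #{refs.size}"
```

With this sample the sketch reports five leaves: `message`, `[host][name]`, `[host][ip][0]`, `[host][ip][1]`, and `empty`. With `threshold => 5` such an event would be tagged, because the script compares with `>=`.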

yaauie commented Jun 1, 2020

Note, this also sets [@metadata][field_count] to the count of fields on the event, to empower other pipeline plugins to use the data.