###############################################################################
# destructive-array-merge.logstash-filter-ruby.rb
# ---------------------------------
# A script for a Logstash Ruby Filter to destructively merge one array field
# into another.
#
# This script has three parameters:
# - `source`: (required) a field reference to the source array
# - `target`: (required) a field reference to the target array
# - `coerce`: (optional, defaults to `false`) when encountering non-array
#             values in either source or target fields, setting
#             `coerce => true` causes these fields to first be converted to a
#             single-entry array.
###############################################################################
|
# |
|
# Copyright 2020 Ry Biesemeyer |
|
# |
|
# Permission is hereby granted, free of charge, to any person obtaining a copy |
|
# of this software and associated documentation files (the "Software"), to deal |
|
# in the Software without restriction, including without limitation the rights |
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|
# copies of the Software, and to permit persons to whom the Software is |
|
# furnished to do so, subject to the following conditions: |
|
# |
|
# The above copyright notice and this permission notice shall be included in |
|
# all copies or substantial portions of the Software. |
|
# |
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
|
# SOFTWARE. |
|
# Logstash Ruby Filter hook: validates the script parameters and stores them
# in instance variables for later use by `filter`.
#
# @param params [Hash] script parameters keyed by String:
#   - 'source' (required): field reference of the array to merge from
#   - 'target' (required): field reference of the array to merge into
#   - 'coerce' (optional): `true`/`false`, given either as a boolean or as
#     the strings 'true'/'false'; treated as false when absent
# @raise [LogStash::ConfigurationError] (through report_configuration_error)
#   when a required param is missing, when `coerce` is not a recognised
#   true/false value, or when unknown params are supplied
def register(params)
  params = params.dup # isolate: the caller's hash is never mutated

  @source = params.delete('source') || report_configuration_error("missing required param `source`")
  @target = params.delete('target') || report_configuration_error("missing required param `target`")

  coerce = params.delete('coerce')
  coerce = false if coerce.nil? # absent => coercion disabled

  # Accept real booleans as well as their string spellings.
  @coerce = case coerce
            when true, 'true' then true
            when false, 'false' then false
            else report_configuration_error("invalid value for param `coerce`: `#{coerce}` (expected `true` or `false`)")
            end

  # Anything still left in the hash is a parameter this script does not know.
  params.empty? || report_configuration_error("unknown script parameter(s): #{params.keys}.")
end
|
|
|
# Raises a LogStash::ConfigurationError carrying the given message.
#
# @param message [String] description of the configuration problem
# @raise [LogStash::ConfigurationError] always
def report_configuration_error(message)
  raise(LogStash::ConfigurationError, message)
end
|
|
|
# Logstash Ruby Filter hook: appends the values of the `@source` field to the
# `@target` field and removes the `@source` field.
#
# The original event is returned unchanged when:
#   - the source field is absent,
#   - coercion is off and the source is not an Array,
#   - coercion is off and the target is present but not an Array, or
#   - the source holds no values.
# Otherwise the merge is performed on a copy of the event; the original is
# cancelled and the copy is returned in its place.
#
# @param event [LogStash::Event] the event being processed
# @return [Array<LogStash::Event>] a single-element array holding either the
#   merged copy or the original event; on any StandardError the original is
#   tagged `_arraymerge_error` and returned
def filter(event)
  return [event] unless event.include?(@source)

  source = event.get(@source)
  if !@coerce && !source.kind_of?(::Array)
    logger.debug("source field `#{@source}` not an array", :event => event.to_hash) if logger.debug?
    return [event]
  end

  target = event.get(@target)
  if !@coerce && !target.nil? && !target.kind_of?(::Array)
    logger.debug("target field `#{@target}` not an array", :event => event.to_hash) if logger.debug?
    return [event]
  end

  # to prevent partial failure, we clone the event and return the
  # modified clone on success or the original unmodified event
  # on failure
  clone_event = LogStash::Event.new(event.to_hash_with_metadata)
  source = clone_event.remove(@source)
  source = coerce_to_array(source) if @coerce

  return [event] if source.empty?

  merge_target = clone_event.get(@target) || []
  merge_target = coerce_to_array(merge_target) if @coerce
  merge_target.concat(source)
  clone_event.set(@target, merge_target)

  event.cancel

  [clone_event]
rescue => e
  logger.error('failed to merge fields on event', exception: e.message, event: event.to_hash, source: @source, target: @target)
  event.tag('_arraymerge_error')
  [event]
end

# Wraps a non-array value in a single-entry array.
#
# Unlike `Kernel#Array`, a Hash is kept as ONE entry instead of being expanded
# into a list of `[key, value]` pairs.
#
# @param value [Object, nil] the field value to coerce
# @return [Array] `[]` for nil, `value` itself when already an Array,
#   `[value]` otherwise
def coerce_to_array(value)
  case value
  when nil then []
  when ::Array then value
  else [value]
  end
end