Skip to content

Instantly share code, notes, and snippets.

@yaauie
Created September 13, 2022 01:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yaauie/7053e35b7e46ff6c8892358eb2effa9e to your computer and use it in GitHub Desktop.
Save yaauie/7053e35b7e46ff6c8892358eb2effa9e to your computer and use it in GitHub Desktop.
Flatten all or part of a Logstash event, in-place or targeted, optionally destructively
###############################################################################
# flatten-structure.logstash-filter-ruby.rb
# ---------------------------------
# A script for a Logstash Ruby Filter that flattens a nested structure into a
# flat structure whose keys are the key-paths of the original structure
###############################################################################
#
# Copyright 2022 Ry Biesemeyer
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
##
# Reads and validates the script parameters handed to the Logstash ruby filter.
#
# Recognized parameters:
#   source      - field reference of the key/value map to walk; when absent the
#                 entire event is walked
#   target      - field in which the flattened key/value pairs are placed
#                 (default: same as `source`)
#   separator   - string used to join the components of each flattened key-path
#                 (default: `_`)
#   destructive - `true` to _delete_ deeply-nested collections after their
#                 contents have been copied to separator-joined field names
#                 (default: false)
#
# Any other parameter is rejected through `report_configuration_error`.
#
# @param params [Hash{String=>Object}] script parameters; the caller's hash is
#   not modified (a copy is consumed instead)
def register(params)
  remaining = params.dup

  @source      = remaining.delete('source')
  @target      = remaining.delete('target') { @source }
  @separator   = remaining.delete('separator') { '_' }
  @destructive = _extract_boolean_param(remaining, 'destructive', false)

  unless remaining.empty?
    report_configuration_error("unknown script parameter(s): #{remaining.keys}.")
  end

  # runtime optimization: key-path of the walked root, computed once
  @source_empty_keypath = @source.nil? ? [] : [@source]
end
# Sentinel yielded by `_walk` after a Hash or Array has been fully iterated.
# It is a plain Object compared with `==` (identity), so it cannot collide with
# any real field value. It is frozen so the shared constant cannot be mutated,
# and it gets a readable `inspect` so trace logging of walked values stays legible.
COLLECTION_ELEMENT_COMPLETE_MARKER = Object.new
def COLLECTION_ELEMENT_COMPLETE_MARKER.inspect
  '#<COLLECTION_ELEMENT_COMPLETE_MARKER>'
end
COLLECTION_ELEMENT_COMPLETE_MARKER.freeze
##
# Aborts configuration by raising a LogStash::ConfigurationError.
#
# @param message [String] description of the invalid configuration
# @raise [LogStash::ConfigurationError] always
def report_configuration_error(message)
  error_class = LogStash::ConfigurationError
  raise(error_class, message)
end
##
# Removes `param_name` from `params` and interprets it as a boolean.
#
# Accepts `true`/`false` or the strings `'true'`/`'false'`. When the key is
# absent, `default_value` is interpreted the same way. Any other value is
# reported through `report_configuration_error`.
#
# @param params [Hash] mutable parameter hash; the key is deleted from it
# @param param_name [String]
# @param default_value [Boolean, String] used when `param_name` is absent
# @return [Boolean]
def _extract_boolean_param(params, param_name, default_value)
  candidate = params.delete(param_name) { default_value }
  return true if [true, 'true'].include?(candidate)
  return false if [false, 'false'].include?(candidate)

  report_configuration_error("script parameter `#{param_name}` must be either `true` or `false`; got `#{candidate}`.")
end
##
# Flattens the key/value map found at `source` (or the entire event when no
# `source` is configured). Each leaf value is written to a field under `target`
# whose name is the leaf's key-path (relative to the source) joined with
# `separator`. When `destructive` is enabled, the nested collections directly
# beneath the source are removed once walked; scalars and the source map itself
# are kept.
#
# A StandardError raised while flattening is logged and the event is tagged
# with `_flattenscripterror` instead of propagating.
#
# @param event [LogStash::Event]
# @return [Array<LogStash::Event>] the given event as the sole element
def filter(event)
  source_map = @source.nil? ? event.to_hash : event.get(@source)
  return [event] unless source_map
  fail('source not a key/value map') unless source_map.kind_of?(Hash)

  # key-path addressing the source map itself; it prefixes every walked key-path
  root_keypath = [@source].compact
  root_depth = root_keypath.size

  _walk(source_map, root_keypath) do |keypath, value|
    logger.trace("YIELDING(#{keypath}) => `#{value.inspect}`") if logger.trace?

    if COLLECTION_ELEMENT_COMPLETE_MARKER == value
      # Only remove collections directly beneath the source. Removing one takes
      # everything nested inside it, so removing deeper collections individually
      # is redundant. For arrays it is also wrong: removing an element shifts the
      # indices, so later removals would hit the wrong elements. The source map
      # itself (keypath == root_keypath) is never removed.
      event.remove(_build_field_reference(keypath)) if @destructive && keypath.size == root_depth + 1
    else
      target_nested_keypath = [@target, keypath[root_depth..-1].join(@separator)].compact
      # leaves that already live at their flattened location (e.g. direct
      # children of the source when target == source) are not re-set onto themselves
      event.set(_build_field_reference(target_nested_keypath), value) unless target_nested_keypath == keypath
    end
  end
rescue => e
  logger.error('failed to flatten', exception: e.message)
  event.tag('_flattenscripterror')
ensure
  # `return` inside `ensure` deliberately guarantees the event is always handed
  # back, even if something escapes the `rescue` above.
  return [event]
end
##
# Depth-first walk of a nested structure.
#
# For every leaf (anything that is neither a Hash nor an Array), the block
# receives the leaf's key-path and value. Once all of a collection's children
# have been visited, the block receives that collection's own key-path with
# COLLECTION_ELEMENT_COMPLETE_MARKER as the value. Hash keys and Array indices
# become key-path components.
#
# @param element [Hash{String=>Object},Array[Object],Object] the node to walk
# @param keypath [Array] components leading to `element`; prefixes every
#   yielded key-path
#
# @yieldparam keypath [Array<String, Integer>]
# @yieldparam value [Object] a leaf value or COLLECTION_ELEMENT_COMPLETE_MARKER
#
# @return [Enumerator] when called without a block; otherwise the block's
#   result for the final yield
def _walk(element, keypath = [], &blk)
  return enum_for(:_walk, element, keypath) unless block_given?

  case element
  when Hash
    element.each_pair { |child_key, child| _walk(child, keypath + [child_key], &blk) }
  when Array
    element.each_with_index { |child, position| _walk(child, keypath + [position], &blk) }
  else
    return yield(keypath, element)
  end

  yield(keypath, COLLECTION_ELEMENT_COMPLETE_MARKER)
end
##
# Builds a Logstash field reference from key-path components.
#
# A single component is returned as-is (a top-level field name). Several
# components are wrapped as `[a][b][c]`.
#
# @param fragments [Array<String, Integer>]
# @return [Object] the field reference
def _build_field_reference(fragments)
  if fragments.size == 1
    fragments.first
  else
    format('[%s]', fragments.join(']['))
  end
end
###############################
# TESTING
# begin: do_* helpers
# The filter's `logger` cannot be reached from inside test expectations while
# they execute, so this lambda closes over the script's scope, where `logger`
# does resolve.
do_log_debug = lambda do |message|
  logger.debug(message)
end
# end: do_* helpers
# Nested fixture event used by the tests (copied per-test by
# `shared_event_provider`). It mixes scalars, empty collections, an array of
# maps, and several levels of nesting. Key order matters: it fixes walk order.
common_test_event = {
  "int" => 1,
  "str" => "fubar",
  "empty_array" => [],
  "empty_hash" => {},
  "array" => [
    { "int" => 12 },
    { "str" => "foobar" },
    { "empty_hash" => {} },
    { "empty_array" => [] },
  ],
  "hash" => {
    "int" => 123,
    "str" => "FUBAR",
    "empty_hash" => {},
    "empty_array" => [],
    "non-empty_hash" => {
      "a" => "b",
      "nested" => {
        "another" => "level",
        "so" => "deep",
      },
      "empty_hash" => {},
    },
  },
}
# Supplies a test with its own deep copy of `common_test_event`, so mutations
# made while filtering one test's event cannot leak into another test.
shared_event_provider = proc do
  in_event do
    ::Marshal.load(::Marshal.dump(common_test_event))
  end
end
# Asserts that every original (nested) field of `common_test_event` still holds
# its original value after filtering.
shared_expectation_non_destructive = proc do
  expect('leaves nested values in-tact') do |events|
    first_event = events.first
    original_values = {
      'int' => 1,
      'str' => "fubar",
      '[empty_array]' => [],
      '[empty_hash]' => {},
      '[array][3][empty_array]' => [],
      '[array][2][empty_hash]' => {},
      '[array][1][str]' => "foobar",
      '[array][0][int]' => 12,
      '[hash][int]' => 123,
      '[hash][str]' => "FUBAR",
      '[hash][empty_hash]' => {},
      '[hash][empty_array]' => [],
      '[hash][non-empty_hash][a]' => "b",
      '[hash][non-empty_hash][nested][another]' => "level",
      '[hash][non-empty_hash][nested][so]' => "deep",
      '[hash][non-empty_hash][empty_hash]' => {},
    }
    original_values.all? do |field_reference, wanted|
      got = first_event.get(field_reference)
      do_log_debug["EXPECT `#{field_reference}` to have value `#{wanted.inspect}`, GOT `#{got.inspect}`"]
      wanted == got
    end
  end
end
# Asserts that the filter emitted exactly one event.
shared_expectation_single_event = proc do
  expect('produces single event') { |events| events.size == 1 }
end
# Asserts that each nested scalar of `common_test_event` also appears at the
# top level under its `_`-joined key-path.
shared_expectation_flatten_all = proc do
  expect('adds flattened versions at top-level with underscore-concatenated keys') do |events|
    first_event = events.first
    flattened_values = {
      'array_0_int' => 12,
      'array_1_str' => 'foobar',
      'hash_int' => 123,
      'hash_str' => "FUBAR",
      'hash_non-empty_hash_a' => "b",
      'hash_non-empty_hash_nested_another' => "level",
      'hash_non-empty_hash_nested_so' => "deep",
    }
    flattened_values.all? do |field_reference, wanted|
      got = first_event.get(field_reference)
      do_log_debug["EXPECT `#{field_reference}` to have value `#{wanted.inspect}`, GOT `#{got.inspect}`"]
      wanted == got
    end
  end
end
# Default parameters: the whole event is flattened in place with `_` and
# nothing is removed.
test 'defaults' do
  parameters { {} }
  [
    shared_event_provider,
    shared_expectation_single_event,
    shared_expectation_non_destructive,
    shared_expectation_flatten_all,
  ].each { |shared| instance_eval(&shared) }
end
# `source` set to `hash`: the `hash` map is flattened in place without
# removing its nested values.
test 'custom source' do
  parameters do
    {
      'source' => 'hash',
    }
  end
  instance_eval(&shared_event_provider)
  instance_eval(&shared_expectation_single_event)
  instance_eval(&shared_expectation_non_destructive)
  expect('adds flattened versions in-place with underscore-concatenated keys') do |events|
    event = events.first
    {
      '[hash][int]' => 123,
      '[hash][str]' => "FUBAR",
      '[hash][non-empty_hash_a]' => "b",
      '[hash][non-empty_hash_nested_another]' => "level",
      '[hash][non-empty_hash_nested_so]' => "deep",
    }.all? do |expected_flat_key, expected_value|
      actual_value = event.get(expected_flat_key)
      # log like the other expectations do, so a failure here is diagnosable
      do_log_debug["EXPECT `#{expected_flat_key}` to have value `#{expected_value.inspect}`, GOT `#{actual_value.inspect}`"]
      expected_value == actual_value
    end
  end
end
# `target` set to `flattened`: flattened keys are collected in a separate
# `flattened` map, and the original nested fields are kept.
test 'custom target' do
  parameters { { 'target' => 'flattened' } }
  [shared_event_provider, shared_expectation_single_event, shared_expectation_non_destructive].each do |shared|
    instance_eval(&shared)
  end
  expect('adds flattened versions to target with underscore-concatenated keys') do |events|
    first_event = events.first
    targeted_values = {
      '[flattened][hash_int]' => 123,
      '[flattened][hash_str]' => "FUBAR",
      '[flattened][hash_non-empty_hash_a]' => "b",
      '[flattened][hash_non-empty_hash_nested_another]' => "level",
      '[flattened][hash_non-empty_hash_nested_so]' => "deep",
    }
    targeted_values.all? do |field_reference, wanted|
      got = first_event.get(field_reference)
      do_log_debug["EXPECT `#{field_reference}` to have value `#{wanted.inspect}`, GOT `#{got.inspect}`"]
      wanted == got
    end
  end
end
# `separator` set to `+`: flattened key-path components are joined with `+`
# instead of `_`.
test 'custom separator' do
  parameters { { 'separator' => '+' } }
  [shared_event_provider, shared_expectation_single_event, shared_expectation_non_destructive].each do |shared|
    instance_eval(&shared)
  end
  expect('adds flattened versions at top-level with separator-concatenated keys') do |events|
    first_event = events.first
    plus_joined_values = {
      'array+0+int' => 12,
      'array+1+str' => 'foobar',
      'hash+int' => 123,
      'hash+str' => "FUBAR",
      'hash+non-empty_hash+a' => "b",
      'hash+non-empty_hash+nested+another' => "level",
      'hash+non-empty_hash+nested+so' => "deep",
    }
    plus_joined_values.all? do |field_reference, wanted|
      got = first_event.get(field_reference)
      do_log_debug["EXPECT `#{field_reference}` to have value `#{wanted.inspect}`, GOT `#{got.inspect}`"]
      wanted == got
    end
  end
end
# `destructive` enabled on the whole event: flattened keys are added and every
# nested collection, including empty ones, is removed.
test 'destructive => true' do
  parameters { { 'destructive' => 'true' } }
  [shared_event_provider, shared_expectation_single_event, shared_expectation_flatten_all].each do |shared|
    instance_eval(&shared)
  end
  expect('removes all visited deeply-nested nodes, including empty ones') do |events|
    first_event = events.first
    removed_references = %w(
      [empty_array]
      [empty_hash]
      [array][3][empty_array]
      [array][3]
      [array][2][empty_hash]
      [array][2]
      [array][1][str]
      [array][1]
      [array][0][int]
      [array][0]
      [hash][int]
      [hash][str]
      [hash][empty_hash]
      [hash][empty_array]
      [hash][non-empty_hash][a]
      [hash][non-empty_hash][nested][another]
      [hash][non-empty_hash][nested][so]
      [hash][non-empty_hash][nested]
      [hash][non-empty_hash][empty_hash]
      [hash][non-empty_hash]
    )
    removed_references.all? do |field_reference|
      present = first_event.include?(field_reference)
      do_log_debug["EXPECT `#{field_reference}` to NOT have value, and it was `#{present ? 'PRESENT' : 'ABSENT'}`"]
      present == false
    end
  end
end
# Destructive flattening limited to `[hash][non-empty_hash]`: only that map is
# flattened in place and stripped of its nested collections. Fields outside it
# must be untouched.
test 'destructive => true with custom source' do
  parameters do
    { 'destructive' => 'true', 'source' => '[hash][non-empty_hash]' }
  end
  [shared_event_provider, shared_expectation_single_event].each { |shared| instance_eval(&shared) }
  expect('adds flattened versions in-place') do |events|
    first_event = events.first
    in_place_values = {
      '[hash][non-empty_hash][a]' => "b",
      '[hash][non-empty_hash][nested_another]' => "level",
      '[hash][non-empty_hash][nested_so]' => "deep",
    }
    in_place_values.all? do |field_reference, wanted|
      got = first_event.get(field_reference)
      do_log_debug["EXPECT `#{field_reference}` to have value `#{wanted.inspect}`, GOT `#{got.inspect}`"]
      wanted == got
    end
  end
  expect('removes relevant deeply-nested source fields, including empty ones') do |events|
    first_event = events.first
    removed_references = %w(
      [hash][non-empty_hash][nested][another]
      [hash][non-empty_hash][nested][so]
      [hash][non-empty_hash][nested]
      [hash][non-empty_hash][empty_hash]
    )
    removed_references.all? do |field_reference|
      present = first_event.include?(field_reference)
      do_log_debug["EXPECT `#{field_reference}` to NOT have value, and it was `#{present ? 'PRESENT' : 'ABSENT'}`"]
      present == false
    end
  end
  expect('leaves unrelated fields in-tact') do |events|
    first_event = events.first
    untouched_values = {
      'int' => 1,
      'str' => "fubar",
      '[empty_array]' => [],
      '[empty_hash]' => {},
      '[array][3][empty_array]' => [],
      '[array][2][empty_hash]' => {},
      '[array][1][str]' => "foobar",
      '[array][0][int]' => 12,
      '[hash][int]' => 123,
      '[hash][str]' => "FUBAR",
      '[hash][empty_hash]' => {},
      '[hash][empty_array]' => [],
    }
    untouched_values.all? do |field_reference, wanted|
      got = first_event.get(field_reference)
      do_log_debug["EXPECT `#{field_reference}` to have value `#{wanted.inspect}`, GOT `#{got.inspect}`"]
      wanted == got
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment