Ruby filter for parsing JSON and adding its fields to the event in a Logstash pipeline
# ruby { | |
# path => "/etc/logstash/ancillary/ruby-scripts/json-to-event.rb" <--Example path to ruby file, be sure to update with yours | |
# script_params => { | |
# "json_field" => "notes" <--Field to extract the JSON from (default: message) | |
# "array" => true <--Re-key arrays of objects by their index so each object's fields are added (default: false) | |
# "target" => "parent" <--Top-level field to nest all extracted fields under (default: root of the document) | |
# "tag_match_failure" => true <--Tag the event with _DTmatch_failure when no JSON can be extracted (default: false) | |
# } | |
# } | |
# the value of `params` is the value of the hash passed to `script_params` | |
# in the logstash configuration | |
# Called once when the pipeline starts. `params` is the `script_params` hash
# from the Logstash configuration (see the header comment for supported keys).
#
# Sets:
#   @json_field        - field holding the JSON text (default "message")
#   @array             - whether deep_traverse re-keys arrays of objects by index
#   @target            - "[name]" prefix for every added field, or "" for the root
#   @tag_match_failure - whether to tag events whose field yields no JSON
def register(params)
  @json_field = params["json_field"].nil? ? "message" : params["json_field"]

  # Accept Logstash barewords/strings ("true", "TRUE") as well as real booleans
  # (true); anything else, including a missing key, means false.
  @array = params["array"].to_s.strip.downcase == "true"
  @tag_match_failure = params["tag_match_failure"].to_s.strip.downcase == "true"

  # An empty/missing target means "write to the event root"; "[]" would be an
  # invalid field reference, so never build it.
  target = params["target"].to_s
  @target = target.empty? ? "" : "[#{target}]"
end
# the filter method receives an event and must return a list of events. | |
# Dropping an event means not including it in the return array, | |
# while creating new ones only requires you to add a new instance of | |
# LogStash::Event to the returned array | |
# Logstash calls this for every event; it must return an array of events.
#
# Pulls the first JSON-looking span ([...] or {...}) out of @json_field, parses
# it and copies its values onto the event via deep_traverse, prefixing field
# names with @target when one was configured. The event is always returned;
# problems are reported through logging and these tags:
#   _DTmatch_failure     - the field could not be scanned or contains no JSON
#                          (only when tag_match_failure is enabled)
#   _DTjsonparse_failure - the extracted text is not valid JSON
#   _DTouter_failure     - setting a field on the event failed
def filter(event)
  require 'json'

  begin
    raw = event.get(@json_field)
    if raw.nil?
      logger.debug("(DT) Your json_field [#{@json_field}] is nil")
      return [event]
    end
    # Newlines are removed first because `.` does not match "\n"; the greedy
    # pattern then takes everything from the first opening bracket to the last
    # closing bracket of the same kind.
    match = raw.gsub(/\n/, '').match(/(\[.*\]|\{.*\})/)
  rescue => e
    # e.g. the field holds a non-String value that has no #gsub
    logger.info("(DT) Issue extracting json: ", :exception => e.class.name, :message => e.message)
    event.tag("_DTmatch_failure") if @tag_match_failure
    return [event]
  end

  if match.nil?
    logger.debug("(DT) Unable to match json within field [#{@json_field}]")
    event.tag("_DTmatch_failure") if @tag_match_failure
    return [event]
  end

  begin
    parsed = JSON.parse(match[0])
  rescue => e
    logger.debug('(DT) JSON parse failure:', :exception => e.class.name, :message => e.message, :event => event, :error => e.backtrace)
    event.tag("_DTjsonparse_failure")
    return [event]
  end

  # deep_traverse yields every node of the parsed document. Objects are never
  # stored whole (their children are yielded separately); when array flattening
  # is on, array indices show up as "0", "1", ... path segments.
  deep_traverse(parsed, @array, event) do |path, value, parseArray|
    begin
      storable =
        if value.is_a?(Hash)
          false
        elsif value.is_a?(Array)
          # Without flattening every array is stored as-is; with flattening only
          # arrays holding at least one plain value (not Array/Hash) are stored.
          !parseArray || !value.all? { |item| item.is_a?(Array) || item.is_a?(Hash) }
        else
          true
        end
      next unless storable

      # Build the field reference without mutating `path`.
      field = "#{@target}#{path.map { |k| "[#{k}]" }.join}"
      #--------------------------------------------------------------------------------
      #----this is your chance to manipulate the element prior to adding if needed-----
      #--------------------------------------------------------------------------------
      event.set(field, value)
    rescue => e
      logger.info("(DT) deep_traverse outer failure: ", :exception => e.class.name, :message => e.message, :event => event, :error => e.backtrace)
      event.tag("_DTouter_failure")
      # `return` inside the block exits filter itself, skipping remaining nodes.
      return [event]
    end
  end
  [event]
end
# Walk a parsed JSON document depth-first and yield every node to the caller.
#
# pJSON      - the object returned by JSON.parse. A Hash is walked directly.
#              When parseArray is true, a top-level Array whose elements are all
#              Hashes is first re-keyed by index ("0", "1", ...). Any other root
#              has no keys to build paths from and is skipped (debug log only).
# parseArray - must be true or false. When true, arrays of objects are re-keyed
#              by index so each object's fields get their own path; arrays of
#              plain values are passed through unchanged; an empty array is
#              finally yielded as nil.
# event      - the Logstash event; only used for logging/tagging on failure.
#
# Yields (path, value, parseArray) for every node, where path is a fresh Array
# of keys from the root to that node; the caller may modify it freely.
#
# On any StandardError (including a non-Boolean parseArray) the failure is
# logged, the event is tagged "_DTinner_failure" and [event] is returned.
def deep_traverse(pJSON, parseArray = false, event, &block)
  begin
    raise "Expected Boolean value" unless [true, false].include?(parseArray)

    root = pJSON
    if parseArray && root.is_a?(Array) && root.all? { |item| item.is_a?(Hash) }
      root = root.each_with_index.map { |item, idx| [idx.to_s, item] }.to_h
    end
    unless root.is_a?(Hash)
      logger.debug("(DT) JSON root is not an object; nothing to add")
      return nil
    end

    # Explicit LIFO stack of [path, value] pairs instead of recursion.
    stack = root.map { |k, v| [[k], v] }
    until stack.empty?
      key, value = stack.pop
      yield(key.dup, value, parseArray)

      if value.is_a?(Hash)
        value.each do |k, v|
          if parseArray && v.is_a?(Array)
            indexed = {}
            v.each_with_index { |item, idx| indexed[idx.to_s] = item if item.is_a?(Hash) }
            # Only arrays that contain objects are re-keyed. Arrays of plain
            # values (and empty arrays) are pushed unchanged so the caller can
            # store them; empty ones become nil in the branch below.
            v = indexed unless indexed.empty?
          end
          stack.push [key.dup << k, v]
        end
      elsif parseArray && value.is_a?(Array) && value.all? { |item| item.is_a?(Hash) }
        indexed = {}
        value.each_with_index { |item, idx| indexed[idx.to_s] = item }
        # An empty array must not be re-pushed as-is: it would be popped and
        # pushed again forever. Replace it with nil instead.
        stack.push [key, indexed.empty? ? nil : indexed]
      end
    end
  rescue => e
    logger.info("(DT) deep_traverse inner failure: ", :exception => e.class.name, :message => e.message, :event => event, :error => e.backtrace)
    event.tag("_DTinner_failure")
    return [event]
  end
end
This comment has been minimized.
This comment has been minimized.
Hi cjlyons81, my name is Bryan and i work with ELK, |
This comment has been minimized.
This comment has been minimized.
Hello bryankouwen- How can I help? |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
UPDATE 2/2/2019 -- Added code to deal with empty array values such as
{
"plan_name": "default",
"tags": [],
"catalog_id": "59f0f8333310c2960"
}
Such empty arrays caused the script to loop infinitely, because the same value was popped and then pushed back onto the stack over and over again.
Added at line 124: if hitem.empty? then v = nil else v = hitem end
Added at line 135: if hitem.empty? then value = nil else value = hitem end