Last active
December 6, 2019 15:17
-
-
Save cjlyons81/8f47a281934c8e7c040d7bf8985db331 to your computer and use it in GitHub Desktop.
Ruby filter for parsing json and adding to event in a Logstash pipeline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ruby {
#   path => "/etc/logstash/ancillary/ruby-scripts/json-to-event.rb"  <-- Example path to the ruby file; be sure to update it with yours
#   script_params => {
#     "json_field" => "notes"          <-- Specify the field you want to extract json from (default: message)
#     "array" => true                  <-- Do you want to flatten arrays within the json (default: false)
#     "target" => "parent"             <-- Specify a root level name if wanted (default: root of document)
#     "tag_match_failure" => true      <-- Do you want to tag the event when the json regex match fails (default: false)
#   }
# }
# The value of `params` is the hash passed to `script_params`
# in the Logstash configuration.
#
# Recognized keys (all optional):
#   "json_field"        - name of the event field holding the JSON text (default: "message")
#   "array"             - true / "true" to flatten arrays of objects into indexed keys (default: false)
#   "target"            - name of a root-level field to nest the extracted keys under
#                         (default: none, keys are written at the root of the event)
#   "tag_match_failure" - true / "true" to tag the event when no JSON is found (default: false)
def register(params)
  json_field = params["json_field"]
  @json_field = json_field.nil? ? "message" : json_field

  # Accept both a real boolean and its string form; anything else means false.
  @array = params["array"].to_s == "true"
  @tag_match_failure = params["tag_match_failure"].to_s == "true"

  # Store the target already wrapped as a field reference prefix ("[parent]").
  # A missing or empty target leaves the prefix nil, which interpolates to "".
  target = params["target"]
  @target = (target.nil? || target.to_s.empty?) ? nil : "[#{target}]"
end
require 'json'

# The filter method receives an event and must return a list of events.
# Dropping an event means not including it in the return array,
# while creating new ones only requires you to add a new instance of
# LogStash::Event to the returned array.
#
# This implementation always returns the single incoming event. It:
#   1. reads the field named by @json_field,
#   2. removes newlines and extracts the widest "[...]" or "{...}" span from it,
#   3. parses that span as JSON,
#   4. hands the parsed document to deep_traverse and writes every value it
#      yields (except Hashes, and except arrays made only of Hashes/Arrays when
#      array flattening is on) into the event under "#{@target}[k1][k2]...".
#
# Tags added on failure: "_DTmatch_failure" (only when @tag_match_failure is
# set), "_DTjsonparse_failure", "_DTouter_failure".
def filter(event)
  begin
    raw = event.get(@json_field)
    if raw.nil?
      logger.debug("(DT) Your json_field [#{@json_field}] is nil")
      return [event]
    end

    # Newlines are removed first so the greedy match can span a document that
    # was originally spread over several lines.
    found = raw.gsub(/\n/, '').match(/(\[.*\]|\{.*\})/)
    if found.nil?
      logger.debug("(DT) Unable to match json within field [#{@json_field}]")
      event.tag("_DTmatch_failure") if @tag_match_failure
      return [event]
    end
    vJSON = found[0]
  rescue => e
    # Anything unexpected, e.g. the field holds a value that is not a String.
    logger.info("(DT) Issue extracting json: ", :exception => e.class.name, :message => e.message)
    event.tag("_DTmatch_failure") if @tag_match_failure
    return [event]
  end

  begin
    pJSON = JSON.parse(vJSON)
  rescue => e
    logger.debug('(DT) JSON parse failure:', :exception => e.class.name, :event => event, :error => e.backtrace)
    event.tag("_DTjsonparse_failure")
    return [event]
  end

  # deep_traverse yields every node it visits as (path, value, parseArray);
  # the block decides whether that node is written to the event.
  deep_traverse(pJSON, @array, event) do |path, value, parseArray|
    begin
      # Hashes are never written directly; only the values found inside them are.
      next if value.is_a?(Hash)

      # With array flattening on, an array consisting solely of Hashes/Arrays
      # (this includes the empty array) is not written as-is. An array holding
      # at least one discrete element (String, Integer, Float, Boolean, ...)
      # is written whole.
      if parseArray && value.is_a?(Array)
        next if value.all? { |item| item.is_a?(Array) || item.is_a?(Hash) }
      end

      # Build the field reference without mutating the path owned by deep_traverse.
      field_ref = path.map { |k| "[#{k}]" }.join
      #--------------------------------------------------------------------------------
      #----this is your chance to manipulate the element prior to adding if needed-----
      #--------------------------------------------------------------------------------
      event.set("#{@target}#{field_ref}", value)
    rescue => e
      logger.info("(DT) deep_traverse outer failure: ", :exception => e.class.name, :message => e.message, :event => event, :error => e.backtrace)
      event.tag("_DTouter_failure")
      # `return` inside the block exits filter itself.
      return [event]
    end
  end

  [event]
end
# Walks a parsed JSON document iteratively (explicit stack, no recursion) and
# yields every visited node to the caller's block as (path, value, parseArray),
# where path is an array of keys leading from the root to the value.
#
# pJSON      - the parsed document: a Hash, or an Array of Hashes when
#              parseArray is true. Any other top-level shape has no keys to
#              report and is skipped.
# parseArray - must be true or false. When true, arrays of Hashes are turned
#              into Hashes keyed by the element index ("0", "1", ...) so their
#              contents get their own paths.
# event      - only used to tag "_DTinner_failure" when an error is rescued.
#
# Array handling when parseArray is true:
#   * array directly under a Hash: Hash elements are indexed and any non-Hash
#     elements in the same array are dropped; an array with no Hash elements is
#     kept unchanged so the caller can store it; an empty array becomes nil.
#   * array reached any other way: converted only if every element is a Hash;
#     an empty array is re-queued as nil (this is what prevents the endless
#     pop/push loop on values such as "tags": []).
#
# Returns nil normally, or [event] after a rescued error.
def deep_traverse(pJSON, parseArray = false, event, &block)
  begin
    raise "Expected Boolean value" unless parseArray == true || parseArray == false

    # A top-level array of objects becomes { "0" => obj0, "1" => obj1, ... }.
    if parseArray && pJSON.is_a?(Array) && pJSON.all? { |item| item.is_a?(Hash) }
      indexed_root = {}
      pJSON.each_with_index { |item, idx| indexed_root[idx.to_s] = item }
      pJSON = indexed_root
    end

    # Without keys there is no path to yield (e.g. a top-level array that was
    # not converted above).
    unless pJSON.is_a?(Hash)
      logger.debug("(DT) Top level JSON value has no keys, nothing to traverse", :class => pJSON.class.name)
      return
    end

    stack = pJSON.map { |k, v| [[k], v] }
    until stack.empty?
      key, value = stack.pop

      # Hand the caller a copy of the path so that mutating it cannot corrupt
      # the paths built for this node's children.
      yield(key.dup, value, parseArray)

      if value.is_a?(Hash)
        value.each do |k, v|
          if parseArray && v.is_a?(Array)
            indexed = {}
            v.each_with_index { |item, idx| indexed[idx.to_s] = item if item.is_a?(Hash) }
            if !indexed.empty?
              v = indexed
            elsif v.empty?
              v = nil
            end
            # Otherwise: a non-empty array without objects is kept as it is.
          end
          stack.push [key.dup << k, v]
        end
      elsif parseArray && value.is_a?(Array) && value.all? { |item| item.is_a?(Hash) }
        # `all?` is also true for an empty array, which is re-queued as nil.
        indexed = {}
        value.each_with_index { |item, idx| indexed[idx.to_s] = item }
        stack.push [key, indexed.empty? ? nil : indexed]
      end
    end
  rescue => e
    logger.info("(DT) deep_traverse inner failure: ", :exception => e.class.name, :message => e.message, :event => event, :error => e.backtrace)
    event.tag("_DTinner_failure")
    return [event]
  end
end
Hi cjlyons81, my name is Bryan and I work with ELK.
Can you help me? I have a JSON log from which I need to extract the nested fields, and I could not use your script. I appreciate the help.
Hello bryankouwen- How can I help?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
UPDATE 2/2/2019 -- Added code to deal with empty array values such as
{
"plan_name": "default",
"tags": [],
"catalog_id": "59f0f8333310c2960"
}
This would cause the script to loop infinitely since the same value would be popped and then pushed back over and over again.
Added@line124: if hitem.empty? then v = nil else v = hitem end
Added@line135: if hitem.empty? then value = nil else value = hitem end