

@cjlyons81
Last active December 6, 2019 15:17
Ruby filter for parsing JSON and adding it to the event in a Logstash pipeline
# Example Logstash configuration:
#
# ruby {
#   path => "/etc/logstash/ancillary/ruby-scripts/json-to-event.rb"  # example path to this file; update it to match yours
#   script_params => {
#     "json_field" => "notes"          # field to extract the JSON from (default: message)
#     "array" => "true"                # flatten arrays within the JSON (default: false)
#     "target" => "parent"             # root-level field to nest the results under (default: root of the event)
#     "tag_match_failure" => "true"    # tag the event when the JSON regex match fails (default: false)
#   }
# }
#
# `params` is the hash passed to `script_params` in the Logstash configuration.
def register(params)
  @json_field = params["json_field"] || "message"
  # Accept either the string "true" or a literal boolean true.
  @array = [true, "true"].include?(params["array"])
  @target = "[#{params["target"]}]" unless params["target"].nil?
  @tag_match_failure = [true, "true"].include?(params["tag_match_failure"])
end

require 'json'

# The filter method receives an event and must return a list of events.
# Dropping an event means not including it in the returned array, while
# creating new ones only requires adding new LogStash::Event instances to it.
def filter(event)
  raw = event.get(@json_field)
  if raw.nil?
    logger.debug("(DT) Your json_field [#{@json_field}] is nil")
    return [event]
  end

  # Remove newlines, then use a regex match to extract the JSON array or object.
  begin
    match = raw.gsub(/\n/, '').match(/(\[.*\]|\{.*\})/)
  rescue => e
    logger.info("(DT) Issue extracting json: ", :exception => e.class.name, :message => e.message)
    event.tag("_DTmatch_failure") if @tag_match_failure
    return [event]
  end
  if match.nil?
    logger.debug("(DT) Unable to match json within field [#{@json_field}]")
    event.tag("_DTmatch_failure") if @tag_match_failure
    return [event]
  end

  begin
    pJSON = JSON.parse(match[0])
  rescue => e
    logger.debug("(DT) JSON parse failure:", :exception => e.class.name, :event => event, :error => e.backtrace)
    event.tag("_DTjsonparse_failure")
    return [event]
  end

  # Call deep_traverse with the parsed JSON (or any valid hash) and @array, which controls
  # whether arrays within the object are flattened (default: false). Flattened array
  # elements are keyed by their index (i.e. 0, 1, 2, ...).
  deep_traverse(pJSON, @array, event) do |path, value, parseArray|
    begin
      # deep_traverse yields every key path and value; decide here whether to add it to the event.
      key_path = path.map { |k| "[#{k}]" }.join
      if parseArray
        if !value.is_a?(Hash) && !value.is_a?(Array)
          # Discrete value (neither a Hash nor an Array): add it to the event.
          # ---- this is your chance to manipulate the element before adding it, if needed ----
          event.set("#{@target}#{key_path}", value)
        elsif value.is_a?(Array) && value.any? { |i| !i.is_a?(Array) && !i.is_a?(Hash) }
          # Arrays holding at least one discrete element (String, Integer, Float, Boolean, etc.)
          # are added as-is; arrays made only of hashes are expanded by deep_traverse instead.
          # ---- this is your chance to manipulate the element before adding it, if needed ----
          event.set("#{@target}#{key_path}", value)
        end
      elsif !value.is_a?(Hash)
        # If parseArray is false, add any value that is not a Hash.
        # ---- this is your chance to manipulate the element before adding it, if needed ----
        event.set("#{@target}#{key_path}", value)
      end
    rescue => e
      logger.info("(DT) deep_traverse outer failure: ", :exception => e.class.name, :message => e.message, :event => event, :error => e.backtrace)
      event.tag("_DTouter_failure")
      return [event]
    end
  end
  [event]
end

# deep_traverse walks the parsed JSON with an explicit stack and yields each
# (key path, value, parseArray) triple to the block supplied by filter.
def deep_traverse(pJSON, parseArray = false, event)
  begin
    # Validate that parseArray is a Boolean value.
    raise "Expected Boolean value" unless parseArray == true || parseArray == false
    # If the top level is an array of hashes and parseArray is true, key each
    # element by its index so the array can be traversed like a hash.
    if parseArray && pJSON.is_a?(Array) && pJSON.all? { |i| i.is_a?(Hash) }
      pJSON = pJSON.each_with_index.map { |vx, idx| [idx.to_s, vx] }.to_h
    end
    # Any other non-empty top-level array has no keys to build field references from.
    raise "Top-level JSON must be an object (or an array of objects with array => true)" unless pJSON.is_a?(Hash) || pJSON.empty?
    # Map the hash to a stack of [key_path, value] pairs.
    stack = pJSON.map { |k, v| [[k], v] }
    until stack.empty?
      # Pop the last pair off the stack to work on it.
      key, value = stack.pop
      # Yield the pair so the caller can decide whether to add it to the event.
      yield(key, value, parseArray)
      if value.is_a?(Hash)
        # Push each child of a nested hash back onto the stack with its extended key path.
        value.each do |k, v|
          # With parseArray, an array made only of hashes is keyed by index so its elements
          # are traversed too. An empty array becomes nil so it is never pushed back unchanged.
          # Arrays containing discrete values are left as-is for filter to add.
          if parseArray && v.is_a?(Array) && v.all? { |i| i.is_a?(Hash) }
            v = v.empty? ? nil : v.each_with_index.map { |vx, idx| [idx.to_s, vx] }.to_h
          end
          stack.push [key.dup << k, v]
        end
      elsif value.is_a?(Array) && parseArray && value.all? { |i| i.is_a?(Hash) }
        # First-level arrays of objects are keyed by index the same way.
        value = value.empty? ? nil : value.each_with_index.map { |vx, idx| [idx.to_s, vx] }.to_h
        stack.push [key, value]
      end
    end
  rescue => e
    logger.info("(DT) deep_traverse inner failure: ", :exception => e.class.name, :message => e.message, :event => event, :error => e.backtrace)
    event.tag("_DTinner_failure")
    return [event]
  end
end
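To see roughly what the filter produces without running Logstash, the extract, parse, and flatten steps can be approximated in plain Ruby. This is a minimal sketch, assuming array => false: `extract_fields` is a hypothetical helper, not part of the gist, that writes into an ordinary Hash keyed by Logstash-style field references instead of calling `event.set`, and it skips the script's logging and tagging.

```ruby
require 'json'

# Hypothetical helper (not part of the gist): approximates the filter's steps with
# array => false, returning a Hash of field reference => value instead of calling event.set.
def extract_fields(text, target = nil)
  # Same extraction as the script: drop newlines, then grab the outermost [...] or {...}.
  match = text.gsub(/\n/, '').match(/(\[.*\]|\{.*\})/)
  return {} if match.nil?               # the script logs here and, if enabled, tags _DTmatch_failure
  parsed = JSON.parse(match[0])         # raises JSON::ParserError on invalid JSON
  return {} unless parsed.is_a?(Hash)   # top-level arrays are out of scope for this sketch

  prefix = target ? "[#{target}]" : ""
  fields = {}
  stack = parsed.map { |k, v| [[k], v] }
  until stack.empty?
    path, value = stack.pop
    if value.is_a?(Hash)
      # Descend into nested objects, extending the key path.
      value.each { |k, v| stack.push([path + [k], v]) }
    else
      # Any non-Hash value (arrays included) is stored under its field reference.
      fields["#{prefix}#{path.map { |k| "[#{k}]" }.join}"] = value
    end
  end
  fields
end

line = %(app started: {"user": {"id": 7, "roles": ["admin"]},\n "ok": true})
p extract_fields(line, "parent")
```

Because the pattern is greedy and the `[` alternative is tried first, a line such as `[INFO] {"a":1}` would match `[INFO]` rather than the JSON object, and the script would then take its JSON parse failure path.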
@cjlyons81 (Author)

UPDATE 2/2/2019: Added code to handle empty array values such as:
{
"plan_name": "default",
"tags": [],
"catalog_id": "59f0f8333310c2960"
}
This would cause the script to loop infinitely since the same value would be popped and then pushed back over and over again.
Added at line 124: if hitem.empty? then v = nil else v = hitem end
Added at line 135: if hitem.empty? then value = nil else value = hitem end
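The infinite loop described above comes from pushing an array back onto the stack unchanged. The sketch below isolates the guard in plain Ruby, assuming array => true semantics; `index_hashes` and `leaf_paths` are hypothetical names, not functions from the gist, and the walk collects leaves into a Hash instead of touching an event. An array made only of hashes is re-keyed by index ("0", "1", ...), an empty array becomes nil, and every other value is stored as a leaf, so no pair is ever pushed back as-is.

```ruby
# Hypothetical helper: key an array of hashes by index; an empty array becomes nil
# so the traversal has nothing to push back onto the stack.
def index_hashes(array)
  return nil if array.empty?
  array.each_with_index.map { |item, idx| [idx.to_s, item] }.to_h
end

# Hypothetical stack-based walk that collects "[a][b]"-style leaf paths.
def leaf_paths(obj)
  out = {}
  stack = obj.map { |k, v| [[k], v] }
  until stack.empty?
    path, value = stack.pop
    # Arrays made only of hashes (including the empty array) are converted, never re-pushed as-is.
    value = index_hashes(value) if value.is_a?(Array) && value.all? { |i| i.is_a?(Hash) }
    if value.is_a?(Hash)
      value.each { |k, v| stack.push([path + [k], v]) }
    else
      out[path.map { |k| "[#{k}]" }.join] = value
    end
  end
  out
end

doc = { "plan_name" => "default", "tags" => [], "catalog_id" => "59f0f8333310c2960" }
p leaf_paths(doc)
```

Mapping the empty array to nil (rather than an empty hash) keeps the key visible as an empty field, which matches what the updated script sets for "tags".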

@bryankouwen

Hi cjlyons81, my name is Bryan and I work with ELK.
Can you help me? I have a JSON log with nested fields that I need to extract, and I couldn't get your script to work. I appreciate the help.

@cjlyons81 (Author)

Hello bryankouwen, how can I help?
