Skip to content

Instantly share code, notes, and snippets.

@daniilyar
Last active July 15, 2016 07:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daniilyar/758857675856ee2fdbef to your computer and use it in GitHub Desktop.
Save daniilyar/758857675856ee2fdbef to your computer and use it in GitHub Desktop.
Logstash GELF input with TCP support added
# encoding: utf-8
require "date"
require "logstash/inputs/base"
require "logstash/namespace"
require "socket"
# This input will read GELF messages as events over the network,
# making it a good choice if you already use Graylog2 today.
#
# The main use case for this input is to leverage existing GELF
# logging libraries such as the GELF log4j appender.
#
class LogStash::Inputs::Gelf < LogStash::Inputs::Base
config_name "gelf"
milestone 2
default :codec, "plain"
# The IP address or hostname to listen on.
config :host, :validate => :string, :default => "0.0.0.0"
# The port to listen on. Remember that ports less than 1024 (privileged
# ports) may require root to use.
config :port, :validate => :number, :default => 12201
# Whether or not to remap the GELF message fields to Logstash event fields or
# leave them intact.
#
# Remapping converts the following GELF fields to Logstash equivalents:
#
# * `full\_message` becomes event["message"].
# * if there is no `full\_message`, `short\_message` becomes event["message"].
config :remap, :validate => :boolean, :default => true
# Whether or not to remove the leading '\_' in GELF fields or leave them
# in place. (Logstash < 1.2 did not remove them by default.). Note that
# GELF version 1.1 format now requires all non-standard fields to be added
# as an "additional" field, beginning with an underscore.
#
# e.g. `\_foo` becomes `foo`
#
config :strip_leading_underscore, :validate => :boolean, :default => true
# Whether or not to use TCP, instead of UDP
config :use_tcp, :validate => :boolean, :default => false
# Whether to capture the hostname or numeric address of the incoming connection
# Defaults to hostname
config :use_numeric_client_addr, :validate => :boolean, :default => false
public
def initialize(params)
super
BasicSocket.do_not_reverse_lookup = true
end # def initialize
public
def register
require 'gelfd'
@tcp = nil
@udp = nil
end # def register
public
def run(output_queue)
begin
# udp server
if @use_tcp
tcp_listener(output_queue)
else
udp_listener(output_queue)
end
rescue => e
@logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace)
sleep(5)
retry
end # begin
end # def run
###########################################################
# Updated tcp_listener that properly breaks on \000
###########################################################
private
def tcp_listener(output_queue)
@logger.warn("Starting gelf listener (tcp) ...", :address => "#{@host}:#{@port}")
if @tcp.nil?
@tcp = TCPServer.new(@host, @port)
end
while !@shutdown_requested
Thread.new(@tcp.accept) do |client|
@logger.debug? && @logger.debug("Gelf (tcp): Accepting connection from: #{client.peeraddr[2]}:#{client.peeraddr[1]}")
begin
while !client.nil? && !client.eof?
begin # Read from socket
@data_in = client.gets("\u0000")
rescue => ex
@logger.warn("Gelf (tcp): failed gets from client socket:", :exception => ex, :backtrace => ex.backtrace)
end
if @data_in.nil?
@logger.warn("Gelf (tcp): socket read succeeded, but data is nil. Skipping.")
next
end
# data received. Remove trailing \0
@data_in[-1] == "\u0000" && @data_in = @data_in[0...-1]
begin # Parse JSON
@jsonObj = JSON.parse(@data_in)
rescue => ex
@logger.warn("Gelf (tcp): failed to parse a message. Skipping: " + @data_in, :exception => ex, :backtrace => ex.backtrace)
next
end
begin # Create event
event = LogStash::Event.new(@jsonObj)
event["source_host"] = @use_numeric_client_addr && client.addr(:numeric) || client.addr(:hostname)
if event["timestamp"].is_a?(Numeric)
event["@timestamp"] = Time.at(event["timestamp"]).gmtime
event.remove("timestamp")
end
remap_gelf(event) if @remap
strip_leading_underscore(event) if @strip_leading_underscore
decorate(event)
output_queue << event
rescue => ex
@logger.warn("Gelf (tcp): failed to create event from json object. Skipping: " + @jsonObj.to_s, :exception => ex, :backtrace => ex.backtrace)
end
end # while client
@logger.debug? && @logger.debug("Gelf (tcp): Closing client connection")
client.close
client = nil
rescue => ex
@logger.warn("Gelf (tcp): client socket failed.", :exception => ex, :backtrace => ex.backtrace)
ensure
if !client.nil?
@logger.debug? && @logger.debug("Gelf (tcp): Ensuring client is closed")
client.close
client = nil
end
end # begin client
end # Thread.new
end # @shutdown_requested
end
###########################################################
private
def tcp_listener2(output_queue)
@logger.info("Starting gelf listener (tcp) ...", :address => "#{@host}:#{@port}")
if @tcp.nil?
@tcp = TCPServer.new(@host, @port)
end
while !@shutdown_requested
Thread.start(@tcp.accept) do |client|
begin
data_in = client.gets
#data = '{"version": "1.1","host":"example.org","short_message":"Short message","full_message":"Backtrace here is more stuff","level":1,"_user_id":9001,"_some_info":"foo","_some_env_var":"bar"}'
@logger.warn("Gelf (tcp): raw line: \n" + data_in)
rescue => ex
@logger.warn("Gelf (tcp): failed gets from client socket:", :exception => ex, :backtrace => ex.backtrace)
end
if data_in.nil?
@logger.warn("Gelf (tcp): socket read succeeded, but data is nil. Skipping.")
next
end
begin
data_json = data_in.delete("\000")
@logger.warn("Gelf (tcp): data without \000 symbol: " + data_json)
# Turn the data set into an array. Ensure there are commas between the objects.
data_json = arrayify(data_json) # Ensure legal json
if data_json.slice(0) != '['
@logger.warn("Gelf (tcp): failed to arrayify data. Skipping ...")
next
end
@logger.warn("Gelf (tcp): Parsing arrayified data: " + data_json)
@jsonArray = JSON.parse(data_json)
@logger.warn("Gelf (tcp): parsing successful")
next if @jsonArray.nil?
rescue => ex
@logger.warn("Gelf (tcp): failed to parse a message. Skipping", :exception => ex, :backtrace => ex.backtrace)
next
end
# We now have an array of events. Process them individually into log messages.
@jsonArray.each do |jsonObj|
begin
event = LogStash::Event.new(jsonObj)
event["source_host"] = @use_numeric_client_addr && client.addr(:numeric) || client.addr(:hostname)
if event["timestamp"].is_a?(Numeric)
event["@timestamp"] = Time.at(event["timestamp"]).gmtime
event.remove("timestamp")
end
remap_gelf(event) if @remap
strip_leading_underscore(event) if @strip_leading_underscore
decorate(event)
output_queue << event
rescue => ex
@logger.warn("Gelf (tcp): failed to create event. Skipping: " + jsonObj.to_s, :exception => ex, :backtrace => ex.backtrace)
end
end # event.each
end # thread
end # shutdown loop
end # tcp_listener
private
def udp_listener(output_queue)
@logger.info("Starting gelf listener (udp)", :address => "#{@host}:#{@port}")
if @udp
@udp.close_read rescue nil
@udp.close_write rescue nil
end
@udp = UDPSocket.new(Socket::AF_INET)
@udp.bind(@host, @port)
while true
line, client = @udp.recvfrom(8192)
begin
data = Gelfd::Parser.parse(line)
rescue => ex
@logger.warn("Gelfd failed to parse a message skipping", :exception => ex, :backtrace => ex.backtrace)
next
end
# Gelfd parser outputs null if it received and cached a non-final chunk
next if data.nil?
event = LogStash::Event.new(JSON.parse(data))
event["source_host"] = client[3]
if event["timestamp"].is_a?(Numeric)
event["@timestamp"] = Time.at(event["timestamp"]).gmtime
event.remove("timestamp")
end
remap_gelf(event) if @remap
strip_leading_underscore(event) if @strip_leading_underscore
decorate(event)
output_queue << event
end
rescue LogStash::ShutdownSignal
# Do nothing, shutdown.
ensure
if @udp
@udp.close_read rescue nil
@udp.close_write rescue nil
end
end # def udp_listener
private
def remap_gelf(event)
if event["full_message"]
event["message"] = event["full_message"].dup
event.remove("full_message")
if event["short_message"] == event["message"]
event.remove("short_message")
end
elsif event["short_message"]
event["message"] = event["short_message"].dup
event.remove("short_message")
end
end # def remap_gelf
private
def strip_leading_underscore(event)
# Map all '_foo' fields to simply 'foo'
event.to_hash.keys.each do |key|
next unless key[0,1] == "_"
event[key[1..-1]] = event[key]
event.remove(key)
end
end # deef removing_leading_underscores
# if the data contains a set of raw json objects (like what Icinga sends)
# then add array symbols, and separate objects with commas
private
def arrayify(data)
d1 = data.gsub('}{',' },{')
d2 = d1.slice(0) == '[' || '[' + d1
d3 = d2.slice(-1) ==']' || d2 << ']'
if d3.slice(0) != '['
@logger.warn("Gelf (tcp) arrayify: Data is not correctly modified! Missing '['.")
end
if d3.slice(-1) != ']'
@logger.warn("Gelf (tcp) arrayify: Data is not correctly modified! Missing ']'.")
end
if /\}\{/ =~ d3
@logger.warn("Gelf (tcp) arrayify: Data is not correctly modified! Containes '}{'.")
end
d3
end
end # class LogStash::Inputs::Gelf
@daniilyar
Copy link
Author

Howto:

  • Enable Icinga GELF feature:
    icinga2 feature enable gelf && service icinga2 restart

  • Update Icinga GELG plugin config at /etc/icinga2/features-available/gelf.conf

  • Restart Icinga again: service icinga2 restart

  • Add GELF input to Logstash config:

    input {

    ...

    gelf {
    port => 12201
    use_tcp => true
    type => "icinga-monitoring"
    tags => [ 'icinga-monitoring', 'gelf' ]
    }

    ...
    }

(urgent part here is 'use_tcp => true')

  • Check if log file /var/log/icinga2/icinga2.log contains errors like below and update firewall rules needed:

[2015-07-25 06:13:19 +0000] critical/GelfWriter: Can't connect to GELF endpoint '10.123.34.41' port '12201'.

@ScottJEhas
Copy link

Had to adjust a single line of code for Elasticsearch 2.0, and put a require on top of this code. Thanks, this helped me out a bunch.

require "json"

if event["timestamp"].is_a?(Numeric)
event.timestamp = LogStash::Timestamp.at(event["timestamp"])
#event["@timestamp"] = Time.at(event["timestamp"]).gmtime
event.remove("timestamp")
end

@vraton
Copy link

vraton commented Jul 15, 2016

Hi daniilyar
I´m trying to setup the icinga2 + logstash monitoring.
I have configured all the required in icinga and i have added the gelf input.
where do i have to add the code that you have written to allow the "use_tcp" as gelf input?

thanks a lot!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment