Last active
July 15, 2016 07:14
-
-
Save daniilyar/758857675856ee2fdbef to your computer and use it in GitHub Desktop.
Logstash GELF input with TCP support added
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf-8
require "date"
require "json"
require "logstash/inputs/base"
require "logstash/namespace"
require "socket"
# This input will read GELF messages as events over the network,
# making it a good choice if you already use Graylog2 today.
#
# The main use case for this input is to leverage existing GELF
# logging libraries such as the GELF log4j appender.
#
# Messages are received over UDP by default (decoded with the gelfd parser)
# or, when `use_tcp` is enabled, over TCP where every message is a JSON
# document terminated by a NUL byte.
class LogStash::Inputs::Gelf < LogStash::Inputs::Base
  config_name "gelf"
  milestone 2

  default :codec, "plain"

  # The IP address or hostname to listen on.
  config :host, :validate => :string, :default => "0.0.0.0"

  # The port to listen on. Remember that ports less than 1024 (privileged
  # ports) may require root to use.
  config :port, :validate => :number, :default => 12201

  # Whether or not to remap the GELF message fields to Logstash event fields or
  # leave them intact.
  #
  # Remapping converts the following GELF fields to Logstash equivalents:
  #
  # * `full\_message` becomes event["message"].
  # * if there is no `full\_message`, `short\_message` becomes event["message"].
  config :remap, :validate => :boolean, :default => true

  # Whether or not to remove the leading '\_' in GELF fields or leave them
  # in place. (Logstash < 1.2 did not remove them by default.). Note that
  # GELF version 1.1 format now requires all non-standard fields to be added
  # as an "additional" field, beginning with an underscore.
  #
  # e.g. `\_foo` becomes `foo`
  #
  config :strip_leading_underscore, :validate => :boolean, :default => true

  # Whether or not to use TCP, instead of UDP
  config :use_tcp, :validate => :boolean, :default => false

  # Whether to capture the hostname or numeric address of the incoming connection
  # Defaults to hostname
  config :use_numeric_client_addr, :validate => :boolean, :default => false

  # Byte that terminates each GELF message on a TCP stream.
  TCP_FRAME_DELIMITER = "\u0000".freeze

  public

  # Builds the plugin and disables implicit reverse DNS lookups on sockets.
  def initialize(params)
    super
    BasicSocket.do_not_reverse_lookup = true
  end # def initialize

  # Loads the gelfd parser lazily and resets both listening sockets.
  def register
    require 'gelfd'
    @tcp = nil
    @udp = nil
  end # def register

  # Runs the configured listener (TCP or UDP) and pushes events to
  # +output_queue+. Any listener failure is logged and the listener is
  # restarted after five seconds.
  def run(output_queue)
    begin
      if @use_tcp
        tcp_listener(output_queue)
      else
        udp_listener(output_queue)
      end
    rescue LogStash::ShutdownSignal
      # A shutdown request is not a failure: release the TCP listener and
      # return instead of falling into the restart loop below.
      # (udp_listener handles this signal itself.)
      close_tcp_server
    rescue => e
      @logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace)
      sleep(5)
      retry
    end # begin
  end # def run

  private

  ###########################################################
  # TCP listener that splits the stream on the NUL delimiter
  ###########################################################

  # Accepts TCP connections until shutdown is requested and serves each
  # client on its own thread.
  def tcp_listener(output_queue)
    @logger.info("Starting gelf listener (tcp) ...", :address => "#{@host}:#{@port}")
    # Reuse the listening socket when run() restarts us after a failure.
    @tcp ||= TCPServer.new(@host, @port)
    until @shutdown_requested
      Thread.new(@tcp.accept) do |client|
        handle_tcp_client(client, output_queue)
      end
    end # @shutdown_requested
  end # def tcp_listener

  # Reads NUL-delimited JSON messages from +client+ until EOF and queues one
  # event per message. All state is kept in locals because several client
  # threads execute this method concurrently.
  def handle_tcp_client(client, output_queue)
    @logger.debug? && @logger.debug("Gelf (tcp): Accepting connection from: #{client.peeraddr[2]}:#{client.peeraddr[1]}")
    source_host = tcp_client_host(client)

    until client.eof?
      begin # Read from socket
        data_in = client.gets(TCP_FRAME_DELIMITER)
      rescue => ex
        @logger.warn("Gelf (tcp): failed gets from client socket:", :exception => ex, :backtrace => ex.backtrace)
        break # the stream is in an unknown state; stop reading
      end

      if data_in.nil?
        @logger.warn("Gelf (tcp): socket read succeeded, but data is nil. Skipping.")
        next
      end

      # data received. Remove trailing \0
      data_in = data_in.chomp(TCP_FRAME_DELIMITER)
      next if data_in.empty? # empty frame, e.g. two delimiters in a row

      begin # Parse JSON
        fields = JSON.parse(data_in)
      rescue => ex
        @logger.warn("Gelf (tcp): failed to parse a message. Skipping: " + data_in, :exception => ex, :backtrace => ex.backtrace)
        next
      end

      begin # Create event
        enqueue_event(fields, source_host, output_queue)
      rescue => ex
        @logger.warn("Gelf (tcp): failed to create event from json object. Skipping: " + fields.to_s, :exception => ex, :backtrace => ex.backtrace)
      end
    end # until eof
  rescue => ex
    @logger.warn("Gelf (tcp): client socket failed.", :exception => ex, :backtrace => ex.backtrace)
  ensure
    @logger.debug? && @logger.debug("Gelf (tcp): Closing client connection")
    client.close unless client.closed?
  end # def handle_tcp_client

  ###########################################################

  # Alternative TCP listener: reads a single line per connection and accepts
  # several concatenated JSON objects on that line (see arrayify).
  # NOTE(review): nothing in this file calls it; kept for compatibility.
  def tcp_listener2(output_queue)
    @logger.info("Starting gelf listener (tcp) ...", :address => "#{@host}:#{@port}")
    @tcp ||= TCPServer.new(@host, @port)
    until @shutdown_requested
      Thread.start(@tcp.accept) do |client|
        handle_tcp_client_line(client, output_queue)
      end # thread
    end # shutdown loop
  end # def tcp_listener2

  # Reads one line from +client+, parses it as one or more JSON objects and
  # queues an event for each of them. The client socket is always closed.
  def handle_tcp_client_line(client, output_queue)
    begin
      data_in = client.gets
    rescue => ex
      @logger.warn("Gelf (tcp): failed gets from client socket:", :exception => ex, :backtrace => ex.backtrace)
      return
    end

    if data_in.nil?
      @logger.warn("Gelf (tcp): socket read succeeded, but data is nil. Skipping.")
      return
    end
    @logger.debug? && @logger.debug("Gelf (tcp): raw line: \n" + data_in)

    begin
      # Drop NUL delimiters, then make sure the payload is one legal JSON array.
      data_json = arrayify(data_in.delete("\000"))
      @logger.debug? && @logger.debug("Gelf (tcp): Parsing arrayified data: " + data_json)
      json_array = JSON.parse(data_json)
    rescue => ex
      @logger.warn("Gelf (tcp): failed to parse a message. Skipping", :exception => ex, :backtrace => ex.backtrace)
      return
    end

    # We now have an array of events. Process them individually into log messages.
    source_host = tcp_client_host(client)
    json_array.each do |fields|
      begin
        enqueue_event(fields, source_host, output_queue)
      rescue => ex
        @logger.warn("Gelf (tcp): failed to create event. Skipping: " + fields.to_s, :exception => ex, :backtrace => ex.backtrace)
      end
    end
  rescue => ex
    @logger.warn("Gelf (tcp): client socket failed.", :exception => ex, :backtrace => ex.backtrace)
  ensure
    client.close unless client.closed?
  end # def handle_tcp_client_line

  # Receives GELF datagrams, decodes them with gelfd and queues one event per
  # complete message. A datagram that cannot be decoded is logged and skipped
  # so that a single bad packet does not take the listener down.
  def udp_listener(output_queue)
    @logger.info("Starting gelf listener (udp)", :address => "#{@host}:#{@port}")

    close_udp_socket # left over from a previous run, if any
    @udp = UDPSocket.new(Socket::AF_INET)
    @udp.bind(@host, @port)

    loop do
      line, client = @udp.recvfrom(8192)
      begin
        data = Gelfd::Parser.parse(line)
      rescue => ex
        @logger.warn("Gelfd failed to parse a message skipping", :exception => ex, :backtrace => ex.backtrace)
        next
      end

      # Gelfd parser outputs null if it received and cached a non-final chunk
      next if data.nil?

      begin
        # client[3] is the sender's numeric address as reported by recvfrom.
        enqueue_event(JSON.parse(data), client[3], output_queue)
      rescue LogStash::ShutdownSignal
        raise # must reach the shutdown handler below, not the generic rescue
      rescue => ex
        @logger.warn("Gelf (udp): failed to create event from message. Skipping", :exception => ex, :backtrace => ex.backtrace)
      end
    end
  rescue LogStash::ShutdownSignal
    # Do nothing, shutdown.
  ensure
    close_udp_socket
  end # def udp_listener

  # Builds an event from the decoded GELF +fields+, applies the configured
  # transformations and pushes it onto +output_queue+.
  def enqueue_event(fields, source_host, output_queue)
    event = LogStash::Event.new(fields)
    event["source_host"] = source_host
    apply_gelf_timestamp(event)
    remap_gelf(event) if @remap
    strip_leading_underscore(event) if @strip_leading_underscore
    decorate(event)
    output_queue << event
  end # def enqueue_event

  # Moves a numeric GELF `timestamp` field into the event timestamp.
  # LogStash::Timestamp is used when this Logstash version defines it;
  # otherwise a plain UTC Time is stored in "@timestamp".
  def apply_gelf_timestamp(event)
    seconds = event["timestamp"]
    return unless seconds.is_a?(Numeric)

    if defined?(LogStash::Timestamp)
      event.timestamp = LogStash::Timestamp.at(seconds)
    else
      event["@timestamp"] = Time.at(seconds).gmtime
    end
    event.remove("timestamp")
  end # def apply_gelf_timestamp

  # Returns the remote end of +client+ as a string: the numeric address when
  # `use_numeric_client_addr` is set, the resolved hostname otherwise.
  def tcp_client_host(client)
    client.peeraddr(@use_numeric_client_addr ? :numeric : :hostname)[2]
  end # def tcp_client_host

  # Closes the TCP listening socket, if there is one.
  def close_tcp_server
    return if @tcp.nil?
    @tcp.close rescue nil
    @tcp = nil
  end # def close_tcp_server

  # Closes both directions of the UDP socket, if there is one.
  def close_udp_socket
    return unless @udp
    @udp.close_read rescue nil
    @udp.close_write rescue nil
  end # def close_udp_socket

  # Copies `full_message` (or, failing that, `short_message`) into "message"
  # and removes the GELF field it came from. A `short_message` that merely
  # repeats `full_message` is removed as well.
  def remap_gelf(event)
    if event["full_message"]
      event["message"] = event["full_message"].dup
      event.remove("full_message")
      if event["short_message"] == event["message"]
        event.remove("short_message")
      end
    elsif event["short_message"]
      event["message"] = event["short_message"].dup
      event.remove("short_message")
    end
  end # def remap_gelf

  # Map all '_foo' fields to simply 'foo'
  def strip_leading_underscore(event)
    # Iterate over a snapshot of the keys because the event is modified.
    event.to_hash.keys.each do |key|
      # A bare "_" is left alone: stripping it would yield an empty field name.
      next unless key.start_with?("_") && key.length > 1
      event[key[1..-1]] = event[key]
      event.remove(key)
    end
  end # def strip_leading_underscore

  # If the data contains a set of raw json objects (like what Icinga sends)
  # then add array symbols, and separate objects with commas.
  #
  # Always returns a String that starts with '[' and ends with ']'. Input
  # that is already wrapped in brackets is returned unchanged apart from
  # surrounding whitespace.
  # NOTE(review): this is a textual transformation; a "}{" sequence inside a
  # JSON string value would be rewritten too.
  def arrayify(data)
    json = data.strip.gsub(/\}\s*\{/, ' },{')
    json = "[" + json unless json.start_with?("[")
    json = json + "]" unless json.end_with?("]")
    json
  end # def arrayify
end # class LogStash::Inputs::Gelf
Had to adjust a single line of code for Elasticsearch 2.0, and put a require on top of this code. Thanks, this helped me out a bunch.
require "json"
if event["timestamp"].is_a?(Numeric)
event.timestamp = LogStash::Timestamp.at(event["timestamp"])
#event["@timestamp"] = Time.at(event["timestamp"]).gmtime
event.remove("timestamp")
end
Hi daniilyar
I'm trying to set up Icinga 2 + Logstash monitoring.
I have configured everything required in Icinga and I have added the gelf input.
Where do I have to add the code that you have written to allow "use_tcp" on the gelf input?
thanks a lot!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Howto:
Enable Icinga GELF feature:
icinga2 feature enable gelf && service icinga2 restart
Update Icinga GELF plugin config at /etc/icinga2/features-available/gelf.conf
Restart Icinga again: service icinga2 restart
Add GELF input to Logstash config:
input {
...
gelf {
port => 12201
use_tcp => true
type => "icinga-monitoring"
tags => [ 'icinga-monitoring', 'gelf' ]
}
...
}
(the important part here is 'use_tcp => true')
[2015-07-25 06:13:19 +0000] critical/GelfWriter: Can't connect to GELF endpoint '10.123.34.41' port '12201'.