Created
August 15, 2014 10:05
-
-
Save robinsmidsrod/019ef6d415dbc3f9e0f1 to your computer and use it in GitHub Desktop.
Logstash config related to bug https://github.com/elasticsearch/logstash/issues/1637
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Inputs: syslog datagrams, "portfolio" JSON datagrams, a TCP JSON logger,
# and beaver JSON datagrams. All listeners bind to 10.11.12.1.
input {
  udp {
    type => "syslog"
    host => "10.11.12.1"
    port => 3514
  }
  udp {
    type => "portfolio"
    host => "10.11.12.1"
    port => 3516
    codec => "json"
  }
  tcp {
    type => "json-logger"
    host => "10.11.12.1"
    port => 3517
    codec => "json"
  }
  # zeromq transport kept disabled in favor of the udp input below.
  # zeromq {
  #   type => "beaver"
  #   codec => "json"
  #   topology => "pushpull"
  #   address => [ "tcp://10.11.12.1:3518" ]
  #   # See https://github.com/elasticsearch/logstash/issues/1320 for details on the RCVTIMEO flag
  #   sockopt => { "ZMQ::HWM" => 50
  #                "ZMQ::RCVTIMEO" => 10000 }
  # }
  udp {
    type => "beaver"
    codec => "json"
    host => "10.11.12.1"
    port => 3518
  }
}
# Filters: normalize syslog, CLF/HTTP, json-logger, portfolio and nginx
# events into a common field layout (host, program, severity, remote_addr,
# message, ...), then enrich with GeoIP and user-agent data.
filter {
  #### SYSLOG
  if "syslog" in [type] {
    # Parse syslog messages
    grok {
      match => [ "message", "<%{POSINT:syslog_pri}>(?:%{SYSLOGTIMESTAMP:syslog_timestamp}|%{TIMESTAMP_ISO8601:syslog_timestamp8601}) (?:%{SYSLOGHOST:syslog_hostname})? %{PROG:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" ]
      add_tag => [ "grokked" ]
    }
    if "grokked" in [tags] {
      # Convert syslog priority number into facility and severity name/number
      syslog_pri {}
      # Parse timestamp
      if [syslog_timestamp8601] {
        # Parse proper ISO8601 syslog timestamps
        date {
          match => [ "syslog_timestamp8601", "ISO8601" ] # RSYSLOG_ForwardFormat
        }
      }
      else {
        if [syslog_timestamp] {
          # Parse traditional syslog timestamps
          date {
            match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
          }
        }
      }
      # If has a hostname, replace host
      if [syslog_hostname] {
        mutate {
          replace => [ "host", "%{syslog_hostname}" ]
        }
      }
      # Replace message with syslog_message (if message is available)
      if [syslog_message] {
        mutate {
          replace => [ "message", "%{syslog_message}" ]
        }
      }
      else {
        mutate {
          replace => [ "message", "(empty message)" ]
        }
      }
      # Rename some fields
      mutate {
        rename => [ "syslog_program", "program" ]
        rename => [ "syslog_pid", "pid" ]
        rename => [ "syslog_severity", "severity" ]
        rename => [ "syslog_facility", "category" ]
      }
      # Clean up redundant parameters
      mutate {
        convert => [ "syslog_facility_code", "integer" ]
        convert => [ "syslog_severity_code", "integer" ]
        convert => [ "pid", "integer" ]
        remove_field => [ "syslog_timestamp", "syslog_timestamp8601",
                          "syslog_hostname", "syslog_message", "syslog_pri" ]
        remove_tag => [ "grokked" ]
      }
    }
    # Clean up postfix syslog messages
    if [program] =~ /^postfix\// {
      # Convert combined postfix program name into separate parts
      grok {
        match => [ "program", "^postfix/%{GREEDYDATA:program_worker}" ]
        add_tag => [ "grokked" ]
      }
      if "grokked" in [tags] {
        # Set program to "postfix"
        mutate {
          replace => [ "program", "postfix" ]
          remove_tag => [ "grokked" ]
        }
      }
    }
    # Extract remote address from sshd log lines
    if [program] == "sshd" {
      grok {
        match => { "message" => [
            "Invalid user %{USERNAME:username} from %{IP:remote_addr}",
            "Failed %{WORD:login_method} for invalid user %{USERNAME:username} from %{IP:remote_addr} port %{POSINT:port} ssh2",
            "pam_unix(sshd:auth): authentication failure; logname= uid=%{POSINT:uid} euid=%{POSINT:euid} tty=ssh ruser= rhost=%{IPORHOST:remote_addr}(?: user=%{USERNAME:username})?",
            "PAM %{POSINT} more authentication failures; logname= uid=%{POSINT:uid} euid=%{POSINT:euid} tty=ssh ruser= rhost=%{IPORHOST:remote_addr}(?: user=%{USERNAME:username})?",
            "Did not receive identification string from %{IPORHOST:remote_addr}"
          ]
        }
      }
      if "_grokparsefailure" not in [tags] {
        # NOTE(review): grok patterns such as %{HOST} are NOT expanded inside
        # conditional regexes, so this condition matches the literal text
        # "%{HOST}" and the dns filter likely never runs — confirm intent.
        if [remote_addr] =~ /%{HOST}/ {
          dns {
            resolve => [ "remote_addr" ]
            action => "replace"
          }
        }
      }
    }
    # Extract remote address from fail2ban log lines
    if [program] == "fail2ban.actions" {
      grok {
        match => { "message" => "WARNING \[%{WORD:service}\] %{WORD:action} %{IP:remote_addr}" }
      }
      if "_grokparsefailure" not in [tags] {
        mutate {
          lowercase => [ "action" ]
        }
      }
    }
    # Get rid of the tags field, as it should be empty
    mutate {
      remove_field => [ "tags" ]
    }
  }
  #### CLF
  if "clf" in [type] {
    # Parse Apache Common Log Format messages
    grok {
      match => { "message" => [
          "^(?:%{HOST:vhost}:%{POSINT:port})?%{SPACE}%{COMBINEDAPACHELOG}",
          "^(?:%{HOST:vhost}|-)?%{SPACE}%{COMBINEDAPACHELOG}",
          "^%{COMBINEDAPACHELOG}"
        ]
      }
    }
    # Convert clf to http type if grok was successful
    if "_grokparsefailure" not in [tags] {
      mutate {
        replace => [ "type", "http" ]
      }
    }
  }
  if "http" in [type] {
    # Set host on events that haven't got vhost already set
    if [vhost] {
      mutate {
        remove_field => [ "host" ]
      }
      mutate {
        add_field => [ "host", "%{vhost}" ]
      }
      mutate {
        remove_field => [ "vhost" ]
      }
    }
    # Set port on events that haven't got port already set
    if ! [port] {
      mutate {
        add_field => [ "port", "80" ]
      }
    }
    # Set bytes on events that haven't got bytes already set
    if ! [bytes] {
      mutate {
        add_field => [ "bytes", "0" ]
      }
    }
    # Rename some variables
    mutate {
      rename => [ "httpversion", "protocol" ]
      rename => [ "clientip", "remote_addr" ]
      rename => [ "auth", "remote_user" ]
      rename => [ "agent", "user_agent" ]
      rename => [ "verb", "method" ]
      rename => [ "referrer", "referer" ]
      rename => [ "response", "status" ]
      # Utterly unreliable and useless information
      remove_field => [ "ident" ]
    }
    # Remove quotes from http referer
    mutate {
      gsub => [ "referer", "^\"", "",
                "referer", "\"$", "" ]
    }
    # Remove quotes from http user agent
    mutate {
      gsub => [ "user_agent", "^\"", "",
                "user_agent", "\"$", "" ]
    }
    # Parse timestamp
    date {
      match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
    }
    # Remove parsed timestamp
    mutate {
      remove_field => [ "timestamp" ]
    }
    # Change apache tag into correct server_program
    if "apache" in [tags] {
      mutate {
        add_field => [ "program", "apache" ]
        remove_tag => [ "apache" ]
      }
    }
    # Change nginx tag into correct server_program
    if "nginx" in [tags] {
      mutate {
        add_field => [ "program", "nginx" ]
        remove_tag => [ "nginx" ]
      }
    }
    # Convert some fields to numbers
    mutate {
      convert => [ "port", "integer" ]
      convert => [ "status", "integer" ]
      convert => [ "bytes", "integer" ]
    }
    # Add scheme so that URLs can be constructed
    if [port] == 443 {
      mutate {
        add_field => [ "scheme", "https" ]
      }
    }
    else {
      mutate {
        add_field => [ "scheme", "http" ]
      }
    }
    # Tag HTTP status codes as severity labels
    if [status] < 400 {
      mutate {
        add_field => [ "severity", "informational" ]
      }
    }
    else if [status] >= 500 {
      mutate {
        add_field => [ "severity", "error" ]
      }
    }
    else {
      mutate {
        add_field => [ "severity", "warning" ]
      }
    }
    # Make sure method always has a value
    if ! [method] {
      mutate {
        add_field => [ "method", "-" ]
      }
    }
    # Use rawrequest if request field is not available
    if ! [request] {
      mutate {
        rename => [ "rawrequest", "request" ]
      }
    }
    # Rewrite message
    mutate {
      replace => [ "message", "%{status} %{remote_addr} %{method} %{scheme}://%{host}%{request} (%{bytes} bytes)" ]
    }
    # Get rid of the tags field, as it should be empty
    mutate {
      remove_field => [ "tags" ]
    }
  }
  #### JSON-LOGGER
  if "json-logger" in [type] {
    if [fd] == "stdout" {
      mutate {
        add_field => [ "severity", "informational" ]
        remove_field => [ "fd" ]
      }
    }
    if [fd] == "stderr" {
      mutate {
        add_field => [ "severity", "error" ]
        remove_field => [ "fd" ]
      }
    }
  }
  #### PORTFOLIO
  if "portfolio" in [type] {
    mutate {
      add_field => [ "program", "portfolio" ]
    }
    # Rename summary field to message, or create a dummy one
    if [summary] {
      mutate {
        rename => [ "summary", "message" ]
      }
    }
    else {
      mutate {
        add_field => [ "message", "(empty message)" ]
      }
    }
    # Parse and replace timestamp
    if [request][time][now] > 0 {
      # Create a new field because date filter doesn't support nesting
      mutate {
        add_field => [ "timestamp", "%{[request][time][now]}" ]
      }
      # The log message should contain timestamp in epoch format
      date {
        match => [ "timestamp", "UNIX" ]
      }
      # Remove generated timestamp
      mutate {
        remove_field => [ "timestamp" ]
      }
    }
    # Unnest JSON fields
    mutate {
      add_field => [ "process_mem_share_delta", "%{[process][mem][share_delta]}" ]
      add_field => [ "process_mem_share", "%{[process][mem][share]}" ]
      add_field => [ "process_mem_size_delta", "%{[process][mem][size_delta]}" ]
      add_field => [ "process_mem_size", "%{[process][mem][size]}" ]
      add_field => [ "pid", "%{[process][pid]}" ]
      add_field => [ "request_cmd", "%{[request][cmd]}" ]
      add_field => [ "duration", "%{[request][time][elapsed]}" ]
      add_field => [ "session_context", "%{[session][context]}" ]
      add_field => [ "session_id", "%{[session][id]}" ]
      add_field => [ "remote_addr", "%{[session][remote_addr]}" ]
      add_field => [ "user_agent", "%{[session][user_agent]}" ]
      # BUGFIX: field reference was "%{[session][user]id]}" (malformed
      # brackets), which cannot resolve and leaves the literal text in
      # session_user_id.
      add_field => [ "session_user_id", "%{[session][user][id]}" ]
      add_field => [ "remote_user", "%{[session][user][username]}" ]
    }
    # Remove unnested fields
    mutate {
      remove_field => [ "process", "request", "session" ]
    }
    # Remove unused fields from CLI requests
    if [session_context] == "CLI" {
      mutate {
        remove_field => [ "remote_addr", "user_agent" ]
      }
    }
    # Convert some fields to numbers
    mutate {
      convert => [ "pid", "integer" ]
      convert => [ "process_mem_share_delta", "integer" ]
      convert => [ "process_mem_share", "integer" ]
      convert => [ "process_mem_size_delta", "integer" ]
      convert => [ "process_mem_size", "integer" ]
      convert => [ "duration", "float" ]
    }
  }
  #### NGINX ERRORS (TODO)
  if "file" in [type] and "nginx" in [tags] {
    mutate {
      add_field => [ "program", "nginx" ]
    }
    # From example here: https://logstash.jira.com/browse/LOGSTASH-1663 (not working)
    # grok {
    #   match => { "message" => "(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) \[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:errormessage}(?:, client: (?<client>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:server})(?:, request: %{QS:request})?(?:, host: %{QS:host})?(?:, referrer: \"%{URI:referrer})" }
    # }
  }
  # Add visitor geolocation information if remote_addr is available
  if [remote_addr] {
    geoip {
      source => "remote_addr"
    }
  }
  # Parse user_agent, if present
  if [user_agent] {
    useragent {
      source => "user_agent"
      prefix => "user_agent_"
    }
  }
}
# Outputs: index into a local Elasticsearch cluster, plus a TCP debug
# server emitting json_lines for ad-hoc inspection (e.g. via netcat).
output {
  elasticsearch {
    node_name => "logstash"
    bind_host => "127.0.0.1"
    host => "127.0.0.1"
    cluster => "fbf"
  }
  # Debug output
  tcp {
    mode => "server"
    host => "127.0.0.1"
    port => 9999
    codec => "json_lines"
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment