@NiceGuyIT
Last active March 4, 2024 18:10
nginx JSON to Filebeat to Logstash to Elasticsearch

Intro

This is an example configuration that has nginx output JSON logs so Logstash has less work to do. I was trying to get nginx > Filebeat > Logstash > ES working, and it wasn't until I connected Filebeat directly to Elasticsearch that I saw the expected data. Google led me to ingest-convert.sh, and I realized filebeat setup works for Filebeat > ES but not for Filebeat > Logstash > ES. This is because Logstash does not use ingest pipelines by default; you have to enable them in the elasticsearch output block.
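For example, a minimal sketch of an elasticsearch output with an ingest pipeline enabled. The hostname reuses this gist's examples, and the pipeline name is an assumption based on the 6.x filebeat-<version>-<module>-<fileset>-<pipeline> naming convention; check GET _ingest/pipeline on your cluster for the real name.

output {
  elasticsearch {
    hosts    => ["elasticsearch.example.com:9200"]
    index    => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    # Without this option, Logstash bypasses the ingest pipelines entirely.
    pipeline => "filebeat-6.1.2-nginx-access-default"
  }
}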

Having nginx log JSON in the format Elasticsearch expects means there's very little processing (e.g. grok) left to do in Logstash. Note that nginx can only output JSON for access logs; the error_log format cannot be changed.

Extra fields are output that the Kibana dashboards don't use. I included them in case they're useful. Since they aren't declared by filebeat setup, they default to "string" when you refresh the field list, which might limit their usefulness for aggregations.

Why Logstash?

  • Don't use Logstash? Point your beats to ES and be done with it.
  • On the fence about Logstash? Don't. Point your beats to ES and be done with it.
  • Do you have to use Logstash? Then this might be useful.
  • Do you have a specific need to use Logstash? Then this might be useful.

Logstash pipelines vs Elasticsearch ingest nodes

This article has some good information about why you would choose Logstash over ingest nodes. While there are some benefits to using Logstash, it adds complexity because you have to maintain the pipeline processors outside ES OR use conditionals in the output block to specify the ingest node to use for each type of document. I'm still learning the internals but here are some things I found when trying to use multiple ingest nodes in ES.

  1. Only 1 pipeline can be specified inside an elasticsearch block. Multiple pipelines are supported by files, which means you'll need to pull them from the *beat install or extract them from ES.
  2. Conditionals need to be outside the elasticsearch block, not inside. This means multiple elasticsearch outputs, one for each beat (see the sketch after this list).
  3. Variables can be used for the pipeline name but I don't know of a way to check if the pipeline exists before expanding the variables. If the pipeline doesn't exist, tons of warnings are logged.
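A rough sketch of point 2, with one elasticsearch output per document type. The fileset fields set by the nginx log format (below) pick the branch, and the pipeline name is an assumption based on the 6.x naming convention, not pulled from a live cluster.

output {
  if [fileset][module] == "nginx" and [fileset][name] == "access" {
    elasticsearch {
      hosts    => ["elasticsearch.example.com:9200"]
      pipeline => "filebeat-6.1.2-nginx-access-default"
    }
  } else {
    elasticsearch {
      hosts => ["elasticsearch.example.com:9200"]
    }
  }
}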

Pros

  1. Simplifies the processing for nginx access logs.
  2. Provides millisecond resolution.
  3. Provides other fields if you're interested in them.

Cons

  1. Logstash does not process nginx error logs by default. You need to set pipeline in the output block.
  2. You have to maintain pipelines in Logstash instead of using the ingest processors that filebeat setup already manages. Then again, once you understand that you need to convert the ingest pipeline to Logstash .conf format (see the example below), having it under your control could be a benefit.
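For that conversion, Logstash ships an ingest converter. Something along these lines, assuming the stock Filebeat module pipeline as input; both paths depend on your install.

/usr/share/logstash/bin/ingest-convert.sh \
        --input file:///usr/share/filebeat/module/nginx/access/ingest/default.json \
        --output file:///tmp/filter-nginx-access.conf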

Issues

  1. ES uses remote_ip, while for servers behind proxies nginx puts the real client address in $http_x_real_ip. I believe this is best fixed with proxy_set_header in the nginx config but haven't had time to hash it out.
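Untested, but the fix probably looks something like the sketch below: the front-end proxy passes the client address through, and the back end trusts it via the realip module so $remote_addr (and therefore remote_ip) carries the real client. The 10.0.0.0/8 range is a placeholder for your proxy's address.

# On the front-end proxy:
proxy_set_header X-Real-IP $remote_addr;

# On the back-end server (ngx_http_realip_module):
set_real_ip_from 10.0.0.0/8;
real_ip_header   X-Real-IP;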

Requirements

  1. A working ELK stack.
    1. *.example.com is used to demonstrate different servers.
    2. Tested with 6.1.2 on openSUSE. YMMV
  2. nginx 1.11.8 or higher.
  3. Half a brain. You're expected to know administration. Well, maybe three quarters of a brain.

Filebeat setup

This will load the templates and fields into Elasticsearch and the dashboards into Kibana. The ingest pipelines are loaded by filebeat --setup, which goes on to run filebeat after the setup. Plain filebeat setup does not load the pipelines; I assume this is because the pipelines are relevant only when Filebeat is connected directly to Elasticsearch.

filebeat setup -e \
        -E 'setup.template.overwrite=true' \
        -E 'setup.kibana.host="kibana.example.com:5601"' \
        -E 'output.logstash.enabled=false' \
        -E 'output.elasticsearch.hosts=["elasticsearch.example.com:9200"]'
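To sanity-check what made it into Elasticsearch, something like this works; the template name matches the Filebeat version, so adjust accordingly.

curl 'elasticsearch.example.com:9200/_template/filebeat-6.1.2?pretty'
curl 'elasticsearch.example.com:9200/_ingest/pipeline?pretty'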

nginx

Everything is wrapped in a single "nginx" element/object to match ES's "nginx" mapping (plus a small "fileset" object identifying the module). Put log-json.conf (shown at the bottom of this gist) in the nginx config directory and include it from the http block.

include log-json.conf;
access_log /var/log/nginx/access.json json;

filebeat.yml

keys_under_root: true will output fields as nginx.*, which is what we want, while keys_under_root: false will output json.nginx.*.

filebeat.prospectors:
- type: log
  paths:
    - /var/log/nginx/*.json
  tags: ["nginx", "json"]
  json:
    keys_under_root: true
    add_error_key: true
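To illustrate the keys_under_root difference (field values elided), the same access event comes out roughly as:

# json.keys_under_root: true  -- fields land where the ES template expects them
{ "fileset": { ... }, "nginx": { "access": { ... }, "time": { ... } } }

# json.keys_under_root: false -- everything ends up nested under "json"
{ "json": { "fileset": { ... }, "nginx": { ... } } }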

Logstash

You have input and output defined in another file in /etc/logstash/conf.d/. Right? Add filter-nginx.conf to the mix. This processes error_log files as well.
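If you don't have one yet, a minimal input/output file for this setup might look like the sketch below; the host and port are placeholders, and you can add the pipeline option from the intro if you want Logstash to call the ingest pipelines. filter-nginx.conf then slots in between the two.

input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch.example.com:9200"]
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
  }
}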

# Process JSON documents output by nginx.
filter {
  if "nginx" in [tags] {
    # nginx doesn't log the http version, only the protocol.
    # i.e. HTTP/1.1, HTTP/2
    grok {
      match => {
        "[nginx][access][http_protocol]" => "HTTP/%{NUMBER:[nginx][access][http_version]}"
      }
    }
    # @timestamp is when filebeat reads the event.
    mutate {
      add_field => {
        "[nginx][read_timestamp]" => "%{@timestamp}"
      }
    }
    # msec has millisecond resolution.
    date {
      match => [
        "[nginx][time][msec]",
        "UNIX"
      ]
      target => "@timestamp"
    }
    # Requires geoip plugin
    geoip {
      source => "[nginx][access][remote_ip]"
      target => "[nginx][access][geoip]"
    }
    # Requires user-agent plugin
    useragent {
      source => "[nginx][access][agent]"
      target => "[nginx][access][user_agent]"
    }
  } else if [source] =~ '\/nginx\/error' {
    # nginx error_log
    grok {
      match => {
        "message" => "%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"
      }
    }
    # @timestamp is when filebeat reads the event.
    mutate {
      add_field => {
        "[nginx][read_timestamp]" => "%{@timestamp}"
      }
    }
    # For error logs
    date {
      match => [
        "[nginx][error][time]",
        "YYYY/MM/dd H:m:s"
      ]
      target => "@timestamp"
    }
  }
}

log-json.conf

# Match the format for Elasticsearch
log_format json escape=json
  '{ '
    '"fileset": { '
      '"module": "nginx", '
      '"name": "access" '
    '}, '
    '"nginx": { '
      '"access": { '
        '"remote_ip": "$remote_addr", '
        '"user_name": "$remote_user", '
        '"time": "$time_local", '
        '"method": "$request_method", '
        '"host": "$host", '
        '"url": "$request_uri", '
        '"http_protocol": "$server_protocol", '
        '"response_code": "$status", '
        '"body_sent": { '
          '"bytes": "$body_bytes_sent" '
        '}, '
        '"referrer": "$http_referer", '
        '"agent": "$http_user_agent" '
      '}, '
      '"request": "$request", '
      '"connection": "$connection", '
      '"pipe": "$pipe", '
      '"connection_requests": "$connection_requests", '
      '"time": { '
        '"iso8601": "$time_iso8601", '
        '"msec": "$msec", '
        '"request": "$request_time" '
      '}, '
      '"bytes": { '
        '"request_length": "$request_length", '
        '"body_sent": "$body_bytes_sent", '
        '"sent": "$bytes_sent" '
      '}, '
      '"http": { '
        '"x_forwarded_for": "$http_x_forwarded_for", '
        '"x_forwarded_proto": "$http_x_forwarded_proto", '
        '"x_real_ip": "$http_x_real_ip", '
        '"x_scheme": "$http_x_scheme" '
      '}, '
      '"upstream": { '
        '"addr": "$upstream_addr", '
        '"status": "$upstream_status", '
        '"response_time": "$upstream_response_time", '
        '"connect_time": "$upstream_connect_time", '
        '"header_time": "$upstream_header_time" '
      '} '
    '} '
  '}';