Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Kafka Logs + Filebeat + ES
"mappings": {
"_default_": {
"_all": {
"norms": false
},
"_meta": {
"version": "5.1.1"
},
"dynamic_templates": [
{
"strings_as_keyword": {
"mapping": {
"ignore_above": 1024,
"type": "keyword"
},
"match_mapping_type": "string"
}
}
],
"properties": {
"@timestamp": {
"type": "date"
},
"beat": {
"properties": {
"hostname": {
"ignore_above": 1024,
"type": "keyword"
},
"name": {
"ignore_above": 1024,
"type": "keyword"
},
"version": {
"ignore_above": 1024,
"type": "keyword"
},
"read_time": {
"type": "date"
}
}
},
"input_type": {
"ignore_above": 1024,
"type": "keyword"
},
"message": {
"norms": false,
"type": "text"
},
"component": {
"ignore_above": 1024,
"type": "keyword"
},
"level": {
"ignore_above": 1024,
"type": "keyword"
},
"class": {
"norms": false,
"type": "text"
},
"trace": {
"properties": {
"class": {
"norms": false,
"type": "text"
},
"message": {
"norms": false,
"type": "text"
},
"full": {
"norms": false,
"type": "text"
}
}
},
"gc_pause_us": {
"type": "long"
},
"eden": {
"properties": {
"before": {
"properties": {
"total": {
"type": "long"
},
"used": {
"type": "long"
}
}
},
"after": {
"properties": {
"total": {
"type": "long"
},
"used": {
"type": "long"
}
}
},
"delta": {
"properties": {
"total": {
"type": "long"
},
"used": {
"type": "long"
}
}
}
}
},
"heap": {
"properties": {
"before": {
"properties": {
"total": {
"type": "long"
},
"used": {
"type": "long"
}
}
},
"after": {
"properties": {
"total": {
"type": "long"
},
"used": {
"type": "long"
}
}
},
"delta": {
"properties": {
"total": {
"type": "long"
},
"used": {
"type": "long"
}
}
}
}
},
"survivors": {
"properties": {
"before": {
"properties": {
"used": {
"type": "long"
}
}
},
"after": {
"properties": {
"used": {
"type": "long"
}
}
},
"delta": {
"properties": {
"used": {
"type": "long"
}
}
}
}
},
"meta": {
"properties": {
"cloud": {
"properties": {
"availability_zone": {
"ignore_above": 1024,
"type": "keyword"
},
"instance_id": {
"ignore_above": 1024,
"type": "keyword"
},
"machine_type": {
"ignore_above": 1024,
"type": "keyword"
},
"project_id": {
"ignore_above": 1024,
"type": "keyword"
},
"provider": {
"ignore_above": 1024,
"type": "keyword"
},
"region": {
"ignore_above": 1024,
"type": "keyword"
}
}
}
}
},
"offset": {
"type": "long"
},
"source": {
"ignore_above": 1024,
"type": "keyword"
},
"tags": {
"ignore_above": 1024,
"type": "keyword"
},
"type": {
"ignore_above": 1024,
"type": "keyword"
}
}
}
},
"order": 0,
"settings": {
"index.refresh_interval": "5s"
},
"template": "kafkalogs-*"
}
###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

#=========================== Filebeat prospectors =============================

# kafka.home: /opt/kafka
filebeat.prospectors:

# Kafka broker log files prospector.
- input_type: log
  paths:
    - /var/log/kafka/controller.log*
    - /var/log/kafka/server.log*
    - /var/log/kafka/state-change.log*
    - /var/log/kafka/kafka-*.log*
  # Multiline: a new event starts at a '['-prefixed line; everything else
  # (e.g. stack trace continuation lines) is appended to the previous event.
  multiline.pattern: '^\['
  multiline.negate: true
  multiline.match: after
  # Ingest pipeline for these events; consumed by output.elasticsearch.pipeline.
  fields.pipeline: kafka-logs

# Kafka GC log prospector.
- input_type: log
  paths:
    # NOTE(review): was /var/logs/kafka/... — fixed to /var/log/kafka/ to match
    # the broker-log paths above; confirm the GC log location on your hosts.
    - /var/log/kafka/kafkaServer-gc.log
  # Multiline: indented lines are continuations of the previous GC record.
  multiline.pattern: '^\s'
  multiline.negate: false
  multiline.match: after
  # Include only 'GC pause' stats.
  include_lines: ['GC pause']
  # Ingest pipeline for GC events; consumed by output.elasticsearch.pipeline.
  fields.pipeline: kafka-gc-logs

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["elasticsearch:9200"]
  # Daily index matching the "kafkalogs-*" index template.
  index: 'kafkalogs-%{+yyyy.MM.dd}'
  # Per-event pipeline selection, driven by fields.pipeline set above.
  pipeline: '%{[fields.pipeline]}'
  # Optional protocol and basic auth credentials.
  # NOTE(review): credentials committed in config — move to keystore/env for real deployments.
  username: "elastic"
  password: "changeme"

#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]
{
  "description": "Kafka Log Messages",
  "processors": [
    {
      "grok": {
        "field": "message",
        "trace_match": true,
        "patterns": [
          "(?m)%{TIMESTAMP_ISO8601:log-timestamp}. %{LOGLEVEL:level} +%{JAVALOGMESSAGE:message} \\(%{JAVACLASS:class}\\)$[ \\n]*(?'trace.full'.*)"
        ]
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "\\[%{DATA:component}][,:. ] +%{JAVALOGMESSAGE:message}"
        ],
        "on_failure": [
          {
            "set": {
              "field": "component",
              "value": "unknown"
            }
          }
        ]
      }
    },
    {
      "grok": {
        "field": "trace.full",
        "ignore_missing": true,
        "patterns": [
          "%{JAVACLASS:trace.class}:\\s*%{JAVALOGMESSAGE:trace.message}"
        ],
        "on_failure": [
          {
            "remove": {
              "field": "trace"
            }
          }
        ]
      }
    },
    {
      "rename": {
        "field": "@timestamp",
        "target_field": "beat.read_time"
      }
    },
    {
      "date": {
        "field": "log-timestamp",
        "target_field": "@timestamp",
        "formats": ["yyyy-MM-dd HH:mm:ss,SSS"]
      }
    },
    {
      "remove": {
        "field": "log-timestamp"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error.log",
        "value": "{{ _ingest.on_failure_message }}"
      }
    }
  ]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.