Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?

Piping data from Mongostat to Elasticsearch with Logstash

Requirements

  • Logstash 1.5.x
    • If you want to run Logstash 2.x, see the section titled Logstash 2.x below
  • Mongostat 2.8+ (for support of the --json flag)

Preparation

  1. Copy all of the files from this gist into your Logstash directory ($LOGSTASH_DIR). Leave mongostat-filter.py in $LOGSTASH_DIR itself, and put logstash.conf and es-template-mongostat.json in $LOGSTASH_DIR/conf/.
  2. Edit the connection information in conf/logstash.conf for your Elasticsearch output
  3. If you change the index name in conf/logstash.conf, you'll also need to update the "template" index pattern (mongostat-*) in es-template-mongostat.json so that it still matches the new index names.

Run

$ mongostat --json -h localhost --port 27017 -u bob -ppassword --authenticationDatabase admin --discover | python mongostat-filter.py | bin/logstash -f conf/logstash.conf

Logstash 2.x

All of these configs should work fine on Logstash 2.x as well, with the exception of two things:

  1. The logstash-filter-bytes2human plugin has not yet been updated for Logstash 2.x support. You'll need to manually install a patched version of this plugin if you wish to run 2.x.
  2. The syntax for the logstash-output-elasticsearch plugin has changed:
    • host is now hosts
    • The protocol option has been removed in favor of a separate plugin, logstash-output-elasticsearch_java, for communicating over protocols other than HTTP.
{
  "template": "mongostat-*",
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "_default_": {
      "_all": {
        "enabled": true,
        "omit_norms": true
      },
      "dynamic_templates": [
        {
          "message_field": {
            "match": "message",
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "analyzed",
              "omit_norms": true,
              "fielddata": { "format": "disabled" }
            }
          }
        },
        {
          "string_fields": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "analyzed",
              "omit_norms": true,
              "fielddata": { "format": "disabled" },
              "fields": {
                "raw": {
                  "type": "string",
                  "index": "not_analyzed",
                  "doc_values": true,
                  "ignore_above": 256
                }
              }
            }
          }
        },
        {
          "float_fields": {
            "match": "*",
            "match_mapping_type": "float",
            "mapping": { "type": "float", "doc_values": true }
          }
        },
        {
          "double_fields": {
            "match": "*",
            "match_mapping_type": "double",
            "mapping": { "type": "double", "doc_values": true }
          }
        },
        {
          "byte_fields": {
            "match": "*",
            "match_mapping_type": "byte",
            "mapping": { "type": "byte", "doc_values": true }
          }
        },
        {
          "short_fields": {
            "match": "*",
            "match_mapping_type": "short",
            "mapping": { "type": "short", "doc_values": true }
          }
        },
        {
          "integer_fields": {
            "match": "*",
            "match_mapping_type": "integer",
            "mapping": { "type": "integer", "doc_values": true }
          }
        },
        {
          "long_fields": {
            "match": "*",
            "match_mapping_type": "long",
            "mapping": { "type": "long", "doc_values": true }
          }
        },
        {
          "date_fields": {
            "match": "*",
            "match_mapping_type": "date",
            "mapping": { "type": "date", "doc_values": true }
          }
        },
        {
          "geo_point_fields": {
            "match": "*",
            "match_mapping_type": "geo_point",
            "mapping": { "type": "geo_point", "doc_values": true }
          }
        }
      ],
      "properties": {
        "@timestamp": { "type": "date", "doc_values": true },
        "@version": {
          "type": "string",
          "index": "not_analyzed",
          "doc_values": true
        },
        "geoip": {
          "type": "object",
          "dynamic": true,
          "properties": {
            "ip": { "type": "ip", "doc_values": true },
            "location": { "type": "geo_point", "doc_values": true },
            "latitude": { "type": "float", "doc_values": true },
            "longitude": { "type": "float", "doc_values": true }
          }
        }
      }
    }
  }
}
# Events are read from standard input (the output of mongostat-filter.py in
# the documented pipeline).
input {
  stdin {
    # Tag every event so it can be told apart from other inputs.
    tags => ["mongostat"]
    # Decode incoming data as JSON.
    codec => json
  }
}
# Reshape the raw mongostat fields into typed values before they are indexed.
filter {
  # "qr|qw" holds two integers separated by "|". Split it into integer fields
  # qr and qw, then drop the combined field.
  grok {
    match => {
      "qr|qw" => "%{INT:qr:int}\|%{INT:qw:int}"
    }
    remove_field => [ "qr|qw" ]
  }

  # Same treatment for "ar|aw" -> ar, aw.
  grok {
    match => {
      "ar|aw" => "%{INT:ar:int}\|%{INT:aw:int}"
    }
    remove_field => [ "ar|aw" ]
  }

  # "command" is "<local>|<replicated>"; store each side as its own integer.
  grok {
    match => {
      "command" => "%{INT:commands_local:int}\|%{INT:commands_replicated:int}"
    }
    remove_field => [ "command" ]
  }

  # Break "host" ("name:port") into hostname and an integer port. The original
  # "host" field is kept.
  grok {
    match => {
      "host" => "%{HOSTNAME:hostname}:%{POSINT:port:int}"
    }
  }

  # Drop mongostat's own "time" field.
  mutate { remove_field => [ "time" ] }

  # Append 'ib' to all the byte values for bytes2human.
  # The pattern matches the end of the value unless it is already preceded by
  # "b", so a value that already ends in "b" is left alone.
  mutate {
    gsub => [
      "netOut", "(?<!b)$", "ib",
      "netIn", "(?<!b)$", "ib",
      "vsize", "(?<!b)$", "ib",
      "res", "(?<!b)$", "ib",
      "mapped", "(?<!b)$", "ib",
      "non-mapped", "(?<!b)$", "ib"
    ]
  }

  # logstash-filter-bytes2human is required for these conversions.
  # Every field that received the "ib" suffix above is listed here. "mapped"
  # and "non-mapped" were previously missing, so the integer conversion below
  # was applied to the suffixed string instead of a byte count.
  bytes2human {
    convert => [
      "netOut", "bytes",
      "netIn", "bytes",
      "vsize", "bytes",
      "res", "bytes",
      "mapped", "bytes",
      "non-mapped", "bytes"
    ]
  }

  # Store the counters and byte sizes as integers rather than strings.
  mutate {
    convert => {
      "conn" => "integer"
      "insert" => "integer"
      "update" => "integer"
      "delete" => "integer"
      "query" => "integer"
      "flushes" => "integer"
      "faults" => "integer"
      "getmore" => "integer"
      "netIn" => "integer"
      "netOut" => "integer"
      "mapped" => "integer"
      "non-mapped" => "integer"
      "vsize" => "integer"
      "res" => "integer"
    }
  }
}
# Ship the processed events to Elasticsearch (Logstash 1.5.x option names).
output {
  # Uncomment to also dump every event to the console while debugging.
  # stdout { codec => rubydebug }

  elasticsearch {
    # Cluster connection settings. The hosts and credentials below are example
    # values; replace them with your own.
    protocol => "http"
    host => [
      "es1.example.com",
      "es2.example.com",
      "es3.example.com",
      "es4.example.com"
    ]
    port => 9200
    user => "bob"
    password => "password"

    # One index per day, e.g. mongostat-2015.06.30.
    index => "mongostat-%{+YYYY.MM.dd}"

    # Index template to use, and the name to register it under.
    template => "conf/es-template-mongostat.json"
    template_name => "mongostat"
  }
}
#!/usr/bin/env python
import json
import sys
def filter_stream(source, sink, errors):
    """Split each mongostat JSON line into one JSON line per top-level value.

    Every line read from ``source`` is parsed as a JSON object; each of its
    values is re-serialized and written to ``sink`` on a line of its own.

    Args:
        source: Readable text stream providing ``readline()``.
        sink: Writable text stream that receives one JSON document per line.
        errors: Writable text stream that receives one "Error: ..." line for
            every input line that cannot be processed.

    Returns:
        None. The function returns once ``source`` reaches end of file.
    """
    # readline() returns "" only at end of file, so the loop stops there
    # instead of retrying forever on an exhausted stream.
    for line in iter(source.readline, ""):
        try:
            data = json.loads(line)
            for metric in data.values():
                sink.write(json.dumps(metric) + "\n")
            # Flush after every input line so a downstream pipe reader sees
            # the metrics immediately rather than when a buffer fills up.
            sink.flush()
        except Exception as e:
            # Best effort: report the bad line and carry on with the next one.
            errors.write("Error: %s\n" % e)
            errors.flush()


if __name__ == "__main__":
    filter_stream(sys.stdin, sys.stdout, sys.stderr)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.