HP BL460
- 48 GB memory
- 2 x Intel Xeon X5675 @ 3.07 GHz
- 2 x 10 Gbps NICs
- 2 TB NetApp NFS volume for ES data
- Logstash 1.1.6
__Logstash Command Line:__
java -XX:+UseParNewGC -Xss256k -jar bin/logstash-1.1.6-monolithic.jar agent -f test.conf
Elasticsearch Template
{
  "template" : "logstash-*",
  "settings" : {
    "number_of_shards" : 1,
    "number_of_replicas" : 0,
    "index" : {
      "query" : { "default_field" : "@message" },
      "store" : { "compress" : { "stored" : true, "tv" : true } },
      "refresh_interval" : "5s",
      "cache" : { "field" : { "type" : "soft" } }
    }
  },
  "mappings" : {
    "_default_" : {
      "_all" : { "enabled" : false },
      "_source" : { "compress" : true },
      "dynamic_templates" : [
        {
          "string_template" : {
            "match" : "*",
            "mapping" : { "type" : "string", "index" : "not_analyzed" },
            "match_mapping_type" : "string"
          }
        }
      ],
      "properties" : {
        "@fields" : { "type" : "object", "dynamic" : true, "path" : "full" },
        "@message" : { "type" : "string", "index" : "analyzed" },
        "@source" : { "type" : "string", "index" : "not_analyzed" },
        "@source_host" : { "type" : "string", "index" : "not_analyzed" },
        "@source_path" : { "type" : "string", "index" : "not_analyzed" },
        "@tags" : { "type" : "string", "index" : "not_analyzed" },
        "@timestamp" : { "type" : "date", "index" : "not_analyzed" },
        "@type" : { "type" : "string", "index" : "not_analyzed" }
      }
    }
  }
}
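For the template to apply to the test indices it has to be registered with the cluster before indexing starts. A minimal sketch of doing that with the index template API, assuming Elasticsearch answers HTTP on localhost:9200 and the JSON above is saved as logstash-template.json (both of those are assumptions, not part of the setup above):

# register the template under the name "logstash" (hypothetical name and file)
curl -XPUT 'http://localhost:9200/_template/logstash/' --data-binary @logstash-template.json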
Elasticsearch output
Logstash Config
input {
  generator {
    type => foo
  }
}
filter {
  metrics { meter => "events" add_tag => "metric" }
}
output {
  elasticsearch {
    type => foo
    #cluster => "alc"
    host => "localhost"
    index => "logstash-perftests"
  }
  graphite {
    tags => "metric"
    host => "ausgraphite01"
    metrics => [ "stats.logstashtests.test1.rate_1m", "%{events.rate_1m}", "stats.logstashtests.test1.events.count", "%{events.count}" ]
  }
}
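A quick way to sanity-check that events are actually landing in the test index while the generator runs (a sketch, assuming the target Elasticsearch node answers HTTP on the default port 9200):

# document count in the test index; run it a few times and it should climb
curl -XGET 'http://localhost:9200/logstash-perftests/_count?pretty'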
No output
Logstash Config
input {
  generator {
    type => foo
  }
}
filter {
  metrics { meter => "events" add_tag => "metric" }
}
output {
  graphite {
    tags => "metric"
    host => "ausgraphite01t"
    metrics => [ "stats.logstashtests.test2.rate_1m", "%{events.rate_1m}", "stats.logstashtests.test2.count", "%{events.count}" ]
  }
}
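All of the throughput numbers in these tests come from the metrics filter's 1-minute rate being pushed to Graphite. One way to pull that series back out for comparison is the graphite-web render API; a sketch, assuming graphite-web is reachable over plain HTTP on the Graphite host (the host name here is an assumption, adjust it to wherever graphite-web actually runs):

# last 10 minutes of the generator rate as JSON
curl 'http://ausgraphite01/render?target=stats.logstashtests.test2.rate_1m&from=-10min&format=json'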
Elasticsearch HTTP output
Logstash Config
input {
  generator {
    type => foo
  }
}
filter {
  metrics { meter => "events" add_tag => "metric" }
}
output {
  elasticsearch_http {
    type => foo
    host => "localhost"
    index => "logstash-perftests"
  }
  graphite {
    tags => "metric"
    host => "ausgraphite01"
    metrics => [ "stats.logstashtests.test3.rate_1m", "%{events.rate_1m}", "stats.logstashtests.test3.count", "%{events.count}" ]
  }
}
Elasticsearch river AMQP output
Logstash Config
input {
  generator {
    type => foo
  }
}
filter {
  metrics { meter => "events" add_tag => "metric" }
}
output {
  elasticsearch_river {
    es_host => "localhost"
    type => "syslog"
    index => "logstash-perftests"
    amqp_host => "localhost"
    user => "guest"
    persistent => false
  }
  graphite {
    tags => "metric"
    host => "ausgraphite01"
    metrics => [ "stats.logstashtests.test4.rate_1m", "%{events.rate_1m}", "stats.logstashtests.test4.count", "%{events.count}" ]
  }
}
__River object in ES__
{
  "_index" : "_river",
  "_type" : "logstash-auslogstash01",
  "_id" : "_meta",
  "_version" : 9,
  "exists" : true,
  "_source" : {
    "type" : "rabbitmq",
    "rabbitmq" : {
      "host" : "auslogstash01",
      "port" : 5672,
      "user" : "guest",
      "pass" : "guest",
      "vhost" : "/",
      "queue" : "elasticsearch",
      "exchange" : "elasticsearch",
      "routing_key" : "elasticsearch",
      "exchange_type" : "direct",
      "exchange_durable" : "true",
      "queue_durable" : "true"
    },
    "index" : {
      "bulk_size" : 1000,
      "bulk_timeout" : "100ms"
    }
  }
}
RabbitMQ is the bottleneck in this setup, so the size of the rabbit queue is what is meaningful here.
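Since the queue depth is the number that matters, one way to watch it during the run is rabbitmqctl on the RabbitMQ host (a sketch, assuming the elasticsearch queue lives on the default vhost and the command is run with permission to inspect it):

# message backlog per queue, refreshed every 5 seconds
watch -n 5 'rabbitmqctl list_queues name messages'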
No output - filters!
Logstash Config
input {
  generator {
    type => foo
    message => "Jan 6 03:30:06 auslogstash01 kernel: imklog 4.6.2, log source = /proc/kmsg started."
  }
}
filter {
  metrics { meter => "events" add_tag => "metric" }
  # strip the syslog PRI part and create facility and severity fields.
  # the original syslog message is saved in field %{syslog_raw_message}.
  # the extracted PRI is available in the %{syslog_pri} field.
  #
  # You get %{syslog_facility_code} and %{syslog_severity_code} fields.
  # You also get %{syslog_facility} and %{syslog_severity} fields if the
  # use_labels option is set True (the default) on syslog_pri filter.
  grep {
    type => "syslog"
    match => [ "@message", "<\d+>" ]
    add_tag => "has_pri"
    drop => false
  }
  grok {
    type => "syslog"
    tags => [ "has_pri" ]
    pattern => [ "<%{POSINT:syslog_pri}>%{SPACE}%{GREEDYDATA:message_remainder}" ]
    add_tag => "got_syslog_pri"
    add_field => [ "syslog_raw_message", "%{@message}" ]
  }
  syslog_pri {
    type => "syslog"
    tags => [ "got_syslog_pri" ]
  }
  mutate {
    type => "syslog"
    tags => [ "got_syslog_pri" ]
    replace => [ "@message", "%{message_remainder}" ]
  }
  mutate {
    # XXX must not be combined with replacement which uses same field
    type => "syslog"
    tags => [ "got_syslog_pri" ]
    remove => [ "message_remainder" ]
  }
  # strip the syslog timestamp and force event timestamp to be the same.
  # the original string is saved in field %{syslog_timestamp}.
  # the original logstash input timestamp is saved in field %{received_at}.
  grok {
    type => "syslog"
    pattern => [ "%{SYSLOGTIMESTAMP:syslog_timestamp}%{SPACE}%{GREEDYDATA:message_remainder}" ]
    add_tag => "got_syslog_timestamp"
    add_field => [ "received_at", "%{@timestamp}" ]
  }
  mutate {
    type => "syslog"
    tags => [ "got_syslog_timestamp" ]
    replace => [ "@message", "%{message_remainder}" ]
  }
  mutate {
    # XXX must not be combined with replacement which uses same field
    type => "syslog"
    tags => [ "got_syslog_timestamp" ]
    remove => [ "message_remainder" ]
  }
  date {
    type => "syslog"
    tags => [ "got_syslog_timestamp" ]
    # season to taste for your own syslog format(s)
    syslog_timestamp => [ "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
  }
  # strip the host field from the syslog line.
  # the extracted host field becomes the logstash %{@source_host} metadata
  # and is also available in the field %{syslog_hostname}.
  # the original logstash source_host is saved in field %{logstash_source}.
  grok {
    type => "syslog"
    pattern => [ "%{SYSLOGHOST:syslog_hostname}%{SPACE}%{GREEDYDATA:message_remainder}" ]
    add_tag => "got_syslog_host"
    add_field => [ "logstash_source", "%{@source_host}" ]
  }
  mutate {
    type => "syslog"
    tags => [ "got_syslog_host" ]
    replace => [ "@source_host", "%{syslog_hostname}" ]
    replace => [ "@message", "%{message_remainder}" ]
  }
  mutate {
    # XXX must not be combined with replacement which uses same field
    type => "syslog"
    tags => [ "got_syslog_host" ]
    remove => [ "message_remainder" ]
  }
  # strip the program and optional pid field from the syslog line.
  # available in the fields %{syslog_program} and %{syslog_pid}.
  grok {
    type => "syslog"
    pattern => [ "%{PROG:syslog_program}(?:\[%{POSINT:syslog_pid}\])?:%{SPACE}%{GREEDYDATA:message_remainder}" ]
    add_tag => "got_syslog_program"
  }
  mutate {
    type => "syslog"
    tags => [ "got_syslog_program" ]
    replace => [ "@message", "%{message_remainder}" ]
  }
  mutate {
    # XXX must not be combined with replacement which uses same field
    type => "syslog"
    tags => [ "got_syslog_program" ]
    remove => [ "message_remainder" ]
  }
}
output {
  graphite {
    tags => "metric"
    host => "ausgraphite01t"
    metrics => [ "stats.logstashtests.test2.rate_1m", "%{events.rate_1m}", "stats.logstashtests.test2.count", "%{events.count}" ]
  }
}
Note the sudden jump in performance ... that's from adding more filter workers ( -w 8 ).
Running syslog through the filters costs roughly 2.6x in throughput ( 24000 | 9250 ) ...
until you add more workers ... then it becomes pretty even ( 24000 | 22500 ).
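For reference, the only change for the multi-worker run was adding the filter worker flag to the agent command line from the top of this page, along these lines:

# -w sets the number of filter worker threads
java -XX:+UseParNewGC -Xss256k -jar bin/logstash-1.1.6-monolithic.jar agent -f test.conf -w 8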