I want to push old logs into Kibana. The logs are classic, one line per event, split and gzipped by logrotate. The Logstash stdin input is ugly, and I don't want to install Logstash (and Java) on the source computer. Netcat and the TCP socket input are what I need.
input {
  tcp {
    host => "my.private.ip"
    port => 4243
  }
}
filter {
  grok {
    match => {
      "message" => '\[%{DATA:timestamp}\] \[%{WORD:level}\] \[client %{IP:clientip}\] %{WORD:mod}: stderr: phptop %{URI:uri} time:%{NUMBER:time:float} user:%{NUMBER:user:float} sys:%{NUMBER:sys:float} mem:%{NUMBER:mem:float}(, referer: %{GREEDYDATA:referer})?'
    }
    named_captures_only => false
    add_tag => ["phptop"]
  }
  date {
    match => [
      # Fri Oct 10 06:26:53 2014
      "timestamp", "EEE MMM dd HH:mm:ss yyyy"
    ]
    remove_field => "timestamp"
  }
  geoip {
    source => "clientip"
    database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
  }
}
output {
  elasticsearch_http {
    host => "es.ip:9200"
    template => "/opt/logstash/lib/logstash/outputs/elasticsearch/elasticsearch-template.json"
    index => "logstash-MYCUSTOMER-%{+YYYY.MM.dd}"
    flush_size => 1500
    idle_flush_time => 1800
    replication => "async"
    workers => 4
  }
}
On the source computer, I just use:
zcat */error.log | grep phptop | nc ip.of.logstash 4243
But the result is VERY slow. The settings (flush_size and idle_flush_time) are huge, but the bulks are tiny. I spy on the Elasticsearch traffic with pcap: Packetbeat does this job for me. The conversation on port 9200 is turned into JSON and RPUSHed to a Redis server, so I just have to BLPOP and count things. The batch size is low and it varies: sometimes better than 100, but most of the time small. I dig into the Logstash sources, the stud sources, and the HTTP client sources. --debug was not enough to find the mistake, and I don't know how to drop ugly prints into JRuby code without recompiling it. So, I tried an ugly hack.
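For the counting part, a throwaway script is enough. Here is a rough sketch; the 'packetbeat' list key and the 'request' field are assumptions, they depend on how Packetbeat's Redis output is configured:

#!/usr/bin/env python
# Rough sketch of the "BLPOP and count" part. The 'packetbeat' key and the
# 'request' field are assumptions: adjust them to your Packetbeat setup.
import json

from redis import StrictRedis

r = StrictRedis(host='my.redis.server')
while True:
    key, raw = r.blpop(['packetbeat'])
    tx = json.loads(raw)
    body = tx.get('request', '')
    # The bulk API alternates one action line and one source line,
    # so roughly two non-empty lines per indexed event.
    lines = [l for l in body.splitlines() if l.strip()]
    print len(lines) / 2

The ugly hack: keep Logstash for the parsing, but swap its output for Redis: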
output {
  redis {
    batch => true
    data_type => "list"
    db => 3
    host => "my.redis.server"
    key => "es"
  }
}
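Before plugging in the consumer, a quick sanity check is to watch the depth of the 'es' list and confirm that Logstash is really feeding it. A minimal sketch, same Redis host and db as above:

#!/usr/bin/env python
# Minimal sketch: watch the depth of the 'es' list to confirm that the
# logstash redis output is really feeding the buffer.
import time

from redis import StrictRedis

r = StrictRedis(host='my.redis.server', db=3)
while True:
    print r.llen('es')
    time.sleep(5)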
And write a custom, KISS worker to read from Redis and push to Elasticsearch, with a LARGE bulk size:
#!/usr/bin/env python

import json
import logging

from redis import StrictRedis
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


def events_iterator(pool):
    for event in pool:
        ts = event['@timestamp'].split('T')[0].replace('-', '.')
        yield {'_index': 'logstash-machin-%s' % ts,
               '_type': 'logs',
               '_source': event}


if __name__ == "__main__":
    logger = logging.getLogger('elasticsearch')
    logger.addHandler(logging.StreamHandler())
    #logger.level = logging.DEBUG

    host = 'redis.host'
    r = StrictRedis(host=host, db=3)
    e = Elasticsearch(['es1.host', 'es2.host'])
    pool = []
    while True:
        kv = r.blpop(['es'], timeout=30)
        if kv is None:
            # timeout
            print "timeout"
            bulk(e, events_iterator(pool))
            pool = []
        else:
            k, v = kv
            v = json.loads(v)
            pool.append(v)
            l = len(pool)
            if l % 50 == 0:
                print l
            if len(pool) >= 1000:
                print bulk(e, events_iterator(pool))
                pool = []
Same tactic as Logstash, quickly prototyped in Python, but with a real 1000 events per bulk, and the Redis BLPOP timeout to handle whatever remains in the pool. And now, it's FAST. This hack is ugly, but I don't know how to find out where Logstash is slow (because it's complicated JRuby, and debugging it is a PITx), nor why its bulk sizes are so tiny.
So when doing this python thing...
Are you doing it live? Like, your zcat | grep | nc shipping to logstash, which outputs to redis? Do you wait until your zcat is done (or at least pretty far along) before running your python redis->elasticsearch consumer?