Skip to content

Instantly share code, notes, and snippets.

@athoune
Last active August 29, 2015 14:07
Show Gist options
  • Save athoune/7091dbf83cdb46d5a302 to your computer and use it in GitHub Desktop.
Save athoune/7091dbf83cdb46d5a302 to your computer and use it in GitHub Desktop.
Logstash bulk export

Massive import of cold data into Kibana

I want to push old logs into Kibana. The logs are classical, one line per event, splitted and gzipped by logrotate. Logstash stdin import module is ugly, and I don't need to install logstash (and java) on the source computer. Netcat and socket import module is what I need.

input {
    tcp {
        host => "my.private.ip"
        port => 4243
    }
}
filter {
    grok {
        match => {
            "message" => '\[%{DATA:timestamp}\] \[%{WORD:level}\] \[client %{IP:clientip}\] %{WORD:mod}: stderr: phptop %{URI:uri} time:%{NUMBER:time:float} user:%{NUMBER:user:float} sys:%{NUMBER:sys:float} mem:%{NUMBER:mem:float}(, referer: %{GREEDYDATA:referer})?'
        }
        named_captures_only => false
        add_tag => ["phptop"]
    }
    date {
        match => [
            # Fri Oct 10 06:26:53 2014
            "timestamp", "EEE MMM dd HH:mm:ss yyyy"
            ]
        remove_field => "timestamp"
    }
    geoip {
        source => "clientip"
            database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
    }
}
output {
    elasticsearch_http {
        host => "es.ip:9200"
            template => "/opt/logstash/lib/logstash/outputs/elasticsearch/elasticsearch-template.json"
        index => "logstash-MYCUSTOMER-%{+YYYY.MM.dd}"
        flush_size => 1500
        idle_flush_time => 1800
        replication => async
        workers => 4
    }
}

On the source computer, I just use :

zcat */error.log | grep phptop | nc ip.of.logstash 4243

But the result is VERY slow. The settings (flush_size and idle_flush_time) are huge, but the result is tiny. I spy the Elasticsearch communication with pcap, Packetbeat does this job for me. The discussion on port 9200 is transformed to JSON and RPUSHed to a Redis server, I just have to BLPOP and count things. The batch size is low, and varies. Sometimes better than 100, and most of time small. I dig into logstash sources, stud sources, and the http client sources. --debug was not enough to find mistakes, and I don't know how to put ugly print in a jruby code, without recompiling it. So, I tried an ugly hack.

output {
    redis {
        batch => true
        data_type => "list"
        db => 3
        host => "my.redis.server"
        key => "es"
    }
}

And put a Custom and KISS worker to read from Redis and push to Elasticsearch, with LARGE bulk size

#!/usr/bin/env python
import json
import logging
from redis import StrictRedis
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
def events_iterator(pool):
    for event in pool:
        ts = event['@timestamp'].split('T')[0].replace('-', '.')
        yield {'_index': 'logstash-machin-%s' % ts,
               '_type': 'logs',
               '_source': event
               }
if __name__ == "__main__":
    logger = logging.getLogger('elasticsearch')
    logger.addHandler(logging.StreamHandler())
    #logger.level = logging.DEBUG
    host = 'redis.host'
    r = StrictRedis(host=host, db=3)
    e = Elasticsearch(['es1.host', 'es2.host'])
    pool = []
    while True:
        kv = r.blpop(['es'], timeout=30)
        if kv is None:
            # timeout
            print "timeout"
            bulk(e, events_iterator(pool))
        pool = []
        else:
            k, v = kv
            v = json.loads(v)
            pool.append(v)
            l = len(pool)
            if l % 50 == 0:
                print l
            if len(pool) >= 1000:
                print bulk(e, events_iterator(pool))
                pool = []

Same tactic as logstash, quickly prototyped with python, with really 1000 events per bulk, and the redis blpop timeout to handle the remain in the pool. And, now, it's FAST. This hack is ugly, but I don't know how to find where logstash is slow (Because It's complicated Jruby, and debugging it is a PITx ), and why its bulk sizes are so tiny

@jordansissel
Copy link

So when doing this python thing...

Are you doing it live? Like your zcat | grep | nc shipping to logstash and output to redis. Do you wait until your zcat is done (or at least pretty far along) before running your python redis->elasticsearch consumer?

@athoune
Copy link
Author

athoune commented Oct 15, 2014

  • redis queue is empty
  • old index are removed
  • logstash is running and listen the socket
  • the python worker is running and wait
  • I launch th nc command
  • when the says "tiemout" a lot of time, I stop the nc

So the redis is used as a bus, the events flow throw it.

After, I found the -w 30 option for nc, for stooping when nothing happens from STDIN.

@jordansissel
Copy link

Curiously -

  • Why is flush_size set so high?
  • Why is idle time set to 15 minutes? That seems crazy.
  • Why use async replication?

Were these all the result of a tune-it-until-it-works approach?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment