Created March 1, 2012 21:53
Logstash ES River

I've seen a lot of discussion on the user-list about logstash and the river feature of elasticsearch.

This is how I index my events from logstash. All of my events come into logstash via amqp with the routing key event.raw.<more routing key>. Once logstash processes them, it outputs them back to amqp with the routing key event.processed.<more routing key>. I then have this worker that gets all "processed" messages and sets them up for elasticsearch to receive. It then shoves them back up with the routing key event.indexed.<yada yada yada>.

_index: _river
_type: logstash
_id: _meta
_version: 1
_score: 1
_source: {
    type: rabbitmq
    rabbitmq: {
        port: 5672
        user: user
        pass: pass
        vhost: /events
        queue: elasticsearch
        exchange: logstash
        routing_key: events.indexed.#
        exchange_type: topic
        exchange_durable: false
        queue_durable: false
        queue_auto_delete: false
    }
    index: {
        bulk_size: 100
        bulk_timeout: 10ms
        ordered: false
    }
}
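
For anyone who wants to create that river document themselves, here is a rough sketch of how the settings above could be pushed to elasticsearch from Ruby. The `register_river` helper and the `es_url` argument are my own illustrative names, not part of the original setup, and no host is set under `rabbitmq` because the document above does not show one. The routing key is copied as shown (`events.indexed.#`); the worker publishes under `event.indexed...`, so check that the two match in your setup.

```ruby
require 'json'
require 'net/http'
require 'uri'

# River settings mirroring the _source shown above.
# No "host" key is set under "rabbitmq" because the original leaves it out.
RIVER_META = {
  "type" => "rabbitmq",
  "rabbitmq" => {
    "port" => 5672, "user" => "user", "pass" => "pass", "vhost" => "/events",
    "queue" => "elasticsearch", "exchange" => "logstash",
    "routing_key" => "events.indexed.#", "exchange_type" => "topic",
    "exchange_durable" => false, "queue_durable" => false,
    "queue_auto_delete" => false
  },
  "index" => { "bulk_size" => 100, "bulk_timeout" => "10ms", "ordered" => false }
}

# Hypothetical helper: PUT the settings to /_river/logstash/_meta.
# es_url is an assumed address such as "http://localhost:9200".
def register_river(es_url, meta = RIVER_META)
  uri = URI.join(es_url, "/_river/logstash/_meta")
  request = Net::HTTP::Put.new(uri.path, "Content-Type" => "application/json")
  request.body = meta.to_json
  Net::HTTP.start(uri.host, uri.port) { |http| http.request(request) }
end
```

Calling `register_river("http://localhost:9200")` would send the request; nothing is sent just by loading the file.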
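# Worker: re-publishes processed events as bulk API payloads for the river.
# The EventMachine wrapper, channel setup, and timestamp call were garbled in
# this copy and are reconstructed here; treat them as assumptions.
require 'amqp'
require 'json'

EventMachine.run do
  connection = AMQP.connect(:host => '', :user => "user", :pass => "pass", :vhost => "/events")
  channel = AMQP::Channel.new(connection)
  queue = channel.queue("es_worker", :auto_delete => true)
  exchange = channel.topic("logstash", :durable => false)
  queue.bind(exchange, :routing_key => "event.processed.#").subscribe do |headers, payload|
    event = JSON.parse(payload)
    # Bulk action line; the daily index name uses the current time (assumed).
    index_message = {"index" => {"_index" => "events-#{Time.now.strftime("%Y.%m.%d")}", "_type" => event["@type"]}}.to_json + "\n"
    index_message += payload + "\n"
    exchange.publish(index_message, :routing_key => headers.routing_key.gsub("processed", "indexed"))
  end
end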
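
To see what the worker actually hands to the river without needing a broker, the message-building step can be pulled out into plain Ruby. This is only a sketch: `bulk_index_message` and `indexed_routing_key` are hypothetical helper names, the sample event is made up, and using the current time for the index name is an assumption (the event's own timestamp may be the better choice).

```ruby
require 'json'

# Hypothetical helper: wrap one event (already serialized as single-line JSON)
# in the two-line form the bulk API expects: an action line, then the source.
def bulk_index_message(payload, time = Time.now)
  event = JSON.parse(payload)
  action = { "index" => { "_index" => "events-#{time.strftime('%Y.%m.%d')}",
                          "_type"  => event["@type"] } }
  action.to_json + "\n" + payload + "\n"
end

# Hypothetical helper: move a routing key from the processed stage
# to the indexed stage, as the worker does before publishing.
def indexed_routing_key(routing_key)
  routing_key.gsub("processed", "indexed")
end

# Made-up sample event, pinned to a fixed date so the index name is predictable.
payload = '{"@type":"syslog","@message":"hello"}'
puts bulk_index_message(payload, Time.utc(2012, 3, 1))
puts indexed_routing_key("event.processed.syslog")
```

Each message is a complete two-line bulk request, newline-terminated, which is the shape the river needs in order to feed queue messages into bulk indexing.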
