
@nickethier
Created March 1, 2012 21:53
Logstash ES River

I've seen a lot of discussion on the user list about Logstash and the river feature of Elasticsearch.

This is how I index my events from Logstash. All of my events come into Logstash via AMQP with the routing key event.raw.<more routing key>. Once Logstash processes them, it outputs them back to AMQP with the routing key event.processed.<more routing key>. I then have a worker that consumes all "processed" messages and wraps each one in the bulk format Elasticsearch expects (an index action line followed by the event itself). It then publishes them back with the routing key event.indexed.<yada yada yada> for the river to pick up.
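The whole scheme relies on RabbitMQ topic-exchange matching: in a binding key, `*` matches exactly one dot-separated word and `#` matches zero or more words. Below is a minimal Ruby sketch of that rule, handy for checking that each consumer's binding key lines up with what the previous stage publishes. `topic_match?` and `match_words` are hypothetical helpers written for illustration; they are not part of the amqp gem or RabbitMQ.

```ruby
# Hypothetical helper: mimics RabbitMQ topic-exchange matching.
# "*" matches exactly one word, "#" matches zero or more words.
def topic_match?(binding_key, routing_key)
  match_words(binding_key.split("."), routing_key.split("."))
end

def match_words(pattern, words)
  return words.empty? if pattern.empty?
  head, *rest = pattern
  case head
  when "#"
    # "#" either swallows nothing, or swallows one word and tries again
    match_words(rest, words) || (!words.empty? && match_words(pattern, words.drop(1)))
  when "*"
    !words.empty? && match_words(rest, words.drop(1))
  else
    !words.empty? && words.first == head && match_words(rest, words.drop(1))
  end
end

puts topic_match?("event.processed.#", "event.processed.apache.access") # true
puts topic_match?("event.processed.#", "event.raw.apache.access")       # false
puts topic_match?("events.indexed.#", "event.indexed.apache.access")    # false
```

The last call shows why a binding key of `events.indexed.#` never receives messages published as `event.indexed.…`: the first words differ.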

River definition (`_river/logstash/_meta`):

```json
{
  "_index": "_river",
  "_type": "logstash",
  "_id": "_meta",
  "_version": 1,
  "_score": 1,
  "_source": {
    "type": "rabbitmq",
    "rabbitmq": {
      "host": "rabbit.myhost.net",
      "port": 5672,
      "user": "user",
      "pass": "pass",
      "vhost": "/events",
      "queue": "elasticsearch",
      "exchange": "logstash",
      "routing_key": "event.indexed.#",
      "exchange_type": "topic",
      "exchange_durable": false,
      "queue_durable": false,
      "queue_auto_delete": false
    },
    "index": {
      "bulk_size": 100,
      "bulk_timeout": "10ms",
      "ordered": false
    }
  }
}
```
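The river lives in the `_river` index as the `_meta` document of type `logstash`, so registering it amounts to PUTting the `_source` part to `/_river/logstash/_meta`. A minimal sketch using Ruby's standard `net/http`, assuming Elasticsearch listens on `http://localhost:9200` (an assumption, not from the original) and a version that still supports rivers. `river_request` is a hypothetical helper that only builds the request, so it can be inspected before anything is sent.

```ruby
require 'json'
require 'net/http'
require 'uri'

# Hypothetical helper: builds (but does not send) the PUT that registers
# the river as _river/logstash/_meta. The Elasticsearch URL is an assumption.
def river_request(es_url = "http://localhost:9200")
  body = {
    "type" => "rabbitmq",
    "rabbitmq" => {
      "host" => "rabbit.myhost.net", "port" => 5672,
      "user" => "user", "pass" => "pass", "vhost" => "/events",
      "queue" => "elasticsearch", "exchange" => "logstash",
      # must match what the worker publishes (event.indexed.*)
      "routing_key" => "event.indexed.#", "exchange_type" => "topic",
      "exchange_durable" => false, "queue_durable" => false,
      "queue_auto_delete" => false
    },
    "index" => { "bulk_size" => 100, "bulk_timeout" => "10ms", "ordered" => false }
  }
  uri = URI.join(es_url, "/_river/logstash/_meta")
  req = Net::HTTP::Put.new(uri, "Content-Type" => "application/json")
  req.body = body.to_json
  [uri, req]
end

uri, req = river_request
# To actually register the river:
# Net::HTTP.start(uri.host, uri.port) { |http| puts http.request(req).body }
```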
Worker:

```ruby
require 'amqp'
require 'json'

EventMachine.run do
  connection = AMQP.connect(:host => 'rabbit.myhost.net', :user => "user", :pass => "pass", :vhost => "/events")
  channel    = AMQP::Channel.new(connection)
  # Worker queue, bound to the same topic exchange Logstash publishes to
  queue      = channel.queue("es_worker", :auto_delete => true)
  exchange   = channel.topic("logstash", :durable => false)

  queue.bind(exchange, :routing_key => "event.processed.#").subscribe do |headers, payload|
    event = JSON.parse(payload)
    # Bulk action line: daily index from the worker's clock (not the event's @timestamp)
    index_message = {"index" => {"_index" => "events-#{Time.now.strftime("%Y.%m.%d")}", "_type" => event["@type"]}}.to_json + "\n"
    # Bulk source line: the original event, unchanged
    index_message += payload + "\n"
    # Republish as event.indexed.<rest>; only the leading prefix is rewritten
    exchange.publish(index_message, :routing_key => headers.routing_key.sub(/\Aevent\.processed\./, "event.indexed."))
  end
end
```
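To see exactly what the river receives, the message-building step of the worker can be pulled out into plain functions and run without RabbitMQ. A minimal sketch; `bulk_message` and `indexed_key` are hypothetical names, and the clock is passed in as an argument so the daily index name is deterministic.

```ruby
require 'json'

# Hypothetical helper: builds the same two-line bulk payload the worker publishes
def bulk_message(payload, now = Time.now)
  event = JSON.parse(payload)
  action = { "index" => { "_index" => "events-#{now.strftime("%Y.%m.%d")}",
                          "_type"  => event["@type"] } }
  action.to_json + "\n" + payload + "\n"
end

# Hypothetical helper: event.processed.<rest> -> event.indexed.<rest>
def indexed_key(routing_key)
  routing_key.sub(/\Aevent\.processed\./, "event.indexed.")
end

payload = { "@type" => "apache", "@message" => "GET / 200" }.to_json
print bulk_message(payload, Time.utc(2012, 3, 1))
# {"index":{"_index":"events-2012.03.01","_type":"apache"}}
# {"@type":"apache","@message":"GET / 200"}
puts indexed_key("event.processed.apache.access")
# event.indexed.apache.access
```

Because the source line is the raw payload, each event must arrive as single-line JSON; an embedded newline would split it into two bulk lines.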