Skip to content

Instantly share code, notes, and snippets.

Created November 23, 2011 07:11
Show Gist options
  • Star 19 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save lusis/1388077 to your computer and use it in GitHub Desktop.
Save lusis/1388077 to your computer and use it in GitHub Desktop.
Ruby logstash cli application - allows searching historical data in ElasticSearch or live tailing from AMQP topic exchange
<!-- the env variables are controlled by Chef and passed in via -D on the java command-line -->
<!-- This is using the appender here: -->
<!-- log4j appender: ships application logs to a Graylog2 server as GELF. -->
<!-- NOTE(review): fragment appears truncated - the closing </appender> tag is not visible here. -->
<appender name="graylog2" class="org.graylog2.log.GelfAppender">
<!-- Target Graylog2 host, injected by Chef (see recipe fragment below). -->
<param name="graylogHost" value="${graylog.server}"/>
<!-- Hostname reported as the message origin. -->
<param name="originHost" value="${graylog.origin}"/>
<!-- Include full stacktraces in the GELF message rather than just the first line. -->
<param name="extractStacktrace" value="true"/>
<!-- Adds logger name, thread, etc. as additional GELF fields. -->
<param name="addExtendedInformation" value="true"/>
<!-- The _web part is because a given app has multiple components -->
<!-- This app might have a _web as well as an _batch component -->
<param name="facility" value="${graylog.facility}_web"/>
<!-- Extra static field used downstream to filter by deployment environment. -->
<param name="additionalFields" value="{'environment': '${graylog.env}'}"/>
<!-- Only INFO and above are shipped. -->
<param name="Threshold" value="INFO"/>
#!/usr/bin/env ruby
# Logstash CLI: search historical log data in ElasticSearch, or live-tail
# messages from an AMQP topic exchange (--live).
#
# Wrap the gem requires so a missing dependency produces a friendly install
# hint instead of a raw LoadError backtrace.
# NOTE(review): the original paste was truncated (`rescue` with no `begin`);
# the begin/end structure below is reconstructed.
begin
  require 'rubygems'
  require 'bundler/setup'
  require 'tire'
  require 'slop'
  require 'time'
rescue LoadError
  puts "You seem to be missing some key libraries"
  puts "please run `gem install bundler --no-ri --no-rdoc; bundle install`"
  exit 1
end

# yajl only speeds up JSON parsing, so failing to load it is non-fatal.
begin
  require 'yajl/json_gem'
rescue LoadError
  puts "`gem install yajl-json` for better performance"
end
# ANSI escape helpers used to decorate terminal output.
# NOTE(review): the original paste dropped the method bodies; these are the
# conventional SGR sequences, consistent with the literal "\e[1m...\e[0m"
# codes used in the display loops below.

# Wrap +string+ in bold ANSI codes.
def bold(string)
  "\e[1m#{string}\e[0m"
end

# Wrap +string+ in underline ANSI codes.
def underline(string)
  "\e[4m#{string}\e[0m"
end

# Wrap +string+ in red ANSI codes (used for error messages).
def red(string)
  "\e[31m#{string}\e[0m"
end
whoami = __FILE__
opts = Slop.parse do
banner "Usage: #{whoami} -i <index> -f <facility>"
on :i, :index=, "index to search (default: logstash-#{'%Y.%m.%d')})", :default => "logstash-#{'%Y.%m.%d')}"
on :f, :facility=, "REQUIRED: Facility(application name) to use", true
on :s, :size=, "number of results to return (default: 500)", true, :default => 500
on :c, :class_name=, "optional class name to narrow results by", true
on :g, :grep=, "optional search on message content. Warning!!! This is slow", true
on :exceptions, "toggle exception search. Warning!!! This is slow", :default => false
on :live, "runs in live logging mode. This is A LOT of output.", :default => false
on :fields=, Array, "optional comma-separated list of fields to display (tstamp, msg, file, class_name, service, host) in order (default: tstamp,service,msg)"
on :h, :help, 'Print this help message', :tail => true do
puts help
puts <<-EOF
- last 10 results log entries from curation including timestamp, classname and message:
#{whoami} -f curation* -s 10 --fields tstamp,class_name,msg
- last 5 entries for class name com.mycompany (note the quote around * in the -f option):
#{whoami} -f "*" -s 5 -c com.mycompany.*
- last 20 entries everywhere with timestamp, service name and message
#{whoami} -f "*" -s 20 --fields tstamp,service,msg
- last 5 exceptions everywhere
#{whoami} -f "*" -s 5 --exceptions
- perform a live tail of all foo_server logs (note the wildcard as it's neccesary) showing hostname
#{whoami} -f foo_server.* --live --fields host,tstamp,msg
- perform a live tail of tracking logs for tracker1 showing timestamp, classname, message
#{whoami} -f tracker_web.tracker1 --live --fields tstamp,class_name,msg
# Facility is mandatory: bail out with the usage text if it was not supplied.
# NOTE(review): truncated in the original paste; the exit and closing `end`
# are reconstructed.
if opts[:facility].nil?
  puts red("\nFacility (matches name of service) CANNOT be empty!\n")
  puts opts
  exit 1
end
if opts[:live]
require 'amqp'
require 'json'
AMQP.start("amqp://user:pass@rabbitmq-server/logstash") do |connection, open_ok|
channel =, :auto_recovery => true)
exchange_name = "logfiles"
channel.queue("", :auto_delete => true, :durable => false, :exclusive => true) do |queue, declare_ok|
queue.bind(exchange_name, :routing_key => "#{opts[:facility]}")
queue.subscribe do |payload|
parsed_message = JSON.parse(payload)
service = parsed_message["@fields"]["facility"]
class_name = underline(parsed_message["@fields"]["_logger"])
file = parsed_message["@fields"]["file"]
msg = parsed_message["@fields"]["full_message"]
host = parsed_message["@fields"]["host"]
tstamp = Time.iso8601(parsed_message["@timestamp"]).localtime
fields = opts[:fields] || ["tstamp", "service", "msg"]
vals = {|x| x == fields[0] ? "\e[1m[#{eval(x)}]\e[0m" : eval(x)}
display = vals.join(" - ")
puts display
trap("INT") { puts "Shutting down..."; connection.close { EM.stop } }
Tire.configure {url "http://es-server:9200"}
search =[:index]) do
query do
boolean do
must { string "facility:#{opts[:facility]}" }
must { string "_logger:#{opts[:class_name]}" } unless opts[:class_name].nil?
must { string "full_message:Exception*" } if opts[:exceptions]
must { string "full_message:#{opts[:grep]}*" } if opts[:grep]
sort do
by :@timestamp, 'desc'
size opts[:size]
rescue Exception => e
puts "\nSomething went wrong with the search. This is usually do to lucene query parsing of the 'grep' option"
search.results.sort {|a,b| a[:@timestamp] <=> b[:@timestamp] }.each do |res|
service = res[:@fields][:facility]
class_name = underline(res[:@fields][:_logger])
file = res[:@fields][:file]
msg = res[:@fields][:full_message]
tstamp = Time.iso8601(res[:@timestamp]).localtime
host = res[:@fields][:host]
fields = opts[:fields] || ["tstamp", "service", "msg"]
vals = {|x| x == fields[0] ? "\e[1m[#{eval(x)}]\e[0m" : eval(x)}
display = vals.join(" - ")
puts display
# logstash pipeline configuration (fragment).
# NOTE(review): this paste appears truncated - the closing braces for the
# input, output, and amqp sections are not visible here.
input {
# GELF input: receives messages emitted by the log4j GelfAppender config above.
gelf { type => 'gelf_input'}
output {
# Index everything into the 'logstash' ES cluster (transport port 9300).
elasticsearch { cluster => 'logstash' host => 'es-server' port => 9300}
# Also republish to an AMQP topic exchange so the CLI's --live mode can tail it.
amqp {
# debug => true
durable => false
exchange_type => 'topic'
host => 'rabbit-server'
name => 'logfiles'
user => 'logstash'
password => 'logstash'
persistent => false
vhost => 'logstash'
# using the routing key is REALLY flexible for narrowing results.
# Add severity, filename, classname, whatever
key => "%{facility}.%{host}"
# Chef recipe fragment: discover the Graylog2 server via node search and build
# the -D java system properties consumed by the log4j config above.
# NOTE(review): `.first` raises NoMethodError if no node matches the role;
# consider failing with a clearer message.
graylog2_server = search(:node, "roles:graylog2_server").first.ipaddress
# NOTE(review): the origin interpolation was empty ("#{}") in the paste;
# reconstructed as the node FQDN - confirm against the original recipe.
graylog_opts = "-Dgraylog.server=#{graylog2_server} -Dgraylog.origin=#{node['fqdn']} -Dgraylog.env=#{node.chef_environment} -Dgraylog.facility=tracker"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment