Skip to content

Instantly share code, notes, and snippets.

@jordansissel
Created October 23, 2012 22:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jordansissel/3942161 to your computer and use it in GitHub Desktop.
Save jordansissel/3942161 to your computer and use it in GitHub Desktop.
% ruby grep.rb logstash-1.1.[0123]-monolithic |sort | uniq -c
1 /files/logstash//logstash-1.1.1-monolithic.jar
1 /files/logstash//logstash-1.1.2-monolithic.jar
399 /files/logstash/logstash-1.1.0-monolithic.jar
4 /files/logstash/logstash-1.1.1-monolithic-jruby1.7.0RC1.jar
308 /files/logstash/logstash-1.1.1-monolithic.jar
537 /files/logstash/logstash-1.1.2-monolithic.jar
77 /files/logstash/logstash-1.1.3-monolithic.jar
#!/usr/bin/env ruby
require "ftw"
require "json"
# Runs the given block, returns whatever the block returned, and optionally
# reports how long the block took.
#
# name  - label printed next to the duration; when nil it falls back to
#         caller[1].
# block - the work to measure (required).
#
# The "<seconds> secs - <name>" line is written with puts only when the
# VERBOSE environment variable is set. If the block raises, nothing is
# printed and the exception propagates.
def time(name=nil, &block)
  # NOTE(review): caller[0] is the line that called time; caller[1] is one
  # frame further up — confirm that this is the intended label.
  name = caller[1] if name.nil?
  # A monotonic clock is used so the measured duration cannot be skewed by
  # wall-clock adjustments while the block runs.
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = block.call
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  puts "#{elapsed} secs - #{name}" if ENV.key?("VERBOSE")
  result
end
# Shared FTW HTTP agent; stream builds and executes every request through it.
$agent = FTW::Agent.new
# Reads an HTTP response body chunk by chunk and returns it as one String.
def read_response_body(response)
  chunks = []
  response.read_body { |chunk| chunks << chunk }
  chunks.join("")
end

# Passes the "_source" of every hit to the block, timed under the
# "hit evaluation" label.
def each_hit_source(hits, &block)
  time("hit evaluation (#{hits.size} events)") do
    hits.map { |hit| hit["_source"] }.each(&block)
  end
end

# Searches the logstash-2012.10.23 index on demo.logstash.net and passes the
# "_source" of every matching event to the block.
#
# query - regular-expression fragment; an event matches when its
#         @fields.request matches ".*<query>.*".
# block - called once per matching event with that event's "_source".
#
# Besides the request pattern, the filter requires @type "apache", an existing
# "request" field and a @timestamp between 2012-10-23T00:00:00Z and
# 2012-10-24T00:00:00Z. The search is issued with search_type=scan and a
# 5 minute scroll, fetching with size=250; when the first response carries a
# "_scroll_id", further pages are requested until one comes back with no hits.
# The return value is not meaningful to callers.
def stream(query, &block)
  search = {
    "query" => {
      "filtered" => {
        "query" => { "match_all" => {} },
        "filter" => {
          "bool" => {
            "must" => [
              { "term" => { "@type" => "apache" } },
              { "exists" => { "field" => "request" } },
              { "range" => {
                "@timestamp" => {
                  "from" => "2012-10-23T00:00:00Z",
                  "to" => "2012-10-24T00:00:00Z"
                } # @timestamp
              } }, # range
              { "script" => {
                # NOTE(review): query is interpolated into the script text
                # without any escaping; a value containing a double quote
                # changes the script itself. Only pass trusted patterns.
                "script" => "_source[\"@fields\"].request ~= \".*#{query}.*\""
              } } # script
            ], # must
          }, # bool
        }, # filter
      } # filtered
    } # query
  } # search
  #puts JSON.pretty_generate(search)

  size = 250
  url = "http://demo.logstash.net:9200/logstash-2012.10.23/_search?size=#{size}&search_type=scan&scroll=5m"
  raw = time("first query: #{url}") do
    request = $agent.get(url)
    request.body = search.to_json
    read_response_body($agent.execute(request))
  end
  result = JSON.parse(raw)
  scroll_id = result["_scroll_id"]

  # No scroll id: the first response already holds whatever hits there are.
  if scroll_id.nil?
    hits = result["hits"]["hits"]
    return if hits.empty?
    return each_hit_source(hits, &block)
  end

  # Keep pulling pages, always using the scroll id from the latest response,
  # until a page arrives with no hits.
  url = "http://demo.logstash.net:9200/_search/scroll?scroll=5m&size=#{size}"
  loop do
    response = time("next scroll query: #{url}") do
      request = $agent.get(url)
      request.body = scroll_id
      $agent.execute(request)
    end
    raw = time("next scroll read body") { read_response_body(response) }
    result = time("scroll json parse (#{raw.size} bytes)") { JSON.parse(raw) }

    hits = result["hits"]["hits"]
    break if hits.empty?
    each_hit_source(hits, &block)
    scroll_id = result["_scroll_id"]
  end
end
# Script entry point: print the "request" field of every event whose request
# matches the pattern given as the first command-line argument.

# Bookkeeping for the disabled throughput report inside the block below.
count = 0
start = Time.now
pattern = ARGV.first
#stream("@type:megalog AND @timestamp:[2012-10-22T00:00:00Z TO 2012-10-23T00:00:00Z]") do |doc|
stream(pattern) do |doc|
  fields = doc["@fields"]
  #puts fields["clientip"]
  #puts doc["@message"]
  puts fields["request"]
  #count += 1
  #if count % 10000 == 0
  #puts :rate => count / (Time.now - start)
  #end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment