Skip to content

Instantly share code, notes, and snippets.

@crazed
Created August 8, 2014 18:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save crazed/c5d79d2099a6327adb1f to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
require 'open3'
require 'time'

require 'elasticsearch'
require 'faraday'
require 'multi_json'
require 'oj'
# Polls pmacct for flow statistics, enriches each record with per-interval
# bps/pps rates, and bulk-indexes the records into hourly Elasticsearch
# indices ("flow-YYYY-MM-DD-HH"). The last-run timestamp is persisted to a
# state file so a restarted process resumes the rate interval where the
# previous one left off.
class FlowStoreClient
  # Mixing in Elasticsearch::API gives us `bulk`, `indices`, etc. as
  # instance methods, provided we implement #perform_request below.
  include Elasticsearch::API

  # Options (all keyword-ish via the args hash, with defaults):
  #   :bulk_size    - records buffered before a bulk request (default 1000)
  #   :thread_count - max in-flight bulk threads before joining (default 5)
  #   :pmacct       - path to the pmacct client binary
  #   :pmacct_pipe  - pmacct collector pipe to query
  #   :state_file   - JSON file holding the last-run timestamp
  #   :run_interval - seconds to sleep between polling passes (default 60)
  def initialize(args = {})
    @bulk_size    = args.delete(:bulk_size)    || 1000
    @thread_count = args.delete(:thread_count) || 5
    @pmacct       = args.delete(:pmacct)       || '/usr/local/bin/pmacct'
    @pmacct_pipe  = args.delete(:pmacct_pipe)  || '/tmp/nfacctd-full.pipe'
    @state_file   = args.delete(:state_file)   || '/var/tmp/flow_store.state'
    @run_interval = args.delete(:run_interval) || 60
    # Restore @lastrun from a previous run so rate math stays continuous.
    read_state!
  end

  # Required by Elasticsearch::API: delegate raw transport calls to a
  # real client so the mixed-in API actions can execute.
  def perform_request(method, path, params, body)
    es_client.perform_request(method, path, params, body)
  end

  # Main loop: ensure the index exists, stream pmacct's JSON stats,
  # enrich and bulk-index them, reset pmacct's counters, persist state,
  # then sleep until the next pass. Never returns.
  def run!
    loop do
      create_index!
      time = Time.now
      # Seconds covered by the counters we are about to read; used to
      # turn byte/packet totals into bps/pps rates.
      duration = time - @lastrun
      Open3.popen3("#{@pmacct} -s -p #{@pmacct_pipe} -O json") do |_stdin, stdout, _stderr, _wait_thr|
        while (line = stdout.gets)
          data = { 'stats' => MultiJson.load(line) }
          data['bps'] = (data['stats']['bytes'] * 8.0 / duration.to_f).round
          data['pps'] = (data['stats']['packets'] / duration.to_f).round
          index_bulk(data)
        end
      end
      # Reset pmacct's in-memory counters so the next pass measures a
      # fresh interval rather than cumulative totals.
      `#{@pmacct} -l -p #{@pmacct_pipe} -e`
      # Record the last run and flush state so any process can pick up
      # where we left off.
      @lastrun = Time.now
      write_state!
      sleep @run_interval
    end
  end

  # Buffer one record for bulk indexing; when the buffer reaches
  # @bulk_size, hand it to a background thread. When @thread_count
  # threads are outstanding, block until they all finish.
  def index_bulk(data)
    @index_bulk_buffer ||= []
    @threads ||= []
    @index_bulk_buffer << { index: { _index: current_index, _type: 'flowdata', data: data } }
    return if @index_bulk_buffer.size < @bulk_size

    # Swap in a fresh buffer; the thread owns the old one exclusively.
    body = @index_bulk_buffer
    @index_bulk_buffer = []
    @threads << Thread.new { bulk(body: body) }
    if @threads.size >= @thread_count
      warn "Waiting for #{@threads.size} threads"
      # BUGFIX: the original deleted from @threads while iterating it,
      # which skips every other element and leaves threads un-joined.
      # Join them all, then clear the list.
      @threads.each(&:join)
      @threads.clear
    end
  end

  private

  # Persist @lastrun as JSON so another process can resume the interval.
  def write_state!
    File.open(@state_file, 'w') do |f|
      f.write(MultiJson.dump({ :lastrun => @lastrun }))
    end
  end

  # Load @lastrun from the state file; on any failure fall back to "now"
  # (first pass then measures a zero-length interval).
  def read_state!
    File.open(@state_file, 'r') do |f|
      data = MultiJson.load(f.read)
      # BUGFIX: Time.mktime expects (year, month, ...) components, but the
      # state file stores a serialized time string — parse it instead.
      @lastrun = Time.parse(data['lastrun'].to_s)
    end
  rescue MultiJson::ParseError
    warn "Invalid JSON found in state file '#{@state_file}'"
  rescue ArgumentError
    warn "Unparseable timestamp in state file '#{@state_file}'"
  rescue Errno::ENOENT
    warn "Could not open state file '#{@state_file}'"
  ensure
    @lastrun ||= Time.now
  end

  # Lazily-built Elasticsearch client used by #perform_request.
  def es_client
    @es_client ||= Elasticsearch::Client.new
  end

  # Hourly rolling index name, e.g. "flow-2014-08-08-18".
  def current_index
    "flow-#{Time.now.strftime('%Y-%m-%d-%H')}"
  end

  # Create the current hour's index if it does not exist yet.
  def create_index!
    unless indices.exists(index: current_index)
      puts "Creating index: #{current_index}"
      create_index
    end
  end

  # Create the index with explicit mappings: store the record timestamp,
  # pps/bps as longs, and leave the raw pmacct stats as a free-form object.
  # NOTE(review): these are legacy (pre-ES 2.x) mapping options
  # (_type, _timestamp, index: 'analyzed') — confirm against the target
  # Elasticsearch version.
  def create_index
    indices.create(
      index: current_index,
      body: {
        mappings: {
          flowdata: {
            _timestamp: {
              enabled: true,
              store: true,
              format: 'date_hour_minute_second_fraction',
              index: 'analyzed',
            },
            properties: {
              pps: { type: 'long', index: 'analyzed', enabled: true, store: true },
              bps: { type: 'long', index: 'analyzed', enabled: true, store: true },
              stats: { type: 'object' },
            },
          },
        },
      }
    )
  end
end
# Entry point: build a client with default options and poll forever.
FlowStoreClient.new.run!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment