
@dcode
Last active August 29, 2015 14:08
Bro filter for LogStash
# encoding: utf-8
# *NOTE*: I have no idea what I'm doing and this is untested. Use at your own risk
# (though I welcome assistance)
require "logstash/filters/base"
require "logstash/namespace"
require "csv"
require "bigdecimal"
# The Bro filter takes an event field containing Bro log data, parses it,
# and stores it as individual fields with the names parsed from the header.
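# For reference, a Bro ASCII log begins with a header block roughly like the
# one sketched below (abbreviated here; the real logs are tab-separated). The
# filter reads #separator, #fields, and #types from it to drive the parsing:
#
#   #separator \x09
#   #set_separator ,
#   #empty_field (empty)
#   #unset_field -
#   #path conn
#   #fields ts uid id.orig_h id.orig_p ...
#   #types time string addr port ...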
class LogStash::Filters::Bro < LogStash::Filters::Base

  config_name "bro"
  milestone 1

  # The Bro log data in the value of the `source` field will be expanded into
  # a data structure.
  config :source, :validate => :string, :default => "message"

  # Define a list of column names (in the order they appear in the log, as if
  # it were a header line). If `columns` is not configured, or there are not
  # enough columns specified, the default column names are "column1",
  # "column2", etc. In the case that there are more columns in the data than
  # specified in this column list, extra columns will be auto-numbered:
  # (e.g. "user_defined_1", "user_defined_2", "column3", "column4", etc.)
  #config :columns, :validate => :array, :default => []

  # Define the column separator value. If this is not specified, the default
  # is a tab ("\t").
  # Optional.
  config :separator, :validate => :string, :default => "\t"

  # Define the set separator value. If this is not specified, the default
  # is a comma ','.
  # Optional.
  config :set_separator, :validate => :string, :default => ','

  # Define target field for placing the data.
  # Defaults to writing to the root of the event.
  config :target, :validate => :string
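
  # These options can be overridden from the Logstash config if the defaults
  # don't fit, for example (illustrative values):
  #
  #   filter {
  #     bro {
  #       source => "message"
  #       target => "bro"
  #     }
  #   }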

  public
  def register
    # Per-path parsing metadata (separator, columns, types, "header_done", ...)
    # keyed by the event's "path" field.
    @meta = {}
  end # def register

  public
  def filter(event)
    return unless filter?(event)

    @logger.debug? and @logger.debug("Running bro filter", :event => event)

    if !event.include?("path")
      @logger.error("The bro filter requires a \"path\" field added in the input section of the logstash config!")
      return
    end

    path_ = event["path"]
    @meta[path_] ||= {}

    if event[@source]
      if event[@source].is_a?(String)
        event[@source] = [event[@source]]
      end
      if event[@source].length > 1
        @logger.warn("bro filter only works on fields of length 1",
                     :source => @source, :value => event[@source],
                     :event => event)
        return
      end
      raw = event[@source].first
      # Header lines (starting with "#") carry the metadata needed to parse
      # the data lines that follow: separator, field names, and field types.
      if @meta[path_]["header_done"] == false or raw.start_with?("#")
        if raw.start_with?("#separator")
          @meta[path_]["header_done"] = false # This will reparse the header if we encounter a new one
          @meta[path_]["separator"] = raw.partition(/\s/)[2]
        elsif raw.start_with?("#set_separator")
          sep = @meta[path_]["separator"]
          @meta[path_]["set_separator"] = raw.partition(/#{sep}/)[2]
        elsif raw.start_with?("#empty_field")
          sep = @meta[path_]["separator"]
          @meta[path_]["empty_field"] = raw.partition(/#{sep}/)[2]
        elsif raw.start_with?("#unset_field")
          sep = @meta[path_]["separator"]
          @meta[path_]["unset_field"] = raw.partition(/#{sep}/)[2]
        elsif raw.start_with?("#path")
          sep = @meta[path_]["separator"]
          @meta[path_]["path"] = raw.partition(/#{sep}/)[2]
        elsif raw.start_with?("#fields")
          sep = @meta[path_]["separator"]
          # Replace dots in field names (e.g. "id.orig_h") so they are safe to
          # use as Elasticsearch field names.
          @meta[path_]["columns"] = raw.gsub(/\./, "_").partition(/#{sep}/)[2].split(/#{sep}/)
        elsif raw.start_with?("#types")
          sep = @meta[path_]["separator"]
          @meta[path_]["types"] = raw.partition(/#{sep}/)[2].split(/#{sep}/)
          # Map the Bro types to ES types
          @meta[path_]["types"].each_index do |i|
            case @meta[path_]["types"][i]
            when "count"
              @meta[path_]["types"][i] = "int"
            when "double", "interval"
              @meta[path_]["types"][i] = "float"
            when "time"
              @meta[path_]["types"][i] = "time"
            else
              @meta[path_]["types"][i] = "string"
            end
          end
          # "#types" is the last directive we need, so the header is complete.
          @meta[path_]["header_done"] = true
        end

        if @logger.info? and @meta[path_]["header_done"] == true
          @logger.info("separator: \"#{@meta[path_]["separator"]}\"")
          @logger.info("path: \"#{path_}\"")
          @logger.info("columns: \"#{@meta[path_]["columns"]}\"")
          @logger.info("types: \"#{@meta[path_]["types"]}\"")
        end

        # Header and comment lines are not events themselves.
        event.cancel
        return
      end # End header handling
      # Data line: split on the header's separator and convert each value
      # according to the declared type.
      begin
        sep = @meta[path_]["separator"]
        #values = CSV.parse_line(raw, :col_sep => sep)
        values = raw.split(/#{sep}/)

        if @target.nil?
          # Default is to write to the root of the event.
          dest = event
        else
          dest = event[@target] ||= {}
        end

        cols = @meta[path_]["columns"]
        types = @meta[path_]["types"]

        values.each_index do |i|
          field_name = cols[i] || "column#{i+1}"
          field_type = types[i] || "string"
          case field_type
          when "int"
            values[i] = values[i].to_i
          when "float"
            values[i] = values[i].to_f
          when "time" # Create an actual timestamp
            # Keep the raw epoch value, then truncate to millisecond precision
            secs = BigDecimal.new(values[i])
            dest["#{field_name}_raw"] = secs
            msec = (secs * 1000).to_i # convert to whole number of milliseconds
            dest["#{field_name}_usec"] = (secs * 10**6) % 1000
            values[i] = Time.at(msec / 1000, (msec % 1000) * 1000)
          end
          dest[field_name] = values[i]
        end

        # Add some additional data
        dest["@timestamp"] = dest["ts"]
        dest["ts_end"] = dest["ts"] + dest["duration"] if not dest["duration"].nil?
        dest["path"] = @meta[path_]["path"]

        filter_matched(event)
      rescue => e
        event.tag "_broparsefailure"
        @logger.warn("Trouble parsing bro", :source => @source, :raw => raw,
                     :exception => e)
        return
      end # begin
    end # if event[@source]
@logger.debug("Event after bro filter", :event => event)
end # def filter
end # class LogStash::Filters::Bro
input {
file { path => ["/var/opt/bro/logs/current/app_stats.log"] type => "bro" add_field => { "path" => "app_stats" } }
file { path => ["/var/opt/bro/logs/current/conn.log"] type => "bro" add_field => { "path" => "conn" } }
file { path => ["/var/opt/bro/logs/current/dhcp.log"] type => "bro" add_field => { "path" => "dhcp" } }
file { path => ["/var/opt/bro/logs/current/dns.log"] type => "bro" add_field => { "path" => "dns" } }
file { path => ["/var/opt/bro/logs/current/dpd.log"] type => "bro" add_field => { "path" => "dpd" } }
file { path => ["/var/opt/bro/logs/current/files.log"] type => "bro" add_field => { "path" => "files" } }
file { path => ["/var/opt/bro/logs/current/http.log"] type => "bro" add_field => { "path" => "http" } }
file { path => ["/var/opt/bro/logs/current/known_hosts.log"] type => "bro" add_field => { "path" => "known_hosts" } }
file { path => ["/var/opt/bro/logs/current/known_services.log"] type => "bro" add_field => { "path" => "known_services" } }
file { path => ["/var/opt/bro/logs/current/loaded_scripts.log"] type => "bro" add_field => { "path" => "loaded_scripts" } }
file { path => ["/var/opt/bro/logs/current/notice.log"] type => "bro" add_field => { "path" => "notice" } }
file { path => ["/var/opt/bro/logs/current/packet_filter.log"] type => "bro" add_field => { "path" => "packet_filter" } }
file { path => ["/var/opt/bro/logs/current/reporter.log"] type => "bro" add_field => { "path" => "reporter" } }
file { path => ["/var/opt/bro/logs/current/snmp.log"] type => "bro" add_field => { "path" => "snmp" } }
file { path => ["/var/opt/bro/logs/current/software.log"] type => "bro" add_field => { "path" => "software" } }
file { path => ["/var/opt/bro/logs/current/ssh.log"] type => "bro" add_field => { "path" => "ssh" } }
file { path => ["/var/opt/bro/logs/current/ssl.log"] type => "bro" add_field => { "path" => "ssl" } }
file { path => ["/var/opt/bro/logs/current/weird.log"] type => "bro" add_field => { "path" => "weird" } }
file { path => ["/var/opt/bro/logs/current/x509.log"] type => "bro" add_field => { "path" => "x509" } }
}
filter {
  if [type] == "bro" { bro { } }
}
output {
  stdout {
    codec => rubydebug
  }
}
dcode commented Nov 11, 2014

FYI, this won't parse properly until the file rolls over so logstash can read the headers. I'm gonna run it at home for a few days and see if there are any explosions.
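
If you want to exercise the header parsing without waiting for the logs to roll over, one option (a sketch, assuming your Logstash file input supports the start_position and sincedb_path settings) is to read the existing files from the top so the "#" header lines are seen before any data lines:

input {
  file {
    path => ["/var/opt/bro/logs/current/conn.log"]
    type => "bro"
    add_field => { "path" => "conn" }
    start_position => "beginning"  # read existing content, including the header block
    sincedb_path => "/dev/null"    # forget file positions between test runs
  }
}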
