Last active
August 29, 2015 14:08
-
-
Save dcode/60ac10d8f584a17dc77f to your computer and use it in GitHub Desktop.
Bro filter for LogStash
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf-8 # *NOTE*: I have no idea what I'm doing and this is untested. Use at your own risk | |
# (though I welcome assistance) | |
require "logstash/filters/base" | |
require "logstash/namespace" | |
require "csv" | |
require "bigdecimal" | |
# The Bro filter takes an event field containing Bro log data, parses it, | |
# and stores it as individual fields with the names parsed from the header. | |
class LogStash::Filters::Bro < LogStash::Filters::Base | |
config_name "bro" | |
milestone 1 | |
# The CSV data in the value of the `source` field will be expanded into a | |
# data structure. | |
config :source, :validate => :string, :default => "message" | |
# Define a list of column names (in the order they appear in the CSV, | |
# as if it were a header line). If `columns` is not configured, or there | |
# are not enough columns specified, the default column names are | |
# "column1", "column2", etc. In the case that there are more columns | |
# in the data than specified in this column list, extra columns will be auto-numbered: | |
# (e.g. "user_defined_1", "user_defined_2", "column3", "column4", etc.) | |
#config :columns, :validate => :array, :default => [] | |
# Define the column separator value. If this is not specified, the default | |
# is a tab ' '. | |
# Optional. | |
config :separator, :validate => :string, :default => ' ' | |
# Define the set separator value. If this is not specified, the default | |
# is a comma ','. | |
# Optional. | |
config :set_separator, :validate => :string, :default => ',' | |
# Define target field for placing the data. | |
# Defaults to writing to the root of the event. | |
config :target, :validate => :string | |
public | |
def register | |
@header_done = false | |
@meta = { "path" => {} } | |
end # def register | |
public | |
def filter(event) | |
return unless filter?(event) | |
@logger.debug? and @logger.debug("Running bro filter", :event => event) | |
matches = 0 | |
if !event.include?("path") | |
@logger.error("The bro filter requires a \"path\" field added in the input section of the logstash config!") | |
else | |
path_ = event["path"] | |
if !@meta.has_key?(path_) | |
@meta[path_] = {} | |
end | |
end | |
if event[@source] | |
if event[@source].is_a?(String) | |
event[@source] = [event[@source]] | |
end | |
if event[@source].length > 1 | |
@logger.warn("bro filter only works on fields of length 1", | |
:source => @source, :value => event[@source], | |
:event => event) | |
return | |
end | |
raw = event[@source].first | |
if @meta[path_]["header_done"] == false or raw.start_with?("#separator") | |
if raw.start_with?("#separator") | |
@meta[path_]["header_done"] = false # This will reparse the header if we encounter a new one | |
@meta[path_]["separator"] = raw.partition(/\s/)[2] | |
elsif raw.start_with?("#set_separator") | |
sep = @meta[path_]["separator"] | |
@meta[path_]["set_separator"] = raw.partition(/#{sep}/)[2] | |
elsif raw.start_with?("#empty_field") | |
sep = @meta[path_]["separator"] | |
@meta[path_]["empty_field"] = raw.partition(/#{sep}/)[2] | |
elsif raw.start_with?("#unset_field") | |
sep = @meta[path_]["separator"] | |
@meta[path_]["unset_field"] = raw.partition(/#{sep}/)[2] | |
elsif raw.start_with?("#path") | |
sep = @meta[path_]["separator"] | |
@meta[path_]["path"] = raw.partition(/#{sep}/)[2] | |
elsif raw.start_with?("#fields") | |
sep = @meta[path_]["separator"] | |
@meta[path_]["columns"] = raw.gsub(/\./, "_").partition(/#{sep}/)[2].split(/#{sep}/) | |
elsif raw.start_with?("#types") | |
sep = @meta[path_]["separator"] | |
@meta[path_]["types"] = raw.partition(/#{sep}/)[2].split(/#{sep}/) | |
# Map the Bro types to ES types | |
@meta[path_]["types"].each_index do |i| | |
case @meta[path_]["types"][i] | |
when "count" | |
@meta[path_]["types"][i] = "int" | |
when "double" | |
@meta[path_]["types"][i] = "float" | |
when "interval" | |
@meta[path_]["types"][i] = "float" | |
when "time" | |
@meta[path_]["types"][i] = "time" | |
else | |
@meta[path_]["types"][i] = "string" | |
end | |
end | |
@meta[path_]["header_done"] = true | |
end | |
if @logger.info? and @meta[path_]["header_done"] == true | |
@logger.info("separator: \"#{@meta[path_]["separator"]}\"") | |
@logger.info("path: \"#{path_}\"") | |
@logger.info("columns: \"#{@meta[path_]["columns"]}\"") | |
@logger.info("types: \"#{@meta[path_]["types"]}\"") | |
end | |
event.cancel | |
return | |
end # End header_done == false | |
begin | |
sep = @meta[path_]["separator"] | |
#values = CSV.parse_line(raw, :col_sep => sep) | |
values = raw.split(/#{sep}/) | |
if @target.nil? | |
# Default is to write to the root of the event. | |
dest = event | |
else | |
dest = event[@target] ||= {} | |
end | |
cols = @meta[path_]["columns"] | |
types = @meta[path_]["types"] | |
values.each_index do |i| | |
field_name = cols[i] || "column#{i+1}" | |
field_type = types[i] || "string" | |
case field_type | |
when "int" | |
values[i] = values[i].to_i | |
when "float" | |
values[i] = values[i].to_f | |
when "time" # Create an actual timestamp | |
# Truncate timestamp to millisecond precision | |
secs = BigDecimal.new(values[i]) | |
dest["#{field_name}_raw"] = secs | |
msec = secs * 1000 # convert to whole number of milliseconds | |
msec = msec.to_i | |
dest["#{field_name}_usec"] = (secs * 10**6) % 1000 | |
values[i] = Time.at(msec / 1000, (msec % 1000) * 1000) | |
end | |
dest[field_name] = values[i] | |
end | |
# Add some additional data | |
dest["@timestamp"] = dest["ts"] | |
dest["ts_end"] = dest["ts"] + dest["duration"] if not dest["duration"].nil? | |
dest["path"] = @meta[path_]["path"] | |
filter_matched(event) | |
rescue => e | |
event.tag "_broparsefailure" | |
@logger.warn("Trouble parsing bro", :source => @source, :raw => raw, | |
:exception => e) | |
return | |
end # begin | |
end # if event | |
@logger.debug("Event after bro filter", :event => event) | |
end # def filter | |
end # class LogStash::Filters::Bro |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
input { | |
file { path => ["/var/opt/bro/logs/current/app_stats.log"] type => "bro" add_field => { "path" => "app_stats" } } | |
file { path => ["/var/opt/bro/logs/current/conn.log"] type => "bro" add_field => { "path" => "conn" } } | |
file { path => ["/var/opt/bro/logs/current/dhcp.log"] type => "bro" add_field => { "path" => "dhcp" } } | |
file { path => ["/var/opt/bro/logs/current/dns.log"] type => "bro" add_field => { "path" => "dns" } } | |
file { path => ["/var/opt/bro/logs/current/dpd.log"] type => "bro" add_field => { "path" => "dpd" } } | |
file { path => ["/var/opt/bro/logs/current/files.log"] type => "bro" add_field => { "path" => "files" } } | |
file { path => ["/var/opt/bro/logs/current/http.log"] type => "bro" add_field => { "path" => "http" } } | |
file { path => ["/var/opt/bro/logs/current/known_hosts.log"] type => "bro" add_field => { "path" => "known_hosts" } } | |
file { path => ["/var/opt/bro/logs/current/known_services.log"] type => "bro" add_field => { "path" => "known_services" } } | |
file { path => ["/var/opt/bro/logs/current/loaded_scripts.log"] type => "bro" add_field => { "path" => "loaded_scripts" } } | |
file { path => ["/var/opt/bro/logs/current/notice.log"] type => "bro" add_field => { "path" => "notice" } } | |
file { path => ["/var/opt/bro/logs/current/packet_filter.log"] type => "bro" add_field => { "path" => "packet_filter" } } | |
file { path => ["/var/opt/bro/logs/current/reporter.log"] type => "bro" add_field => { "path" => "reporter" } } | |
file { path => ["/var/opt/bro/logs/current/snmp.log"] type => "bro" add_field => { "path" => "snmp" } } | |
file { path => ["/var/opt/bro/logs/current/software.log"] type => "bro" add_field => { "path" => "software" } } | |
file { path => ["/var/opt/bro/logs/current/ssh.log"] type => "bro" add_field => { "path" => "ssh" } } | |
file { path => ["/var/opt/bro/logs/current/ssl.log"] type => "bro" add_field => { "path" => "ssl" } } | |
file { path => ["/var/opt/bro/logs/current/weird.log"] type => "bro" add_field => { "path" => "weird" } } | |
file { path => ["/var/opt/bro/logs/current/x509.log"] type => "bro" add_field => { "path" => "x509" } } | |
} | |
filter { | |
if [type] == "bro" { bro { } } | |
} | |
output { | |
stdout { | |
codec => rubydebug | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
FYI, this won't parse properly until the file rolls over so logstash can read the headers. I'm gonna run it at home for a few days and see if there are any explosions.