Skip to content

Instantly share code, notes, and snippets.

@whiskeyalpharomeo
Last active April 17, 2020 22:27
Show Gist options
  • Save whiskeyalpharomeo/92538521bb631dd2db38 to your computer and use it in GitHub Desktop.
Save whiskeyalpharomeo/92538521bb631dd2db38 to your computer and use it in GitHub Desktop.
Logstash Filter for Processing sFlow FLOW records
#################
# Sflow Filters #
#################
filter {
  if [type] == "sflow" {
    # sFlow sends two kinds of messages - CNTRs and FLOWs.
    # I'm not doing anything with CNTRs at this point, so
    # I drop those, and we concentrate on processing FLOW
    # messages.
    if [message] =~ /CNTR/ {
      drop { }
    }
    # sFlow FLOW messages break down into the following fields.
    # I have named them arbitrarily, but they correspond to the
    # actual field names. I developed this grok pattern using
    # two tools:
    #
    # - sflowtool (http://www.inmon.com/technology/sflowTools.php)
    #   Written by InMon, it provides a way to examine sFlow messages
    #   in human readable format.
    #
    # - Grok Debugger (https://grokdebug.herokuapp.com/)
    #   Invaluable, and self-explanatory.
    #
    # NOTE: the "sflow.xxx" names below are literal field names that
    # contain a dot (legacy flat-field style), not nested fields.
    grok {
      match => { "message" => "%{WORD:SampleType},%{IP:sflow.ReporterIP},%{WORD:sflow.inputPort},%{WORD:sflow.outputPort},%{WORD:sflow.srcMAC},%{WORD:sflow.dstMAC},%{WORD:sflow.EtherType},%{NUMBER:sflow.in_vlan},%{NUMBER:sflow.out_vlan},%{IP:sflow.srcIP},%{IP:sflow.dstIP},%{NUMBER:sflow.IPProtocol},%{WORD:sflow.IPTOS},%{WORD:sflow.IPTTL},%{NUMBER:sflow.srcPort},%{NUMBER:sflow.dstPort},%{DATA:sflow.tcpFlags},%{NUMBER:sflow.PacketSize},%{NUMBER:sflow.IPSize},%{NUMBER:sflow.SampleRate}" }
    }
    # Sometimes it doesn't work out: drop any event the grok
    # pattern above could not parse.
    if "_grokparsefailure" in [tags] {
      drop { }
    }
    # Because I'll ultimately be displaying all of this
    # in Kibana, I want to translate many of the IP addresses
    # into recognizable hostnames. We take the IP fields,
    # and copy them into new fields that we'll be doing DNS
    # lookups on:
    mutate {
      add_field => {
        "[sflow.SrcHostname]" => "%{sflow.srcIP}"
        "[sflow.DstHostname]" => "%{sflow.dstIP}"
        "[sflow.ReporterName]" => "%{sflow.ReporterIP}"
      }
    }
    # I also want to translate things like Source and
    # Destination port numbers into known service names.
    # For this to work, you have to build some YAML files,
    # which basically map everything you'd find in an
    # /etc/services file. I'll post my YAML files in
    # other Gists.
    translate {
      field => "[sflow.srcPort]"
      destination => "[sflow.SrcSvcName]"
      dictionary_path => "/etc/logstash/dictionaries/iana_services.yaml"
    }
    translate {
      field => "[sflow.dstPort]"
      destination => "[sflow.DstSvcName]"
      dictionary_path => "/etc/logstash/dictionaries/iana_services.yaml"
    }
    translate {
      field => "[sflow.IPProtocol]"
      destination => "[sflow.ProtName]"
      dictionary_path => "/etc/logstash/dictionaries/iana_protocols.yaml"
    }
    translate {
      field => "[sflow.tcpFlags]"
      destination => "[sflow.TCPFlagDecode]"
      dictionary_path => "/etc/logstash/dictionaries/tcp_flags.yaml"
    }
    # Here we do our DNS reverse lookups. With
    # action => "replace", a successful lookup overwrites the
    # IP in the field with the resolved name; on failure the
    # field keeps the original IP address.
    dns {
      reverse => [ "sflow.SrcHostname" ]
      action => "replace"
    }
    dns {
      reverse => [ "sflow.DstHostname" ]
      action => "replace"
    }
    dns {
      reverse => [ "sflow.ReporterName" ]
      action => "replace"
    }
    # If the reverse lookup didn't fail (leaving us with
    # nothing but numerics) we grok the FQDN, parsing it
    # into a HOST and DOMAIN - and then we store the
    # HOST portion into a new field - sflow.SrcHost
    #
    # Otherwise, we just dump the unresolved IP address
    # into the sflow.SrcHost field.
    #
    # FIX: the fallback sprintf references below were malformed
    # in the original ("%{[sflow.SrcHostname}" etc. - mixed
    # %{...}/[...] syntax with a missing bracket), which would
    # have stored the literal text instead of the field value.
    if [sflow.SrcHostname] !~ /(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)/ {
      grok {
        match => [ "[sflow.SrcHostname]", "%{DATA:src_host}\.%{GREEDYDATA:domain}" ]
        add_field => { "[sflow.SrcHost]" => "%{src_host}" }
      }
    } else {
      mutate {
        add_field => [ "[sflow.SrcHost]", "%{sflow.SrcHostname}" ]
      }
    }
    if [sflow.DstHostname] !~ /(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)/ {
      grok {
        match => [ "[sflow.DstHostname]", "%{DATA:dst_host}\.%{GREEDYDATA:domain}" ]
        add_field => { "[sflow.DstHost]" => "%{dst_host}" }
      }
    } else {
      mutate {
        add_field => [ "[sflow.DstHost]", "%{sflow.DstHostname}" ]
      }
    }
    if [sflow.ReporterName] !~ /(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)/ {
      grok {
        match => [ "[sflow.ReporterName]", "%{DATA:rep_host}(\.|\-)" ]
        # FIX: the target field name was "[sflow.Reporter" in the
        # original - missing the closing bracket.
        add_field => { "[sflow.Reporter]" => "%{rep_host}" }
      }
    } else {
      mutate {
        add_field => [ "[sflow.Reporter]", "%{sflow.ReporterName}" ]
      }
    }
    # There are a bunch of fields from the original
    # message, as well as some I created temporarily,
    # that I don't care to store. Here's where I drop
    # them all.
    mutate {
      remove_field => [ "host", "command", "sflow.inputPort", "sflow.outputPort", "sflow.srcMAC", "sflow.dstMAC", "sflow.EtherType", "sflow.in_vlan", "sflow.out_vlan", "sflow.IPTTL", "sflow.IPSize", "message", "src_host", "dst_host", "rep_host", "SampleType", "domain" ]
    }
    # Lastly, we're going to want Kibana and
    # Elasticsearch to be able to do some math
    # with certain fields, so make sure they're
    # handled as numbers, rather than strings.
    # You can also do this by setting up templates
    # in Elasticsearch, but this is easier.
    # (Alternatively, the grok pattern above could cast these
    # in place with %{NUMBER:field:int}.)
    mutate {
      convert => {
        "sflow.PacketSize" => "integer"
        "sflow.SampleRate" => "integer"
      }
    }
  }
}
@ruddj
Copy link

ruddj commented Sep 10, 2015

Just a suggestion: rather than needing to convert to int, you can include it at the grok step by adding ":int" after the field name, e.g.
%{NUMBER:sflow.PacketSize:int},%{NUMBER:sflow.IPSize:int},%{NUMBER:sflow.SampleRate:int}

Could also be used with srcPort and dstPort.
Full example:

match => { "message" => "%{WORD:SampleType},%{IP:sflow.ReporterIP},%{WORD:sflow.inputPort},%{WORD:sflow.outputPort},%{WORD:sflow.srcMAC},%{WORD:sflow.dstMAC},%{WORD:sflow.EtherType},%{NUMBER:sflow.in_vlan:int},%{NUMBER:sflow.out_vlan:int},%{IP:sflow.srcIP},%{IP:sflow.dstIP},%{NUMBER:sflow.IPProtocol:int},%{WORD:sflow.IPTOS},%{WORD:sflow.IPTTL},%{NUMBER:sflow.srcPort:int},%{NUMBER:sflow.dstPort:int},%{DATA:sflow.tcpFlags},%{NUMBER:sflow.PacketSize:int},%{NUMBER:sflow.IPSize:int},%{NUMBER:sflow.SampleRate:int}" }

@whiskeyalpharomeo
Copy link
Author

Good advice. I'll incorporate that in the future.

@Ay4shi
Copy link

Ay4shi commented Jul 8, 2016

Sometimes the flows are inverted (destination <-> source)

Would it be possible to automatically swap the fields when dstPort > 10000 ?

@bananabr
Copy link

@Ay4shi, Using dstPort > 10000 as an inversion parameter would not always work. If you are normalizing TCP conversations, try using their flags instead.

@anshul27jain
Copy link

Could anyone please help me install the Logstash plugin logstash-codec-sflow?
In the latest version of Logstash, it is no longer provided.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment