Skip to content

Instantly share code, notes, and snippets.

@JohannesFKnauf
Created July 26, 2019 07:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JohannesFKnauf/79bae1a539e9e187308f2afc36412d94 to your computer and use it in GitHub Desktop.
Save JohannesFKnauf/79bae1a539e9e187308f2afc36412d94 to your computer and use it in GitHub Desktop.
Analyse connection-level usage of a host
#!/usr/bin/python3
#
# The purpose of this snippet is to preprocess log files generated by
# tcpdump -i ... -nn -q -l -tttt > sample.tcpdump.log
#
# in order to perform a connection-level analysis:
# Which remote host (as identified by IP) was communicating with us,
# over what transport protocol (UDP, tcp) and
# how often (at a granularity of days)?
#
# E.g. for counting daily statistics, you can use
# cat sample.tcpdump.log | python3 analyse_tcpdump.py | sort | uniq -c
import itertools
import operator
import re
import sys
# Compiled once at import time. Matches one line of `tcpdump -nn -q -tttt`
# output for IP traffic where both endpoints are written as ADDRESS.PORT, e.g.
#   2019-07-26 07:52:00.123456 IP 10.0.0.1.12345 > 10.0.0.2.80: tcp 0
# Lines whose endpoints lack a numeric port suffix do not match.
tcpdump_logline_pattern = re.compile(r"""
^ # start of logline
(?P<date>(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})) # date group yyyy-MM-dd (man tcpdump(8))
\s #
(?P<time>(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\.(?P<second_fraction>\d{6})) # time group hh:mm:ss.frac
\s #
(?P<internet_protocol>IP) # e.g. IP or ARP, in our case only IP is expected
\s #
(?P<source_ip>\d+\.\d+\.\d+\.\d+)\.(?P<source_port>\d+) # e.g. 127.0.0.1.12345
\s>\s # >
(?P<destination_ip>\d+\.\d+\.\d+\.\d+)\.(?P<destination_port>\d+) # e.g. 127.0.0.1.12345
:\s # :
(?P<transport_protocol>[a-zA-Z]+) # e.g. tcp or UDP
,?\s # optional ,
(?P<supplementary_information>.*) # protocol specific; cf. man tcpdump(8) in case we need it
$ # end of logline
""", re.VERBOSE)


def parse_logline(raw_logline):
    """Parse one tcpdump log line into a connection record.

    Args:
        raw_logline: A single line of tcpdump output; a trailing newline
            is tolerated.

    Returns:
        A dict with the string-valued keys ``date``, ``time``,
        ``source_ip``, ``source_port``, ``destination_ip``,
        ``destination_port`` and ``transport_protocol`` (lower-cased), or
        ``None`` when the line does not match ``tcpdump_logline_pattern``.
    """
    # Match against the argument itself; the previous code read a global
    # named `logline`, which only existed when run via the script's loop.
    match = tcpdump_logline_pattern.match(raw_logline)
    if match is None:
        # Report on stderr so the diagnostic never mixes into the records
        # that the script prints on stdout for `sort | uniq -c`.
        print("PANIC! Log line did not match: " + raw_logline.rstrip("\n"),
              file=sys.stderr)
        return None

    fields = match.groupdict()
    return {
        "date": fields["date"],
        "time": fields["time"],
        "source_ip": fields["source_ip"],
        "source_port": fields["source_port"],
        "destination_ip": fields["destination_ip"],
        "destination_port": fields["destination_port"],
        # tcpdump prints e.g. "tcp" but "UDP"; normalise the case so both
        # spellings of a protocol aggregate together.
        "transport_protocol": fields["transport_protocol"].lower(),
    }
if __name__ == "__main__":
    # Read tcpdump output from stdin and emit, per packet, one line for the
    # source endpoint and one for the destination endpoint.
    for logline in sys.stdin:
        logged_connection = parse_logline(logline)
        if logged_connection is None:
            # Unparseable line: skip it instead of crashing on
            # `.format(**None)`, so one odd line does not abort the run.
            continue
        # Drop time
        # Output source and destination as individual events
        print("{date} {source_ip}:{source_port} {transport_protocol}".format(**logged_connection))
        print("{date} {destination_ip}:{destination_port} {transport_protocol}".format(**logged_connection))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment