Skip to content

Instantly share code, notes, and snippets.

@nithu0115
Created March 12, 2021 21:00
Show Gist options
  • Save nithu0115/72d238792bd42c413212ca900f7d2290 to your computer and use it in GitHub Desktop.
Save nithu0115/72d238792bd42c413212ca900f7d2290 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# coding: utf-8
import sys
from socket import inet_ntop, AF_INET, AF_INET6
from bcc import BPF
import ctypes as ct
import subprocess
import struct
from struct import pack
import socket
import argparse
from prettytable import PrettyTable
class storeindict(argparse.Action):
    """argparse action that gathers an option's values into a set.

    Despite the name, the destination attribute ends up holding a ``set``
    (not a dict), so repeated values collapse into one entry.
    """

    def __call__(self, parser, namespace,
                 values, option_string=None):
        # Start from a fresh set on every invocation so a repeated option
        # replaces, rather than extends, the previous values.
        collected = set()
        collected.update(values)
        setattr(namespace, self.dest, collected)
def _ipv4_to_u32(text):
    """Return dotted-quad IPv4 *text* as a native-byte-order 32-bit integer.

    ``None`` (option not given) maps to 0, which the event filter treats as
    "no filter". The native byte order matches how the event callback packs
    addresses with ``pack("=I", ...)`` before comparing.

    Raises:
        socket.error: if *text* is not a valid IPv4 address string.
    """
    if text is None:
        return 0
    return struct.unpack("I", socket.inet_aton(text))[0]


parser = argparse.ArgumentParser(
    description="Parsing pod traffic")
# Exactly one source selector is required: a single address or a list.
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-src", "--ipaddr", type=str,
                   help="src ip address")
parser.add_argument("-c", "--count", type=int, default=10,
                    help="count of capture")
parser.add_argument("-dest", "--destipaddr", type=str,
                    help="dest ip address")
group.add_argument("-src_list", "--list", type=str,
                   nargs='*',
                   action=storeindict,
                   help="space-separated src ip addresses")
args = parser.parse_args()
try:
    ipaddr = _ipv4_to_u32(args.ipaddr)
    dest_ipaddr = _ipv4_to_u32(args.destipaddr)
except socket.error as exc:
    # Report a malformed address as a usage error instead of a traceback.
    parser.error("invalid IPv4 address: %s" % exc)
capture_count = 0
# Netfilter verdict names, looked up by the numeric verdict of an event
# (see uapi/linux/netfilter.h).
NF_VERDICT_NAME = [
    "DROP",     # 0
    "ACCEPT",   # 1
    "STOLEN",   # 2
    "QUEUE",    # 3
    "REPEAT",   # 4
    "STOP",     # 5
]

# Netfilter hook names, looked up by the numeric hook of an event
# (see uapi/linux/netfilter.h and net/ipv4/netfilter/ip_tables.c).
HOOKNAMES = [
    "PREROUTING",   # 0
    "INPUT",        # 1
    "FORWARD",      # 2
    "OUTPUT",       # 3
    "POSTROUTING",  # 4
]

# Outcome of a routing-table lookup, looked up by the event's verdict.
RT_RESULT = [
    "FIB HIT",   # 0
    "FIB MISS",  # 1
]

# Route types, looked up by the event's fib_res_type
# (see uapi/linux/rtnetlink.h).
RT_RESULT_TYPE = [
    "RTN_UNSPEC",                                                   # 0
    "RTN_UNICAST(Gateway or direct route)",                         # 1
    "RTN_LOCAL(Accept locally)",                                    # 2
    "RTN_BROADCAST(Accept locally as bcast,send as bcast)",         # 3
    "RTN_ANYCAST(Accept locally as bcast,send as ucast)",           # 4
    "RTN_MULTICAST(Mcast route)",                                   # 5
    "RTN_BLACKHOLE(Drop)",                                          # 6
    "RTN_UNREACHABLE(Destination is unreachable)",                  # 7
    "RTN_PROHIBIT(Adminstratively prohibited)",                     # 8
    "RTN_THROW(Not in this table)",                                 # 9
    "RTN_NAT(Translate this address)",                              # 10
    "RTN_XRESOLVE(Use external resolver)",                          # 11
    "RTN_UNKOWN(MAX_reached - Check header is new type is added)",  # 12
]

# Bit flags tested against event.flags to tell which parts of an event
# are populated.
ROUTE_EVT_IF = 1 << 0
ROUTE_EVT_IPTABLE = 1 << 1
ROUTE_EVT_RTTABLE = 1 << 2
def parse_result(rslt_table, index, default):
    """Return ``rslt_table[index]``, or *default* when *index* is out of range.

    Args:
        rslt_table: sequence of names indexed by a numeric code.
        index: numeric code to translate.
        default: value returned when the code has no entry in the table.

    Negative codes are treated as out of range; without that check Python
    would index from the end of the table and return an unrelated name.
    """
    if 0 <= index < len(rslt_table):
        return rslt_table[index]
    return default
def _to_text(value):
    """Decode *value* to text if it is a byte string; otherwise return it as-is."""
    if isinstance(value, bytes):
        return value.decode("utf-8", "replace")
    return value


def event_printer(cpu, data, size):
    """Perf-buffer callback: filter one route event and append it to the table.

    Args:
        cpu: CPU the event was delivered from (unused).
        data: raw event pointer, decoded through ``b["route_evt"].event``.
        size: size of the raw event (unused).

    An event is dropped unless it carries interface information, is IPv4 or
    IPv6, and matches the source/destination filters from the command line.
    Each accepted event decrements ``args.count``; once it reaches zero the
    table is printed and the process exits.
    """
    event = b["route_evt"].event(data)
    # Only events that carry interface information are reported.
    if (event.flags & ROUTE_EVT_IF) != ROUTE_EVT_IF:
        return

    if event.ip_version == 4:
        saddr = inet_ntop(AF_INET, pack("=I", event.saddr[0]))
        daddr = inet_ntop(AF_INET, pack("=I", event.daddr[0]))
    elif event.ip_version == 6:
        saddr = inet_ntop(AF_INET6, event.saddr)
        daddr = inet_ntop(AF_INET6, event.daddr)
    else:
        return

    # A filter value of 0 means "no filter" for the single-address options.
    if ipaddr != 0 and event.saddr[0] != ipaddr:
        return
    if (args.list is not None) and (saddr not in args.list):
        return
    if dest_ipaddr != 0 and event.daddr[0] != dest_ipaddr:
        return

    flow = "%s -> %s" % (saddr, daddr)
    result = ""
    if (event.flags & ROUTE_EVT_IPTABLE) == ROUTE_EVT_IPTABLE:
        verdict = parse_result(NF_VERDICT_NAME, event.verdict, "~UNK~")
        hook = parse_result(HOOKNAMES, event.hook, "~UNK~")
        # tablename is a char array; decode it so Python 3 does not
        # render it as b'...'.
        result = "%s.%s is %s" % (_to_text(event.tablename), hook, verdict)
    # When both flags are set, the routing-table result replaces the
    # iptables one.
    if (event.flags & ROUTE_EVT_RTTABLE) == ROUTE_EVT_RTTABLE:
        verdict = parse_result(RT_RESULT, event.verdict, "~UNK~")
        hook = parse_result(RT_RESULT_TYPE, event.fib_res_type, "~UNK~")
        # NOTE(review): the FIB result address is always rendered as IPv4,
        # even for IPv6 events -- confirm against capturetraffic.c.
        fib_addr = inet_ntop(AF_INET, pack("=I", event.fib_res_addr[0]))
        result = "%s/%s is %s [%s]" % (
            fib_addr, event.fib_res_prefixlen, verdict, hook)

    t.add_row([event.netns, _to_text(event.ifname), flow, result])

    args.count -= 1
    if args.count <= 0:
        print(t)
        sys.exit(0)
if __name__ == "__main__":
    # Result table filled in by event_printer and printed on exit.
    t = PrettyTable(['Namespace', 'ENI', 'SRC -> DEST', 'LOOKUP RSLT'])
    # Load the BPF program and route its "route_evt" perf buffer to the
    # Python callback.
    b = BPF(src_file='capturetraffic.c')
    b["route_evt"].open_perf_buffer(event_printer)
    try:
        # Poll until interrupted; event_printer exits on its own once the
        # requested number of events has been captured.
        while True:
            b.perf_buffer_poll()
    except KeyboardInterrupt:
        print(t)
        sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment