Skip to content

Instantly share code, notes, and snippets.

@nithu0115
Created March 12, 2021 17:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nithu0115/5035f33b1298d0a358e9b3e4379b0967 to your computer and use it in GitHub Desktop.
Save nithu0115/5035f33b1298d0a358e9b3e4379b0967 to your computer and use it in GitHub Desktop.
#/usr/bin/env python
# coding: utf-8
import sys
from socket import inet_ntop, AF_INET, AF_INET6
from bcc import BPF
import ctypes as ct
import subprocess
import struct
from struct import pack
import socket
import argparse
from prettytable import PrettyTable
class storeindict(argparse.Action):
# Constructor calling
def __call__( self , parser, namespace,
values, option_string = None):
setattr(namespace, self.dest, set())
for value in values:
getattr(namespace, self.dest).add(value)
parser = argparse.ArgumentParser(
description="Parsing pod traffic")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-src", "--ipaddr", type=str,
help="src ip address")
parser.add_argument("-c", "--count", type=int, default=10,
help="count of capture")
parser.add_argument("-dest", "--destipaddr", type=str,
help="dest ip address")
group.add_argument("-src_list", '--list', type=str,
nargs='*',
action = storeindict)
group.add_argument("-src_dest", '--s_d_list', type=str,
nargs='*',
action = storeindict)
args = parser.parse_args()
ipaddr=(struct.unpack("I",socket.inet_aton("0" if args.ipaddr == None else args.ipaddr))[0])
dest_ipaddr=(struct.unpack("I",socket.inet_aton("0" if args.destipaddr == None else args.destipaddr))[0])
capture_count = 0
TCPFLAGS = [
"CWR",
"ECE",
"URG",
"ACK",
"PSH",
"RST",
"SYN",
"FIN",
]
ROUTE_EVT_IF = 1
ROUTE_EVT_IPTABLE = 2
ROUTE_EVT_RTTABLE = 4
ROUTE_EVENT_DROP = 16
def _get_tcpflags(tcpflags):
flag=""
start=1
for index in range(len(TCPFLAGS)):
if (tcpflags & (1<<index)):
if start:
flag += TCPFLAGS[index]
start = 0
else:
flag += ","+TCPFLAGS[index]
return flag
def print_stack(event):
print("Func name ---> %s" % event.func_name)
for addr in stack_traces.walk(event.kernel_stack_id):
sym = b.ksym(addr, show_offset=True)
print(" %s" % sym)
print("\n")
def parse_result(rslt_table, index, default):
if index < len(rslt_table):
return rslt_table[index]
return default
def event_printer(cpu, data, size):
event = b["route_evt"].event(data)
if event.flags & ROUTE_EVT_IF != ROUTE_EVT_IF:
return
if event.ip_version == 4:
saddr = inet_ntop(AF_INET, pack("=I", event.saddr[0]))
daddr = inet_ntop(AF_INET, pack("=I", event.daddr[0]))
elif event.ip_version == 6:
saddr = inet_ntop(AF_INET6, event.saddr)
daddr = inet_ntop(AF_INET6, event.daddr)
else:
return
if ipaddr != 0 and event.saddr[0] != ipaddr:
return
if (args.list is not None) and (saddr not in args.list):
return
if dest_ipaddr != 0 and event.daddr[0] != dest_ipaddr:
return
if (args.s_d_list is not None):
if ((saddr in args.s_d_list) and (daddr in args.s_d_list)):
flag = False
else:
flag = True
if (flag == True):
return
flow = "%s -> %s" % (saddr, daddr)
result = ""
if event.flags & ROUTE_EVENT_DROP == ROUTE_EVENT_DROP:
if event.l4_proto == socket.IPPROTO_TCP:
print "T_%s:%s->%s" % (_get_tcpflags(event.tcpflags), saddr,daddr)
elif event.l4_proto == socket.IPPROTO_UDP:
print "U:%s->%s" % (saddr, daddr)
else:
print flow
print_stack(event)
args.count = args.count - 1
if args.count <= 0:
sys.exit(0)
if __name__ == "__main__":
b = BPF(src_file='call_stack.c')
stack_traces = b.get_table("stacks")
print("Hit Ctrl-C to end....")
b["route_evt"].open_perf_buffer(event_printer)
while 1:
try:
b.perf_buffer_poll()
except KeyboardInterrupt:
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment