Skip to content

Instantly share code, notes, and snippets.

@gteissier
Created December 14, 2019 15:19
Show Gist options
  • Save gteissier/4e076b2645e1754c99c8278cd4a6a987 to your computer and use it in GitHub Desktop.
Save gteissier/4e076b2645e1754c99c8278cd4a6a987 to your computer and use it in GitHub Desktop.
Pythonic tcpdump: copy, paste, and enjoy
#!/usr/bin/env python
'''
It has been tested with both Python 2 and Python 3.
Beware of ancient Linux kernel versions, which may not support SOCK_NONBLOCK
or the memory-mapped ring buffer.
The BPF filter listed below is the compiled form of "not port 22".
If you want to change it, do something like:
# tcpdump -dd "udp and port 53" | tr "{" "(" | tr "}" ")"
( 0x28, 0, 0, 0x0000000c ),
( 0x15, 0, 6, 0x000086dd ),
( 0x30, 0, 0, 0x00000014 ),
...
( 0x6, 0, 0, 0x00000000 ),
Then paste the output into the script and pass it as the filter argument of the Sniffer constructor.
Some parameters can be tweaked. Default values are:
* ringbuffer framesize: 4096 (a page)
* ringbuffer frame number: 4096 (ring buffer is 16 MB big)
* poll timeout value: 500ms
'''
import os
import socket
import ctypes
import fcntl
from struct import pack, unpack
import sys
import mmap
import select
class Const(object):
    '''
    Namespace for the numeric constants used by the sniffer.

    The names follow the Linux header macros of the same name; the values
    are hard-coded here rather than looked up at runtime.
    '''

    # Ethernet protocol identifiers
    ETH_P_ALL = 0x0003
    ETH_P_IP = 0x0800

    # Interface flags and the ioctl requests that read / write them
    IFF_PROMISC = 0x100
    SIOCGIFFLAGS = 0x8913
    SIOCSIFFLAGS = 0x8914

    # Socket-level options and flags
    SO_ATTACH_FILTER = 26
    SOCK_NONBLOCK = 0x800
    SOL_PACKET = 263
    PACKET_RX_RING = 5

    # Packet types
    PACKET_HOST = 0  # To us
    PACKET_BROADCAST = 1  # To all
    PACKET_MULTICAST = 2  # To group
    PACKET_OTHERHOST = 3  # To someone else
    PACKET_OUTGOING = 4  # Outgoing
    PACKET_USER = 6  # To userspace
    PACKET_KERNEL = 7  # To kernel

    # Size in bytes of one ring-buffer frame (a page)
    PAGESIZE = 4096

    # Values of tpacket_hdr.tp_status: who currently owns a frame
    TP_STATUS_KERNEL = 0
    TP_STATUS_USER = 1
class tp_packet_req(ctypes.Structure):
    '''
    Ring-buffer geometry request handed to setsockopt(PACKET_RX_RING).

    Four consecutive unsigned ints: the size and number of blocks, then the
    size and number of frames.
    '''
    _fields_ = [
        ('tp_block_size', ctypes.c_uint),
        ('tp_block_nr', ctypes.c_uint),
        ('tp_frame_size', ctypes.c_uint),
        ('tp_frame_nr', ctypes.c_uint),
    ]
class tpacket_hdr(ctypes.Structure):
    '''
    Header found at the start of every frame of the packet ring buffer.

    tp_status tells who owns the frame (see Const.TP_STATUS_*), tp_snaplen is
    the number of captured bytes, tp_mac is the offset of the packet data
    from the start of the frame, and tp_sec / tp_usec carry the timestamp.
    '''
    _fields_ = [
        ('tp_status', ctypes.c_ulong),
        ('tp_len', ctypes.c_uint),
        ('tp_snaplen', ctypes.c_uint),
        ('tp_mac', ctypes.c_ushort),
        ('tp_net', ctypes.c_ushort),
        ('tp_sec', ctypes.c_uint),
        ('tp_usec', ctypes.c_uint),
    ]
def bpf_pack(x):
    '''
    Serialize one BPF instruction into its native binary form.

    x is an indexable (code, jt, jf, k), as printed by "tcpdump -dd".
    Returns the packed 16-bit opcode, the two 8-bit jump offsets and the
    32-bit operand, in native byte order.
    '''
    code, jt, jf, k = x[0], x[1], x[2], x[3]
    return pack('HBBI', code, jt, jf, k)
# Compiled form of the tcpdump expression "not port 22", one
# (code, jt, jf, k) tuple per BPF instruction. The trailing comments give the
# instruction index and its disassembly (jump targets are absolute indexes).
NotPort22 = [
    (0x28, 0, 0, 0x0000000c),    # 00: ldh  [12]
    (0x15, 0, 8, 0x000086dd),    # 01: jeq  #0x86dd   jt 2   jf 10
    (0x30, 0, 0, 0x00000014),    # 02: ldb  [20]
    (0x15, 2, 0, 0x00000084),    # 03: jeq  #0x84     jt 6   jf 4
    (0x15, 1, 0, 0x00000006),    # 04: jeq  #0x6      jt 6   jf 5
    (0x15, 0, 17, 0x00000011),   # 05: jeq  #0x11     jt 6   jf 23
    (0x28, 0, 0, 0x00000036),    # 06: ldh  [54]
    (0x15, 14, 0, 0x00000016),   # 07: jeq  #0x16     jt 22  jf 8
    (0x28, 0, 0, 0x00000038),    # 08: ldh  [56]
    (0x15, 12, 13, 0x00000016),  # 09: jeq  #0x16     jt 22  jf 23
    (0x15, 0, 12, 0x00000800),   # 10: jeq  #0x800    jt 11  jf 23
    (0x30, 0, 0, 0x00000017),    # 11: ldb  [23]
    (0x15, 2, 0, 0x00000084),    # 12: jeq  #0x84     jt 15  jf 13
    (0x15, 1, 0, 0x00000006),    # 13: jeq  #0x6      jt 15  jf 14
    (0x15, 0, 8, 0x00000011),    # 14: jeq  #0x11     jt 15  jf 23
    (0x28, 0, 0, 0x00000014),    # 15: ldh  [20]
    (0x45, 6, 0, 0x00001fff),    # 16: jset #0x1fff   jt 23  jf 17
    (0xb1, 0, 0, 0x0000000e),    # 17: ldxb 4*([14]&0xf)
    (0x48, 0, 0, 0x0000000e),    # 18: ldh  [x + 14]
    (0x15, 2, 0, 0x00000016),    # 19: jeq  #0x16     jt 22  jf 20
    (0x48, 0, 0, 0x00000010),    # 20: ldh  [x + 16]
    (0x15, 0, 1, 0x00000016),    # 21: jeq  #0x16     jt 22  jf 23
    (0x06, 0, 0, 0x00000000),    # 22: ret  #0
    (0x06, 0, 0, 0x00040000),    # 23: ret  #262144
]
class Sniffer(object):
    '''
    Packet capture over a non-blocking PF_PACKET raw socket whose receive
    path is a memory-mapped ring buffer (PACKET_RX_RING).

    Attributes:
      sock       -- the raw socket; register it with poll()/select()
      ringbuffer -- mmap object covering the whole ring
      nr_frames  -- number of frames in the ring (a power of two)
      offset     -- index of the next frame to examine
    '''

    def __init__(self, nr_frames, filter=NotPort22):
        '''
        Open the socket, attach the BPF filter and map the ring buffer.

        nr_frames -- number of Const.PAGESIZE-byte frames in the ring; must
                     be a positive power of two.
        filter    -- sequence of (code, jt, jf, k) BPF instructions, as
                     printed by "tcpdump -dd". An empty sequence or None
                     attaches no filter.

        Raises ValueError when nr_frames is not a positive power of two.
        Errors from the socket / setsockopt / mmap calls propagate after the
        socket has been closed.
        '''
        # A power of two lets recv_packets() wrap the frame index with a
        # bit mask. This is an explicit raise rather than an assert so the
        # check is not stripped by "python -O".
        if nr_frames <= 0 or (nr_frames & (nr_frames - 1)) != 0:
            raise ValueError('nr_frames must be a positive power of two')

        s = socket.socket(socket.PF_PACKET,
                          socket.SOCK_RAW | Const.SOCK_NONBLOCK,
                          socket.htons(Const.ETH_P_ALL))
        try:
            if filter:
                # attach BPF filter: a 16-bit instruction count followed by
                # the address of the packed instruction array
                filter_content = b''.join(bpf_pack(elm) for elm in filter)
                addr_filter = ctypes.create_string_buffer(filter_content)
                fprog = pack('HL', len(filter), ctypes.addressof(addr_filter))
                s.setsockopt(socket.SOL_SOCKET, Const.SO_ATTACH_FILTER, fprog)

            # create packets ring buffer: a single block holding every frame
            tp = tp_packet_req()
            tp.tp_block_size = nr_frames * Const.PAGESIZE
            tp.tp_block_nr = 1
            tp.tp_frame_size = Const.PAGESIZE
            tp.tp_frame_nr = nr_frames
            s.setsockopt(Const.SOL_PACKET, Const.PACKET_RX_RING, tp)

            # map packets ring buffer
            ringbuffer = mmap.mmap(s.fileno(),
                                   tp.tp_frame_size * tp.tp_frame_nr,
                                   mmap.MAP_SHARED,
                                   mmap.PROT_READ | mmap.PROT_WRITE)
        except Exception:
            # do not leak the descriptor when any setup step fails
            s.close()
            raise

        self.nr_frames = nr_frames
        self.sock = s
        self.ringbuffer = ringbuffer
        self.offset = 0

    def close(self):
        '''Unmap the ring buffer, then close the socket.'''
        try:
            self.ringbuffer.close()
        finally:
            self.sock.close()

    def recv_packets(self):
        '''
        Generator over the frames currently handed to userspace.

        Yields ((tp_sec, tp_usec), data) where data is a copy of the
        tp_snaplen captured bytes. Stops at the first frame whose status does
        not have TP_STATUS_USER set.

        Each frame is copied and marked TP_STATUS_KERNEL *before* it is
        yielded, so a consumer that stops iterating early neither sees the
        same frame again on the next call nor keeps a view on the mapping.
        '''
        while True:
            frame_start = self.offset * Const.PAGESIZE
            hdr = tpacket_hdr.from_buffer(self.ringbuffer, frame_start)
            if (hdr.tp_status & Const.TP_STATUS_USER) == 0:
                break

            timestamp = (hdr.tp_sec, hdr.tp_usec)
            pkt_offset = frame_start + hdr.tp_mac
            pkt = self.ringbuffer[pkt_offset:pkt_offset + hdr.tp_snaplen]

            # give the frame back and drop the ctypes view on the mapping
            hdr.tp_status = Const.TP_STATUS_KERNEL
            del hdr

            # should be a modulo, but nr_frames is required to be a power of
            # two, so masking with (nr_frames - 1) is equivalent
            self.offset = (self.offset + 1) & (self.nr_frames - 1)

            yield (timestamp, pkt)
def main():
    '''
    Capture packets into the pcap file named by the first command-line
    argument until interrupted with Ctrl-C.

    Returns the process exit status: 0 after a clean interruption, 2 when the
    output file argument is missing.
    '''
    if len(sys.argv) < 2:
        sys.stderr.write('usage: %s <output.pcap>\n' % sys.argv[0])
        return 2

    n_packets = 0
    with open(sys.argv[1], 'wb') as f:
        # libpcap file format, tcpdump 2.4
        f.write(pack('!IHHIIII', 0xa1b2c3d4, 2, 4, 0, 0, 65536, 1))

        s = Sniffer(nr_frames=4096)
        try:
            poller = select.poll()
            poller.register(s.sock, select.POLLIN)

            while True:
                events = poller.poll(500)
                for (_fd, evt) in events:
                    # only s.sock is registered, so every event is about it
                    if evt & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
                        raise IOError('poll reported error event 0x%x on capture socket' % evt)

                    for (ts, pkt) in s.recv_packets():
                        (tv_sec, tv_usec) = ts
                        # recv_packets() yields only the captured bytes, so
                        # both length fields carry the captured length
                        f.write(pack('!IIII', tv_sec, tv_usec, len(pkt), len(pkt)))
                        f.write(pkt)
                        n_packets += 1

                f.flush()

                sys.stdout.write('\r captured %06d packets' % n_packets)
                sys.stdout.flush()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop; finish the status line
            sys.stdout.write('\n')
        finally:
            s.sock.close()
    return 0


if __name__ == '__main__':
    sys.exit(main())
@janith989
Copy link

janith989 commented Sep 3, 2020

Is there a way to bypass the kernel in Python?
I know it is not related to this, but I am just asking.
Example: if a raw DNS query packet is sent using PF_PACKET sockets (really, an Ethernet frame), then after the DNS response is received, the kernel sends an ICMP port-unreachable packet to the DNS server. iptables can prevent that, but is there any other solution?

@gteissier
Copy link
Author

A rough idea, not tested.

BPF was originally designed to express packet filtering decisions. It has been extended into eBPF, and now further evolved into XDP, for eXpress DataPath.
An XDP program will run directly in the kernel, and would qualify to filter and possibly emit frames. I believe you can write both emit and drop logic in an XDP program. You will have to bind it to a specific NIC to make it run.

@janith989
Copy link

Thanks for replying ... I will try...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment