Skip to content

Instantly share code, notes, and snippets.

@gteissier
Created December 14, 2019 15:19
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save gteissier/4e076b2645e1754c99c8278cd4a6a987 to your computer and use it in GitHub Desktop.
Save gteissier/4e076b2645e1754c99c8278cd4a6a987 to your computer and use it in GitHub Desktop.
Pythonic tcpdump: copy, paste, and enjoy
#!/usr/bin/env python
'''
It has been tested with either py2 or py3.
Beware ancient versions of Linux kernel which may not support SOCK_NONBLOCK
or the memory mapped ring buffer.
BPF filter listed below is compiled form of "not port 22"
if you want to change it, do something like
# tcpdump -dd "udp and port 53" | tr "{" "(" | tr "}" ")"
( 0x28, 0, 0, 0x0000000c ),
( 0x15, 0, 6, 0x000086dd ),
( 0x30, 0, 0, 0x00000014 ),
...
( 0x6, 0, 0, 0x00000000 ),
and then paste the output to the script, and use it as filter in the Sniffer constructor
some parameters could be tweaked. Default values are:
* ringbuffer framesize: 4096 (a page)
* ringbuffer frame number: 4096 (ring buffer is 16 MB big)
* poll timeout value: 500ms
'''
import os
import socket
import ctypes
import fcntl
from struct import pack, unpack
import sys
import mmap
import select
class Const(object):
ETH_P_ALL = 0x0003
ETH_P_IP = 0x0800
IFF_PROMISC = 0x100
SIOCGIFFLAGS = 0x8913
SIOCSIFFLAGS = 0x8914
SO_ATTACH_FILTER = 26
SOCK_NONBLOCK = 0x800
SOL_PACKET = 263
PACKET_RX_RING = 5
PACKET_HOST = 0 # To us
PACKET_BROADCAST = 1 # To all
PACKET_MULTICAST = 2 # To group
PACKET_OTHERHOST = 3 # To someone else
PACKET_OUTGOING = 4 # Outgoing
PACKET_USER = 6 # To userspace
PACKET_KERNEL = 7 # To kernel
PAGESIZE = 4096
TP_STATUS_KERNEL = 0
TP_STATUS_USER = 1
class tp_packet_req(ctypes.Structure):
_fields_ = [
('tp_block_size', ctypes.c_uint),
('tp_block_nr', ctypes.c_uint),
('tp_frame_size', ctypes.c_uint),
('tp_frame_nr', ctypes.c_uint),
]
class tpacket_hdr(ctypes.Structure):
_fields_ = [
('tp_status', ctypes.c_ulong),
('tp_len', ctypes.c_uint),
('tp_snaplen', ctypes.c_uint),
('tp_mac', ctypes.c_ushort),
('tp_net', ctypes.c_ushort),
('tp_sec', ctypes.c_uint),
('tp_usec', ctypes.c_uint),
]
def bpf_pack(x):
return pack('HBBI', x[0], x[1], x[2], x[3])
NotPort22 = [
( 0x28, 0, 0, 0x0000000c ),
( 0x15, 0, 8, 0x000086dd ),
( 0x30, 0, 0, 0x00000014 ),
( 0x15, 2, 0, 0x00000084 ),
( 0x15, 1, 0, 0x00000006 ),
( 0x15, 0, 17, 0x00000011 ),
( 0x28, 0, 0, 0x00000036 ),
( 0x15, 14, 0, 0x00000016 ),
( 0x28, 0, 0, 0x00000038 ),
( 0x15, 12, 13, 0x00000016 ),
( 0x15, 0, 12, 0x00000800 ),
( 0x30, 0, 0, 0x00000017 ),
( 0x15, 2, 0, 0x00000084 ),
( 0x15, 1, 0, 0x00000006 ),
( 0x15, 0, 8, 0x00000011 ),
( 0x28, 0, 0, 0x00000014 ),
( 0x45, 6, 0, 0x00001fff ),
( 0xb1, 0, 0, 0x0000000e ),
( 0x48, 0, 0, 0x0000000e ),
( 0x15, 2, 0, 0x00000016 ),
( 0x48, 0, 0, 0x00000010 ),
( 0x15, 0, 1, 0x00000016 ),
( 0x6, 0, 0, 0x00000000 ),
( 0x6, 0, 0, 0x00040000 ),
]
class Sniffer(object):
def __init__(self, nr_frames, filter=NotPort22):
# check number of frames is a power of 2
assert(nr_frames & (nr_frames-1) == 0)
s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW|Const.SOCK_NONBLOCK, socket.htons(Const.ETH_P_ALL))
assert(s is not None and s != -1)
# attach BPF filter
filter_content = b''
for elm in filter:
filter_content += bpf_pack(elm)
addr_filter = ctypes.create_string_buffer(filter_content)
fprog = pack('HL', len(filter), ctypes.addressof(addr_filter))
s.setsockopt(socket.SOL_SOCKET, Const.SO_ATTACH_FILTER, fprog)
# create packets ring buffer
tp = tp_packet_req()
tp.tp_block_size = nr_frames * Const.PAGESIZE
tp.tp_block_nr = 1
tp.tp_frame_size = Const.PAGESIZE
tp.tp_frame_nr = nr_frames
self.nr_frames = nr_frames
s.setsockopt(Const.SOL_PACKET, Const.PACKET_RX_RING, tp)
self.sock = s
# map packets ring buffer
self.ringbuffer = mmap.mmap(s.fileno(), tp.tp_frame_size*tp.tp_frame_nr,
mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE)
self.offset = 0
def recv_packets(self):
while True:
hdr = tpacket_hdr.from_buffer(self.ringbuffer, self.offset*Const.PAGESIZE)
if (hdr.tp_status & Const.TP_STATUS_USER) == 0:
break
pkt_offset = self.offset*Const.PAGESIZE + hdr.tp_mac
pkt_length = hdr.tp_snaplen
yield ((hdr.tp_sec, hdr.tp_usec), self.ringbuffer[pkt_offset:pkt_offset+pkt_length])
hdr.tp_status = Const.TP_STATUS_KERNEL
self.offset += 1
# should be a modulo, but we required to have a power of two
# in this case, &= (self.nr_frames - 1) is equivalent to %= self.nr_frames
self.offset &= (self.nr_frames - 1)
n_packets = 0
with open(sys.argv[1], 'wb') as f:
# libpcap file format, tcpdump 2.4
f.write(pack('!IHHIIII', 0xa1b2c3d4, 2, 4, 0, 0, 65536, 1))
s = Sniffer(nr_frames=4096)
poller = select.poll()
poller.register(s.sock, select.POLLIN)
while True:
events = poller.poll(500)
for (fd, evt) in events:
assert(fd == s.sock.fileno())
assert(evt == select.POLLIN)
for (ts, pkt) in s.recv_packets():
(tv_sec, tv_usec) = ts
f.write(pack('!IIII', tv_sec, tv_usec, len(pkt), len(pkt)))
f.write(pkt)
n_packets += 1
f.flush()
sys.stdout.write('\r captured %06d packets' % n_packets)
sys.stdout.flush()
@janith989
Copy link

Thanks for replying ... I will try...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment