Created
December 14, 2019 15:19
-
-
Save gteissier/4e076b2645e1754c99c8278cd4a6a987 to your computer and use it in GitHub Desktop.
Pythonic tcpdump: copy, paste, and enjoy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
'''
Tested with both Python 2 and Python 3.

Beware of ancient Linux kernels, which may not support SOCK_NONBLOCK
or the memory-mapped ring buffer.

The BPF filter listed below is the compiled form of "not port 22".
To change it, run something like

  # tcpdump -dd "udp and port 53" | tr "{" "(" | tr "}" ")"
  ( 0x28, 0, 0, 0x0000000c ),
  ( 0x15, 0, 6, 0x000086dd ),
  ( 0x30, 0, 0, 0x00000014 ),
  ...
  ( 0x6, 0, 0, 0x00000000 ),

then paste the output into the script and pass it as the filter argument
of the Sniffer constructor.

Some parameters can be tweaked. Default values are:
  * ring buffer frame size: 4096 (a page)
  * ring buffer frame count: 4096 (the ring buffer is 16 MB)
  * poll timeout: 500 ms
'''
import os | |
import socket | |
import ctypes | |
import fcntl | |
from struct import pack, unpack | |
import sys | |
import mmap | |
import select | |
class Const(object):
    '''Numeric Linux constants used by the sniffer.

    They are spelled out here because the script does not rely on the
    `socket` module exporting them.
    '''

    # Ethernet protocol ids
    ETH_P_ALL = 0x0003
    ETH_P_IP = 0x0800

    # interface flags and the ioctls that get / set them
    IFF_PROMISC = 0x100
    SIOCGIFFLAGS = 0x8913
    SIOCSIFFLAGS = 0x8914

    # socket creation flag, socket level and socket options
    SOCK_NONBLOCK = 0x800
    SOL_PACKET = 263
    SO_ATTACH_FILTER = 26
    PACKET_RX_RING = 5

    # packet types
    PACKET_HOST = 0  # To us
    PACKET_BROADCAST = 1  # To all
    PACKET_MULTICAST = 2  # To group
    PACKET_OTHERHOST = 3  # To someone else
    PACKET_OUTGOING = 4  # Outgoing
    PACKET_USER = 6  # To userspace
    PACKET_KERNEL = 7  # To kernel

    # size of one ring buffer frame (hard-coded page size)
    PAGESIZE = 4096

    # values of the tp_status field of a ring buffer frame header
    TP_STATUS_KERNEL = 0
    TP_STATUS_USER = 1
class tp_packet_req(ctypes.Structure):
    '''Ring buffer geometry passed to the PACKET_RX_RING socket option.

    Four consecutive unsigned ints: block size, number of blocks,
    frame size and number of frames.
    '''
    _fields_ = [
        (field_name, ctypes.c_uint)
        for field_name in (
            'tp_block_size',
            'tp_block_nr',
            'tp_frame_size',
            'tp_frame_nr',
        )
    ]
class tpacket_hdr(ctypes.Structure):
    '''Header found at the start of every frame of the packet ring buffer.'''
    _fields_ = [
        # frame ownership flags, compared against Const.TP_STATUS_*
        ('tp_status', ctypes.c_ulong),
        ('tp_len', ctypes.c_uint),
        # number of packet bytes read from the frame by Sniffer.recv_packets
        ('tp_snaplen', ctypes.c_uint),
        # offset of the packet data, relative to the start of the frame
        ('tp_mac', ctypes.c_ushort),
        ('tp_net', ctypes.c_ushort),
        # packet timestamp: seconds and microseconds
        ('tp_sec', ctypes.c_uint),
        ('tp_usec', ctypes.c_uint),
    ]
def bpf_pack(x):
    '''Serialize one BPF instruction into its native binary form.

    x is an indexable (code, jt, jf, k) sequence; only its first four
    items are used. They are packed as an unsigned short, two unsigned
    bytes and an unsigned int, in native byte order and alignment.
    '''
    code, jt, jf, k = x[0], x[1], x[2], x[3]
    return pack('HBBI', code, jt, jf, k)
# Compiled form of the tcpdump filter "not port 22", one (code, jt, jf, k)
# tuple per BPF instruction. The trailing comments give the instruction
# index and its mnemonic form.
NotPort22 = [
    (0x28, 0, 0, 0x0000000c),    # 00 ldh  [12]
    (0x15, 0, 8, 0x000086dd),    # 01 jeq  #0x86dd  jt 2   jf 10
    (0x30, 0, 0, 0x00000014),    # 02 ldb  [20]
    (0x15, 2, 0, 0x00000084),    # 03 jeq  #0x84    jt 6   jf 4
    (0x15, 1, 0, 0x00000006),    # 04 jeq  #0x6     jt 6   jf 5
    (0x15, 0, 17, 0x00000011),   # 05 jeq  #0x11    jt 6   jf 23
    (0x28, 0, 0, 0x00000036),    # 06 ldh  [54]
    (0x15, 14, 0, 0x00000016),   # 07 jeq  #0x16    jt 22  jf 8
    (0x28, 0, 0, 0x00000038),    # 08 ldh  [56]
    (0x15, 12, 13, 0x00000016),  # 09 jeq  #0x16    jt 22  jf 23
    (0x15, 0, 12, 0x00000800),   # 10 jeq  #0x800   jt 11  jf 23
    (0x30, 0, 0, 0x00000017),    # 11 ldb  [23]
    (0x15, 2, 0, 0x00000084),    # 12 jeq  #0x84    jt 15  jf 13
    (0x15, 1, 0, 0x00000006),    # 13 jeq  #0x6     jt 15  jf 14
    (0x15, 0, 8, 0x00000011),    # 14 jeq  #0x11    jt 15  jf 23
    (0x28, 0, 0, 0x00000014),    # 15 ldh  [20]
    (0x45, 6, 0, 0x00001fff),    # 16 jset #0x1fff  jt 23  jf 17
    (0xb1, 0, 0, 0x0000000e),    # 17 ldxb 4*([14]&0xf)
    (0x48, 0, 0, 0x0000000e),    # 18 ldh  [x + 14]
    (0x15, 2, 0, 0x00000016),    # 19 jeq  #0x16    jt 22  jf 20
    (0x48, 0, 0, 0x00000010),    # 20 ldh  [x + 16]
    (0x15, 0, 1, 0x00000016),    # 21 jeq  #0x16    jt 22  jf 23
    (0x06, 0, 0, 0x00000000),    # 22 ret  #0
    (0x06, 0, 0, 0x00040000),    # 23 ret  #262144
]
class Sniffer(object):
    '''Packet capture over a PF_PACKET socket and a memory mapped RX ring.

    Attributes:
      sock        non-blocking raw packet socket, with the BPF filter attached
      ringbuffer  mmap of the ring: nr_frames frames of Const.PAGESIZE bytes
      nr_frames   number of frames in the ring (a power of two)
      offset      index of the next frame recv_packets() will inspect
    '''

    def __init__(self, nr_frames, filter=NotPort22):
        '''Open the socket, attach the filter and map the ring buffer.

        nr_frames -- number of ring frames; must be a positive power of two
        filter    -- sequence of (code, jt, jf, k) BPF instructions

        Raises ValueError when nr_frames is not a positive power of two.
        Errors raised while creating or configuring the socket propagate
        to the caller; the socket is closed first when that happens.
        '''
        # recv_packets() wraps the frame index with a bit mask, which is only
        # a modulo for powers of two. This is a real check rather than an
        # assert so that it survives `python -O`.
        if nr_frames <= 0 or (nr_frames & (nr_frames - 1)) != 0:
            raise ValueError('nr_frames must be a positive power of 2, got %r' % (nr_frames,))

        s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW | Const.SOCK_NONBLOCK,
                          socket.htons(Const.ETH_P_ALL))
        try:
            # attach BPF filter: an instruction count followed by the address
            # of the packed instructions. addr_filter keeps that memory alive
            # for the duration of the setsockopt call.
            filter_content = b''.join(bpf_pack(elm) for elm in filter)
            addr_filter = ctypes.create_string_buffer(filter_content)
            fprog = pack('HL', len(filter), ctypes.addressof(addr_filter))
            s.setsockopt(socket.SOL_SOCKET, Const.SO_ATTACH_FILTER, fprog)

            # create packets ring buffer: a single block holding every frame
            tp = tp_packet_req()
            tp.tp_block_size = nr_frames * Const.PAGESIZE
            tp.tp_block_nr = 1
            tp.tp_frame_size = Const.PAGESIZE
            tp.tp_frame_nr = nr_frames
            s.setsockopt(Const.SOL_PACKET, Const.PACKET_RX_RING, tp)

            # map packets ring buffer
            ringbuffer = mmap.mmap(s.fileno(), tp.tp_frame_size * tp.tp_frame_nr,
                                   mmap.MAP_SHARED, mmap.PROT_READ | mmap.PROT_WRITE)
        except Exception:
            # do not leak the socket when any setup step fails
            s.close()
            raise

        self.sock = s
        self.ringbuffer = ringbuffer
        self.nr_frames = nr_frames
        self.offset = 0

    def close(self):
        '''Unmap the ring buffer and close the socket.'''
        try:
            self.ringbuffer.close()
        finally:
            self.sock.close()

    def recv_packets(self):
        '''Yield the packets currently available in the ring buffer.

        Each item is ((tp_sec, tp_usec), data), where data is a copy of
        tp_snaplen bytes taken at offset tp_mac of the frame. Iteration
        stops at the first frame whose status lacks TP_STATUS_USER.

        A frame is handed back (status set to TP_STATUS_KERNEL) and the
        frame index advanced only when the consumer resumes the generator
        after the yield.
        '''
        while True:
            frame_start = self.offset * Const.PAGESIZE
            hdr = tpacket_hdr.from_buffer(self.ringbuffer, frame_start)
            if (hdr.tp_status & Const.TP_STATUS_USER) == 0:
                break

            pkt_offset = frame_start + hdr.tp_mac
            pkt_length = hdr.tp_snaplen
            yield ((hdr.tp_sec, hdr.tp_usec),
                   self.ringbuffer[pkt_offset:pkt_offset + pkt_length])

            hdr.tp_status = Const.TP_STATUS_KERNEL
            self.offset += 1
            # should be a modulo, but nr_frames is required to be a power of
            # two, so &= (self.nr_frames - 1) is equivalent to %= self.nr_frames
            self.offset &= (self.nr_frames - 1)
# Script entry point: capture packets into the pcap file named by the first
# command line argument, until interrupted with Ctrl-C. The guard keeps a
# plain `import` of this file from starting a capture.
if __name__ == '__main__':
    if len(sys.argv) < 2:
        sys.stderr.write('usage: %s <output.pcap>\n' % sys.argv[0])
        sys.exit(2)

    n_packets = 0

    with open(sys.argv[1], 'wb') as f:
        # libpcap file format, tcpdump 2.4
        # fields: magic, version major, version minor, thiszone, sigfigs,
        # snaplen, link type (1); '!' writes them big-endian
        f.write(pack('!IHHIIII', 0xa1b2c3d4, 2, 4, 0, 0, 65536, 1))

        s = Sniffer(nr_frames=4096)

        poller = select.poll()
        poller.register(s.sock, select.POLLIN)

        try:
            while True:
                # wait at most 500ms for the socket to become readable
                events = poller.poll(500)
                for (fd, evt) in events:
                    assert(fd == s.sock.fileno())
                    assert(evt == select.POLLIN)

                    for (ts, pkt) in s.recv_packets():
                        (tv_sec, tv_usec) = ts
                        # NOTE: recv_packets() only yields the captured bytes,
                        # so len(pkt) is recorded as both the captured and the
                        # original length
                        f.write(pack('!IIII', tv_sec, tv_usec, len(pkt), len(pkt)))
                        f.write(pkt)
                        n_packets += 1

                    f.flush()
                    sys.stdout.write('\r captured %06d packets' % n_packets)
                    sys.stdout.flush()
        except KeyboardInterrupt:
            # Ctrl-C is the only way out of the loop: end the progress line
            # and let the `with` block close the capture file
            sys.stdout.write('\n')
A rough idea, not tested.
BPF was originally designed to express packet filtering decisions. It has been extended into eBPF, and now further evolved into XDP, for eXpress DataPath.
An XDP program runs directly in the kernel, and should be able to filter and possibly emit frames. I believe you can write both the emit and the drop logic in an XDP program. You will have to bind it to a specific NIC to make it run.
Thanks for replying ... I will try...
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Is there a way to bypass the kernel in Python?
I know it is not related to this, but I am just asking.
Example: if a raw DNS query packet is sent using a PF_PACKET socket (really, as an Ethernet frame), then after the DNS response is received, the kernel will send an ICMP port unreachable packet to the DNS server. iptables can prevent that, but is there any other solution?