Skip to content

Instantly share code, notes, and snippets.

@yunazuno
Last active November 2, 2023 09:21
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save yunazuno/14eb1b7fc5eb6bb23f36d1d01ed00367 to your computer and use it in GitHub Desktop.
Save yunazuno/14eb1b7fc5eb6bb23f36d1d01ed00367 to your computer and use it in GitHub Desktop.
Testing a XDP program with BPF_PROG_TEST_RUN through Python
#include <uapi/linux/if_ether.h>
#include <uapi/linux/in.h>
#include <uapi/linux/ip.h>
#include <uapi/linux/tcp.h>
int drop_ipv4_tcp_80(struct xdp_md *ctx) {
void *data = (void *)(long)ctx->data;
void *data_end = (void *)(long)ctx->data_end;
struct ethhdr *eth;
struct iphdr *iph;
struct tcphdr *th;
eth = data;
if ((void *)(eth + 1) > data_end)
return XDP_DROP;
if (eth->h_proto != htons(ETH_P_IP))
return XDP_PASS;
iph = (struct iphdr *)(eth + 1);
if ((void *)(iph + 1) > data_end)
return XDP_DROP;
if (iph->ihl != 5 || iph->frag_off & htons(0x2000 | 0x1FFF))
return XDP_PASS;
if (iph->protocol != IPPROTO_TCP) {
return XDP_PASS;
}
th = (struct tcphdr *)(iph + 1);
if ((void *)(th + 1) > data_end)
return XDP_DROP;
if (th->dest == htons(80))
return XDP_DROP;
return XDP_PASS;
}
import unittest
from bcc import BPF, libbcc
import ctypes
from scapy.all import *
class PacketDropTestCase(unittest.TestCase):
bpf = None
func = None
DATA_OUT_LEN = 1514
def _run_test(self, data, data_out_expect, retval_expect, repeat=1):
size = len(data)
data = ctypes.create_string_buffer(raw(data), size)
data_out = ctypes.create_string_buffer(self.DATA_OUT_LEN)
size_out = ctypes.c_uint32()
retval = ctypes.c_uint32()
duration = ctypes.c_uint32()
ret = libbcc.lib.bpf_prog_test_run(self.func.fd, repeat,
ctypes.byref(data), size,
ctypes.byref(data_out),
ctypes.byref(size_out),
ctypes.byref(retval),
ctypes.byref(duration))
self.assertEqual(ret, 0)
self.assertEqual(retval.value, retval_expect)
if data_out_expect:
self.assertEqual(data_out[:size_out.value], raw(data_out_expect))
def setUp(self):
self.bpf = BPF(src_file=b"drop_ipv4_tcp_80.c")
self.func = self.bpf.load_func(b"drop_ipv4_tcp_80", BPF.XDP)
def test_ipv4_tcp_80(self):
packet_in = Ether() / IP() / TCP(dport=80)
self._run_test(packet_in, None, BPF.XDP_DROP)
def test_ipv4_udp_80(self):
packet_in = Ether() / IP() / UDP(dport=80)
self._run_test(packet_in, packet_in, BPF.XDP_PASS)
def test_ipv4_tcp_443(self):
packet_in = Ether() / IP() / TCP(dport=443)
self._run_test(packet_in, packet_in, BPF.XDP_PASS)
def test_ipv6_tcp_80(self):
packet_in = Ether() / IPv6() / TCP(dport=443)
self._run_test(packet_in, packet_in, BPF.XDP_PASS)
if __name__ == '__main__':
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment