Skip to content

Instantly share code, notes, and snippets.

@gatopeich
Last active April 1, 2020 08:38
Show Gist options
  • Save gatopeich/fc302a0662d574c218302334f7b0cb33 to your computer and use it in GitHub Desktop.
Save gatopeich/fc302a0662d574c218302334f7b0cb33 to your computer and use it in GitHub Desktop.
Using TPACKET_V3 to measure interface capacity. Proof of concept by gatopeich
// Using TPACKET_V3 to measure interface capacity. Proof of concept by gatopeich
// Based on the following information sources:
// [1] Kernel documentation: https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt
// [2] Kernel "selftest" tools: https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/psock_tpacket.c
// [3] Answer to "Sending data with PACKET_MMAP..." in https://stackoverflow.com/a/43427533/501336
// [4] packet.7 man page http://man7.org/linux/man-pages/man7/packet.7.html
// [5] Kernel sources https://github.com/torvalds/linux/blob/master/net/packet/af_packet.c
#include <linux/if_packet.h>
#include <linux/if_ether.h>
#include <linux/net_tstamp.h>
#include <net/if.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/vtimes.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>
#include <chrono>
#include <iostream>
#include <string>
#include <stdexcept>
using namespace std;
// Ring geometry: each frame slot is FRAME_SIZE bytes and one block holds 128 slots.
static constexpr unsigned FRAME_SIZE = 2048;
static constexpr unsigned BLOCK_SIZE = FRAME_SIZE * 128;
// Flags for the send() issued after a whole TX block has been filled.
static constexpr unsigned DONT_WAIT = MSG_DONTWAIT;
// Value handed to the PACKET_LOSS socket option in TX mode.
static constexpr unsigned IGNORE_WRONG_FORMAT = 0;
// Value handed to the PACKET_QDISC_BYPASS socket option in TX mode.
static constexpr unsigned QDISC_BYPASS = 1;
// Feature request word of the RX ring.
static constexpr unsigned RX_FEATURES = 0; //TP_FT_REQ_FILL_RXHASH;
// Default IPv4 source/destination in host byte order (converted with htonl when the
// header is written); main() may overwrite them from the command line.
static uint32_t SOUR_ADDRESS = 0x01020304;
static uint32_t DEST_ADDRESS = 0x04030201;
// Exception for failed system calls. Its message is "<what>. <strerror(errno)>",
// so it has to be constructed while errno still describes the failure.
struct os_error : public std::runtime_error
{
    os_error(const std::string& what) : std::runtime_error(describe(what)) {}

private:
    // Appends the textual form of the current errno to the caller's description.
    static std::string describe(const std::string& what)
    {
        const int code = errno; // read before any string work can touch it
        std::string message(what);
        message += ". ";
        message += std::string(strerror(code));
        return message;
    }
};
// TPACKET_V3 TX+RX entity.
// One instance holds one packet socket and its mmap()ed ring, and works either as a
// sender (TX) or as a receiver (RX); the mode is chosen by the constructor.
// Data members keep their declaration order: the constructor's initializer list and
// the aggregate initialization of Stats depend on it.
class TpacketHandler
{
public:
    // tx_len: length of the frames to send, 0 selects RX mode.
    // device: interface name. buf_mb: ring size in megabytes.
    TpacketHandler(int tx_len, const string& device, long buf_mb);

    // Handles at most one frame of the ring; timeout_ms is the ::poll() timeout used in RX mode.
    void poll(int timeout_ms);

    // Prints "<source> -> <destination>" for IPv4 frames and the rx hash when non-zero.
    void display(tpacket3_hdr *hdr);

    // Dumps the status word of every frame of every block to cerr.
    void debug();

    // Traffic counters and their periodic report.
    struct Stats {
        long packets;
        long drops; // not updated by any code visible in this file
        long bytes;
        void print();
    };

    const bool TX;                           // true when tx_len is non-zero
    const bool RX;                           // always !TX
    const int tx_len;                        // bytes per transmitted frame
    const int n_blocks;                      // number of BLOCK_SIZE blocks in the ring
    int sock;                                // packet socket descriptor, set by the constructor
    uint8_t* ring = nullptr;                 // start of the mapped ring
    int block_index = 0;                     // index of the block currently being processed
    int packet_index = 0;                    // position inside the current block
    int packets_in_block = 0;                // frames in the current block
    tpacket_block_desc* block_desc = nullptr; // descriptor of the current block
    tpacket3_hdr* packet_hdr = nullptr;      // header of the current frame
    Stats stats {};
};
int get_raw_socket(const string& device, int version, unsigned proto)
{
int sock(::socket(PF_PACKET, SOCK_RAW, proto));
if (sock<0)
throw os_error("Can't get raw socket");
if (::setsockopt(sock, SOL_PACKET, PACKET_VERSION, &version, sizeof(version)) != 0)
throw os_error("Can't set socket version");
ifreq s_ifr{};
strncpy (s_ifr.ifr_name, device.c_str(), sizeof(s_ifr.ifr_name));
if (::ioctl(sock, SIOCGIFINDEX, &s_ifr) != 0)
throw os_error("Error resolving interface " + device);
sockaddr_ll my_addr{};
my_addr.sll_family = AF_PACKET;
my_addr.sll_protocol = proto;
my_addr.sll_ifindex = s_ifr.ifr_ifindex;
if (::bind(sock, (sockaddr*)&my_addr, sizeof(sockaddr_ll)) != 0)
throw os_error("Error binding socket to interface " + device);
return sock;
}
// Opens the packet socket on `device`, configures a TPACKET_V3 ring of `buf_mb`
// megabytes (TX ring when tx_len is non-zero, RX ring otherwise) and maps it.
//   tx_len: length of the frames to send; 0 puts the handler in RX mode
//   device: interface name
//   buf_mb: ring size in megabytes, must be positive
// Throws runtime_error for a non-positive ring size and os_error when a system call
// fails. The socket is closed again if setup fails after it was opened, because the
// destructor does not run for a partially constructed object.
TpacketHandler::TpacketHandler(int tx_len, const string& device, long buf_mb)
: TX(!!tx_len), RX(!TX), tx_len(tx_len),
  n_blocks(buf_mb > 0 ? (buf_mb<<20)/BLOCK_SIZE : 0), // no shift of a negative value
  packets_in_block(BLOCK_SIZE/FRAME_SIZE)
{
    if (n_blocks <= 0)
        throw runtime_error("Ring buffer size must be a positive number of MB");

    // Computed in size_t so rings of 4 GB and more do not wrap around.
    const size_t ring_bytes = size_t(n_blocks) * BLOCK_SIZE;

    sock = get_raw_socket(device, TPACKET_V3, TX ? 0 : htons(ETH_P_ALL)); // proto 0 is faster for TX?
    try {
        if (TX) { // From packet.7 manpage:
            if (::setsockopt(sock, SOL_PACKET, PACKET_LOSS, &IGNORE_WRONG_FORMAT, sizeof(IGNORE_WRONG_FORMAT)))
                throw os_error("PACKET_LOSS");
            if (::setsockopt(sock, SOL_PACKET, PACKET_QDISC_BYPASS, &QDISC_BYPASS, sizeof(QDISC_BYPASS)))
                throw os_error("PACKET_QDISC_BYPASS");
            // The option value is an int: clamp rather than let twice the ring size wrap.
            const size_t max_sndbuf = 0x7fffffff;
            const size_t wanted = ring_bytes * 2;
            const int wmem = int(wanted > max_sndbuf ? max_sndbuf : wanted);
            if (::setsockopt(sock, SOL_SOCKET, SO_SNDBUFFORCE, &wmem, sizeof(wmem)))
                throw os_error("SO_SNDBUFFORCE");
        }

        tpacket_req3 req {0};
        req.tp_block_size = BLOCK_SIZE;
        req.tp_frame_size = FRAME_SIZE;
        req.tp_block_nr = n_blocks;
        req.tp_frame_nr = n_blocks * packets_in_block;
        if (RX) {
            req.tp_retire_blk_tov = 64; // Timeout in millis
            req.tp_feature_req_word = RX_FEATURES;
        }
        if (::setsockopt(sock, SOL_PACKET, TX?PACKET_TX_RING:PACKET_RX_RING, &req, sizeof(req)))
            throw os_error("setsockopt for ring failed");

        void* map = mmap(NULL, ring_bytes, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_LOCKED|MAP_POPULATE, sock, 0);
        if (map == MAP_FAILED)
            throw os_error("mmap failed. Try ulimit -l <mem kb> or check system settings.");

        ring = (uint8_t*) map;
        block_desc = (tpacket_block_desc*)ring;
        // RX frames start behind the block header; TX frames start at the block itself.
        packet_hdr = (tpacket3_hdr*)(ring + RX*block_desc->hdr.bh1.offset_to_first_pkt);
        // debug();
    } catch (...) {
        // The exception object already holds its message, so close() clobbering errno is harmless.
        ::close(sock);
        throw;
    }
}
// Writes one line per block to cerr listing the tp_status of its frames.
// Runs of equal values are compressed: a status seen k times in a row (k > 1) is
// printed once, followed by "x<k>".
void TpacketHandler::debug()
{
    for (int blk = 0; blk < n_blocks; ++blk) {
        auto desc = (tpacket_block_desc*)(ring + blk*BLOCK_SIZE);

        // RX frames begin at the offset stored in the block header, TX frames at the block start.
        uint8_t* cursor = (uint8_t*)desc;
        if (RX)
            cursor += desc->hdr.bh1.offset_to_first_pkt;

        int count;
        if (RX)
            count = desc->hdr.bh1.num_pkts;
        else
            count = packets_in_block;

        if (TX)
            cerr << "TX Block #" << blk << " (" << count << " packets): ";
        else
            cerr << "RX Block #" << blk << " (" << count << " packets, status=" << desc->hdr.bh1.block_status << "): ";

        unsigned last_status = -1;
        unsigned repeats = 0;
        // Emits the multiplier of the run that just ended, if it had more than one element.
        const auto flush_run = [&repeats] {
            if (repeats) {
                cerr << "x" << repeats+1;
                repeats = 0;
            }
        };

        for (int i = 0; i < count; ++i) {
            auto frame = (tpacket3_hdr*)cursor;
            if (frame->tp_status == last_status) {
                ++repeats;
            } else {
                flush_run();
                if (i > 0)
                    cerr << ",";
                last_status = frame->tp_status;
                cerr << last_status;
            }
            // TX slots have a fixed stride, RX frames are chained through tp_next_offset.
            cursor += TX ? FRAME_SIZE : frame->tp_next_offset;
        }
        flush_run();
        cerr << endl;
    }
}
static void create_payload(uint8_t* pay, int len)
{
ethhdr* eth = (ethhdr*)pay;
memset(pay, 0xff, ETH_ALEN * 2);
eth->h_proto = htons(ETH_P_IP);
iphdr* ip = (iphdr*)(eth+1);
ip->ihl = 5;
ip->version = 4;
ip->protocol = 0x11;
ip->frag_off = 0;
ip->ttl = 64;
ip->tot_len = htons((uint16_t) len - sizeof(*eth));
ip->saddr = htonl(SOUR_ADDRESS);
ip->daddr = htonl(DEST_ADDRESS);
// memset(pay + sizeof(*eth) + sizeof(*ip), DATA_CHAR, len-42);
}
// Handles at most one frame of the ring, then advances to the next frame or block.
//
// TX: if the current slot is not TP_STATUS_AVAILABLE, a blocking send() is issued and
//     the call returns without touching the slot. Otherwise the slot is filled with
//     a tx_len byte frame and marked TP_STATUS_SEND_REQUEST; once the last slot of a
//     block has been filled, a send() with DONT_WAIT hands the block to the kernel.
// RX: at the start of a block, waits up to timeout_ms (via ::poll) for the block to be
//     handed over (TP_STATUS_USER) and returns if it still is not. The current frame
//     is then counted; after the last frame the block is given back to the kernel.
//
// Updates stats.packets and stats.bytes for every handled frame.
// Throws os_error when send()/poll() fail or a TX slot is flagged TP_STATUS_WRONG_FORMAT.
void TpacketHandler::poll(int timeout_ms)
{
    if (TX) {
        if (packet_hdr->tp_status != TP_STATUS_AVAILABLE) {
            if (packet_hdr->tp_status == TP_STATUS_WRONG_FORMAT)
                throw os_error("TP_STATUS_WRONG_FORMAT");
            // Block on send() cause we have nothing else to do
            if (::send(sock, NULL, 0, 0) == -1)
                throw os_error("sendto");
            return;
        }
        // Frame data starts right behind the aligned frame header.
        create_payload((uint8_t*)packet_hdr + TPACKET_ALIGN(sizeof(tpacket3_hdr)), tx_len);
        // create_payload((uint8_t*)packet_hdr + packet_hdr->tp_mac, tx_len);
        packet_hdr->tp_snaplen = packet_hdr->tp_len = tx_len;
        packet_hdr->tp_next_offset = 0;
        packet_hdr->tp_status = TP_STATUS_SEND_REQUEST;
    }
    else // RX:
    {
        if (packet_index == 0) // Start of block
        {
            if (!(block_desc->hdr.bh1.block_status & TP_STATUS_USER))
            {
                // Wait for some frames until at least the first frame in the block is filled up
                pollfd pfd {sock, POLLIN|POLLRDNORM|POLLERR|POLLRDHUP, 0};
                if (::poll(&pfd, 1, timeout_ms) == -1)
                    throw os_error("Error polling: ");
                if (!(block_desc->hdr.bh1.block_status & TP_STATUS_USER))
                    return; // Will poll again
            }
            packet_hdr = (tpacket3_hdr*)((uint8_t*)block_desc + block_desc->hdr.bh1.offset_to_first_pkt);
            packets_in_block = block_desc->hdr.bh1.num_pkts;
            if (packets_in_block == 0)
            {
                // Nothing to read: return the block and move on to the next one.
                cerr << "Warning: packets_in_block == 0 (kernel bug?)\n";
                block_desc->hdr.bh1.block_status = TP_STATUS_KERNEL;
                block_index = (block_index+1) % n_blocks;
                block_desc = (tpacket_block_desc*) (ring + block_index*BLOCK_SIZE);
                return;
            }
        }
        // Display one in many (every 2^20th packet). The parentheses matter:
        // "!stats.packets & 0xfffff" would only ever be true for packet 0.
        if (!(stats.packets & 0xfffff)) {
            cout << "Packet # " << stats.packets << ": ";
            display(packet_hdr);
            cout << endl;
        }
    }

    ++stats.packets;
    stats.bytes += packet_hdr->tp_len;

    if (++packet_index < packets_in_block) {
        // TX slots have a fixed stride, RX frames are chained through tp_next_offset.
        packet_hdr = (tpacket3_hdr*)((uint8_t*)packet_hdr + (TX?FRAME_SIZE:packet_hdr->tp_next_offset));
        // __sync_synchronize();
    } else {
        if (TX) {
            if (::send(sock, NULL, 0, DONT_WAIT) == -1)
                throw os_error("send");
        } else {
            block_desc->hdr.bh1.block_status = TP_STATUS_KERNEL;
            __sync_synchronize(); // Do we need this?
        }
        block_index = (block_index+1) % n_blocks;
        block_desc = (tpacket_block_desc*)(ring + block_index*BLOCK_SIZE);
        // In RX mode this is replaced by the first-packet offset at the start of the next block.
        packet_hdr = (tpacket3_hdr*) block_desc;
        packet_index = 0;
    }
}
// Prints a one-line summary of a received frame to stdout (no trailing newline):
// "<source> -> <destination>" when the Ethernet type is IPv4, followed by
// ", rxhash: 0x<hash>" when the frame carries a non-zero rx hash.
//   hdr: frame header inside the ring; tp_mac must be valid, which is only the case on RX
void TpacketHandler::display(tpacket3_hdr* hdr)
{
    auto eth = (ethhdr*) ((uint8_t*)hdr + hdr->tp_mac); // Note: tp_mac is only set on RX
    if (eth->h_proto == htons(ETH_P_IP)) {
        auto ip = (iphdr*) ((uint8_t*) eth + ETH_HLEN);

        // Converts an address in network byte order to numeric text. The buffer always
        // ends up holding a valid string: "?" is stored if the conversion fails, so
        // printf never reads uninitialized memory.
        const auto to_text = [](uint32_t address, char (&text)[NI_MAXHOST]) {
            sockaddr_in sa;
            memset(&sa, 0, sizeof(sa));
            sa.sin_family = AF_INET;
            sa.sin_addr.s_addr = address;
            if (getnameinfo((sockaddr *) &sa, sizeof(sa),
                            text, sizeof(text), NULL, 0, NI_NUMERICHOST) != 0)
                strcpy(text, "?");
        };

        char sbuff[NI_MAXHOST], dbuff[NI_MAXHOST];
        to_text(ip->saddr, sbuff);
        to_text(ip->daddr, dbuff);
        printf("%s -> %s", sbuff, dbuff);
    }
    if ((hdr->hv1.tp_rxhash))
        printf(", rxhash: 0x%x", hdr->hv1.tp_rxhash);
}
void TpacketHandler::Stats::print()
{
static auto prev_time = chrono::steady_clock::now();
static TpacketHandler::Stats prev{0};
static struct vtimes prev_times{0};
auto now = chrono::steady_clock::now();
struct vtimes times;
vtimes(&times, nullptr);
auto elapsed = prev_times.vm_utime ? 1e-9*(now-prev_time).count() : 3.0;
cout << "Total packets: " << packets << ", bytes: " << bytes;
cout << ", PPS=" << (packets - prev.packets) / elapsed;
cout << ", Mbps=" << 8e-6*(bytes - prev.bytes) / elapsed;
auto vtime_scale = 100 / (elapsed * VTIMES_UNITS_PER_SECOND);
int cpu_u = (times.vm_utime - prev_times.vm_utime) * vtime_scale;
int cpu_s = (times.vm_stime - prev_times.vm_stime) * vtime_scale;
cout << ". CPU usage: " << cpu_u << "% user, " << cpu_s << "% kernel";
prev_times = times;
prev_time = now;
prev = *this;
cout << endl;
}
int main(int argc, char* argv[])
{
if (argc<3) {
cerr << "RawFlood TPACKET_V3 tester (c) 2019 gatopeich. Use at your own risk." << endl;
cerr << "Usage: " << argv[0] << " tx|rx <device> <packet len> <buffer size in MB> <dest ipv4> <src ipv4>" << endl;
return 1;
}
bool TX = !strcasecmp(argv[1],"TX");
string device = argv[2];
int tp_len = argc>3 ? atoi(argv[3]) : 1514;
int buf_mb = argc>4 ? atoi(argv[4]) : 4;
if (argc>5) {
char* addr = (char*) &DEST_ADDRESS;
sscanf(argv[5], "%hhu.%hhu.%hhu.%hhu", addr+3, addr+2,addr+1,addr+0);
}
if (argc>6) {
char* addr = (char*) &SOUR_ADDRESS;
sscanf(argv[6], "%hhu.%hhu.%hhu.%hhu", addr+3, addr+2,addr+1,addr+0);
}
cerr << "Testing TPACKET_V3 " << (TX?"TX":"RX") << " with packet len=" << tp_len << ", " << buf_mb << " MB ring"
", FRAME_SIZE=" << FRAME_SIZE << ", BLOCK_SIZE=" << BLOCK_SIZE <<
", IGNORE_WRONG_FORMAT=" << IGNORE_WRONG_FORMAT << ", QDISC_BYPASS=" << QDISC_BYPASS << ", DONT_WAIT=" << DONT_WAIT << endl;
auto handler = TpacketHandler(TX*tp_len, device, buf_mb);
auto last_t = chrono::steady_clock::now();
while (1) {
handler.poll(1000);
auto t = chrono::steady_clock::now();
if (t - last_t > chrono::seconds(3)) {
handler.stats.print();
// handler.debug();
last_t = t;
}
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment