Skip to content

Instantly share code, notes, and snippets.

@rigtorp
Created Dec 8, 2021
Embed
What would you like to do?
Simple tool to record UDP streams
// © 2021 Erik Rigtorp <erik@rigtorp.se>
// SPDX-License-Identifier: MIT
// udpcap: Simple tool to record UDP streams
#include <arpa/inet.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
#include <limits.h>
#include <malloc.h>
#include <net/if.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <pcap/pcap.h>
#include <poll.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <zstd.h>
static size_t packets = 0;
static bool rotate_now = false;
static bool active = true;
static void signal_handler(int signal) {
switch (signal) {
case SIGUSR1:
printf("number of packets received: %lu\n", packets);
break;
case SIGUSR2:
rotate_now = true;
break;
default:
active = false;
break;
}
}
struct zstd_cookie {
char *buf;
size_t bufSize;
ZSTD_CCtx *cctx;
FILE *file;
};
static ssize_t zstd_cookie_write(void *cookie_, const char *buf, size_t size) {
struct zstd_cookie *cookie = (struct zstd_cookie *)cookie_;
ZSTD_inBuffer input = {buf, size, 0};
for (;;) {
ZSTD_outBuffer output = {cookie->buf, cookie->bufSize, 0};
size_t const remaining =
ZSTD_compressStream2(cookie->cctx, &output, &input, ZSTD_e_continue);
if (ZSTD_isError(remaining) != 0) {
warnx("ZSTD_compressStream2: %s", ZSTD_getErrorName(remaining));
return 0;
}
size_t const writtenSize = fwrite(output.dst, 1, output.pos, cookie->file);
if (writtenSize != output.pos) {
warn("fwrite");
return 0;
}
if (remaining == 0) {
break;
}
}
return size;
}
static int zstd_cookie_close(void *cookie_) {
struct zstd_cookie *cookie = (struct zstd_cookie *)cookie_;
ZSTD_inBuffer input = {NULL, 0, 0};
int rc = 0;
for (;;) {
ZSTD_outBuffer output = {cookie->buf, cookie->bufSize, 0};
size_t const remaining =
ZSTD_compressStream2(cookie->cctx, &output, &input, ZSTD_e_end);
if (ZSTD_isError(remaining) != 0) {
warnx("ZSTD_compressStream2: %s", ZSTD_getErrorName(remaining));
rc = EOF;
break;
}
size_t const writtenSize = fwrite(output.dst, 1, output.pos, cookie->file);
if (writtenSize != output.pos) {
warn("fwrite");
rc = EOF;
break;
}
if (remaining == 0) {
break;
}
}
ZSTD_freeCCtx(cookie->cctx);
free(cookie->buf);
if (fclose(cookie->file) != 0) {
warn("fclose");
rc = EOF;
}
free(cookie);
return rc;
}
static const cookie_io_functions_t zstd_io_funcs = {
.read = NULL,
.write = zstd_cookie_write,
.seek = NULL,
.close = zstd_cookie_close,
};
static FILE *zstd_cookie_create(FILE *in, const char *mode) {
struct zstd_cookie *cookie = NULL;
cookie = (struct zstd_cookie *)malloc(sizeof(struct zstd_cookie));
if (cookie == NULL) {
goto err;
}
memset(cookie, 0, sizeof(struct zstd_cookie));
cookie->file = in;
cookie->bufSize = ZSTD_DStreamOutSize();
cookie->buf = (char *)malloc(cookie->bufSize);
if (cookie->buf == NULL) {
goto err;
}
cookie->cctx = ZSTD_createCCtx();
if (cookie->cctx == NULL) {
goto err;
}
ZSTD_CCtx_setParameter(cookie->cctx, ZSTD_c_checksumFlag, 1);
return fopencookie(cookie, mode, zstd_io_funcs);
err:
if (cookie != nullptr) {
free(cookie->buf);
ZSTD_freeCCtx(cookie->cctx);
free(cookie);
}
return NULL;
}
// Create zstd compressed FILE*
// Fails if file already exists
static FILE *fopenx(const char *pathname) {
int fd = open(pathname, O_WRONLY | O_APPEND | O_CREAT | O_EXCL, 0644);
if (fd == -1) {
warn("open: %s", pathname);
return nullptr;
}
auto *file = fdopen(fd, "a");
if (file == NULL) {
return nullptr;
}
return zstd_cookie_create(file, "a");
}
// Equivalent of mkdir -p $(dirname $path)
// Adapted from OpenBSD's mkdir (bin/mkdir/mkdir.c)
static int mkpath(char *path, mode_t mode) {
char *slash = path;
for (;;) {
slash += strspn(slash, "/");
slash += strcspn(slash, "/");
if (*slash == '\0') {
// Don't create directory for filename part
break;
}
*slash = '\0';
if (mkdir(path, mode) == -1) {
int mkdir_errno = errno;
struct stat sb;
if (stat(path, &sb) == -1) {
// Not there; use mkdir()s errno
errno = mkdir_errno;
return -1;
}
if (!S_ISDIR(sb.st_mode)) {
// Is there, but isn't a directory
errno = ENOTDIR;
return -1;
}
}
*slash = '/';
}
return 0;
}
// Open zstd compressed dump with timestamp in file name
static pcap_dumper_t *pcap_dump_openx(pcap_t *p, const char *fname,
const time_t *timep) {
tm tm;
if (gmtime_r(timep, &tm) == nullptr) {
warn("gmtime_r");
return nullptr;
}
char buf[PATH_MAX] = {};
if (strftime(buf, sizeof(buf), fname, &tm) == 0) {
warnx("strftime: %s", fname);
return nullptr;
}
if (mkpath(buf, 0755) == -1) {
warn("mkpath: %s", buf);
return nullptr;
}
auto *file = fopenx(buf);
if (file == nullptr) {
warn("fopenx: %s", buf);
return nullptr;
}
auto *dumper = pcap_dump_fopen(p, file);
if (dumper == nullptr) {
warn("pcap_dump_fopen");
if (fclose(file) != 0) {
warn("fclose");
}
return nullptr;
}
return dumper;
}
// Try and catch some pcap_dump_close errors
static int pcap_dump_closex(pcap_dumper_t *p) {
const int prev_errno = errno;
errno = 0;
pcap_dump_close(p);
if (errno != 0) {
return -1;
}
errno = prev_errno;
return 0;
}
// Try and catch some pcap_dump errors
static int pcap_dumpx(pcap_dumper_t *p, struct pcap_pkthdr *h, u_char *sp) {
pcap_dump((u_char *)p, h, sp);
if (ferror(pcap_dump_file(p)) != 0) {
return -1;
}
return 0;
}
int main(int argc, char *argv[]) {
const char *iface = nullptr;
int ifindex = 0;
const char *file = nullptr;
char file_buf[PATH_MAX];
time_t rotate_seconds = 15 * 60;
int buf_size = 0;
int port = -1;
const char *group = nullptr;
int opt;
while ((opt = getopt(argc, argv, "i:w:G:B:")) != -1) {
switch (opt) {
case 'i':
iface = optarg;
ifindex = if_nametoindex(optarg);
if (ifindex == 0) {
err(1, "if_nametoindex");
}
break;
case 'w':
file = optarg;
break;
case 'G':
rotate_seconds = atoi(optarg);
break;
case 'B':
buf_size = atoi(optarg);
break;
default:
goto usage;
}
}
if (optind >= argc) {
goto usage;
}
group = argv[optind++];
if (optind >= argc) {
goto usage;
}
port = atoi(argv[optind++]);
if (optind != argc) {
usage:
fprintf(
stderr,
"usage: udpcap [-i iface] [-w file] [-G rotate_seconds] [-B buffer_size] addr port\n"
"\n"
" -i iface\n"
" interface to listen on\n\n"
" -w file\n"
" filename to write to; default is:\n"
" %%Y/%%m/%%d/%%Y%%m%%dT%%H%%M%%SZ_$iface_$addr_$port.pcap.zst\n"
" formatted using strftime(3)\n\n"
" -G rotate_seconds\n"
" number of seconds before rotating the capture file\n"
" default is 900 seconds (15 minutes)\n\n"
" -B buffer_size\n"
" set the socket receive buffer to buffer_size, in units of "
"bytes\n\n"
" to print statistics to stdout send signal SIGUSR1\n"
" to immediately rotate the save file send signal SIGUSR2\n");
return 1;
}
if (file == nullptr) {
if (iface != nullptr) {
snprintf(file_buf, sizeof(file_buf), "%s_%s_%s_%i.pcap.zst",
"%Y/%m/%d/%Y%m%dT%H%M%SZ", iface, group, port);
} else {
snprintf(file_buf, sizeof(file_buf), "%s_%s_%i.pcap.zst",
"%Y/%m/%d/%Y%m%dT%H%M%SZ", group, port);
}
file = file_buf;
}
sigset_t mask, oldmask;
struct sigaction act = {};
act.sa_handler = signal_handler;
act.sa_flags = SA_RESTART;
int signals[] = {SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2};
for (size_t i = 0; i < sizeof(signals) / sizeof(signals[0]); ++i) {
if (sigaction(signals[i], &act, nullptr) == -1) {
err(1, "sigaction");
}
if (sigaddset(&mask, signals[i]) == -1) {
err(1, "sigaddset");
}
}
if (sigprocmask(SIG_BLOCK, &mask, &oldmask) == -1) {
err(1, "sigprocmask");
}
const int fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd == -1) {
err(1, "socket");
}
if (buf_size != 0) {
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size)) ==
-1) {
err(1, "setsockopt(SO_RCVBUF)");
}
}
const int yes = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1) {
err(1, "setsockopt(SO_REUSEADDR)");
}
if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &yes, sizeof(yes)) == -1) {
err(1, "setsockopt(SO_TIMESTAMP)");
}
sockaddr_in addr = {};
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(group);
if (bind(fd, (sockaddr *)&addr, sizeof(addr)) == -1) {
err(1, "bind");
}
if (iface != nullptr) {
if (setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, iface, strlen(iface)) ==
-1) {
err(1, "setsockopt(SO_BINDTODEVICE)");
}
}
// If addr is a multicast address then add membership
if ((ntohl(addr.sin_addr.s_addr) & 0xF0000000) == 0xE0000000) {
const int no = 0;
if (setsockopt(fd, SOL_SOCKET, IP_MULTICAST_ALL, &no, sizeof(no)) == -1) {
warn("setsockopt(IP_MULTICAST_ALL)");
}
ip_mreqn req = {};
req.imr_multiaddr = addr.sin_addr;
req.imr_ifindex = ifindex;
if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &req, sizeof(req)) ==
-1) {
err(1, "setsockopt(IP_ADD_MEMBERSHIP)");
}
}
pcap_t *pcap = pcap_open_dead(DLT_IPV4, 1500);
if (pcap == nullptr) {
err(1, "pcap_open_dead");
}
pcap_dumper_t *dumper = nullptr;
time_t dump_start_time = {};
struct {
struct ip iphdr;
struct udphdr udphdr;
char data[1500];
} pkt = {};
pkt.iphdr.ip_v = 4;
pkt.iphdr.ip_p = IPPROTO_UDP;
pkt.iphdr.ip_hl = 5;
pkt.iphdr.ip_ttl = 2;
pkt.iphdr.ip_dst = addr.sin_addr;
pkt.udphdr.dest = htons(port);
pollfd pfd = {
.fd = fd,
.events = POLLIN,
.revents = 0,
};
for (;;) {
if (ppoll(&pfd, 1, nullptr, &oldmask) == -1) {
if (errno != EINTR) {
err(1, "ppoll");
}
if (!active) {
break;
}
if (rotate_now) {
puts("rotating capture file at user's request");
if (pcap_dump_closex(dumper) == -1) {
err(1, "pcap_dump_close");
}
dumper = nullptr;
rotate_now = false;
}
}
for (;;) {
iovec iov = {};
iov.iov_base = &pkt.data;
iov.iov_len = sizeof(pkt.data);
msghdr msg = {};
sockaddr_in saddr = {};
union {
char buf[CMSG_SPACE(sizeof(timeval))];
struct cmsghdr align;
} control;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_name = &saddr;
msg.msg_namelen = sizeof(saddr);
msg.msg_control = &control;
msg.msg_controllen = sizeof(control);
int n = recvmsg(fd, &msg, MSG_DONTWAIT);
if (n == -1) {
if (errno != EAGAIN || errno != EWOULDBLOCK) {
err(1, "recvmsg");
}
break;
}
if ((msg.msg_flags & MSG_TRUNC) != 0) {
warnx("truncated packet");
}
if ((msg.msg_flags & MSG_CTRUNC) != 0) {
warnx("truncated control message");
}
pcap_pkthdr header = {};
for (auto *cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level == SOL_SOCKET &&
cmsg->cmsg_type == SCM_TIMESTAMP &&
cmsg->cmsg_len == CMSG_LEN(sizeof(header.ts))) {
memcpy(&header.ts, CMSG_DATA(cmsg), sizeof(header.ts));
}
}
packets++;
header.caplen = sizeof(pkt.iphdr) + sizeof(pkt.udphdr) + n;
header.len = header.caplen;
pkt.iphdr.ip_src = saddr.sin_addr;
pkt.iphdr.ip_len = htons(header.len);
pkt.udphdr.len = htons(sizeof(pkt.udphdr) + n);
pkt.udphdr.source = saddr.sin_port;
if (dumper != nullptr &&
header.ts.tv_sec - dump_start_time > rotate_seconds) {
if (pcap_dump_closex(dumper) == -1) {
err(1, "pcap_dump_close");
}
dumper = nullptr;
}
if (dumper == nullptr) {
dumper = pcap_dump_openx(pcap, file, &header.ts.tv_sec);
if (dumper == nullptr) {
pcap_close(pcap);
err(1, "pcap_dump_openx");
}
dump_start_time = header.ts.tv_sec;
}
if (pcap_dumpx(dumper, &header, (u_char *)&pkt) == -1) {
pcap_close(pcap);
err(1, "pcap_dump");
}
}
}
if (dumper != nullptr) {
if (pcap_dump_closex(dumper) == -1) {
err(1, "pcap_dump_close");
}
}
pcap_close(pcap);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment