Skip to content

Instantly share code, notes, and snippets.

@bitonic
Last active January 22, 2024 10:35
Show Gist options
  • Save bitonic/d3281b2d0fd95b4fd788aa7e013d1fb9 to your computer and use it in GitHub Desktop.
Save bitonic/d3281b2d0fd95b4fd788aa7e013d1fb9 to your computer and use it in GitHub Desktop.
Stopping linux threads example
// See <https://mazzo.li/posts/stopping-linux-threads.html>
// for blog post.
//
// Spawns a thread with a server listening on 55555 UDP, and
// then terminates it after 1 minute.
//
// I compile and run with
//
// clang++ -Wall -std=c++20 server.cpp -lpthread -o server && ./server
//
// Then you can
//
// echo "hello" | nc -u 127.0.0.1 55555
//
// to test.
//
// This version uses glibc's fairly new rseq support, but you should
// be able to modify it to setup rseq yourself, and then it'll work
// with any glibc. I'm running it with glibc 2.38 and linux 6.1.69.
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
#include <sys/rseq.h>
#include <atomic>
#include <vector>
static void die_error(const char* what, int err) {
fprintf(stderr, "%s: %s\n", what, strerror(err));
exit(1);
}
static void die_syscall(const char* what) {
die_error(what, errno);
}
// we use it in the inline assembly below, didn't
// want to bother with macros to splice it in
static_assert(RSEQ_SIG == 0x53053053);
// Returns -1 and sets errno to EINTR if `*stop` was true
// before starting the syscall.
long syscall_or_stop(bool* stop, long n, long a, long b, long c, long d, long e, long f) {
long ret;
register long rd __asm__("r10") = d;
register long re __asm__("r8") = e;
register long rf __asm__("r9") = f;
__asm__ __volatile__ (
R"(
# struct rseq_cs {
# __u32 version;
# __u32 flags;
# __u64 start_ip;
# __u64 post_commit_offset;
# __u64 abort_ip;
# } __attribute__((aligned(32)));
.pushsection __rseq_cs, "aw"
.balign 32
1:
.long 0, 0 # version, flags
.quad 3f, (4f-3f), 2f # start_ip, post_commit_offset, abort_ip
.popsection
.pushsection __rseq_failure, "ax"
# sneak in the signature before abort section as
# `ud1 <sig>(%%rip), %%edi`, so that objdump will print it
.byte 0x0f, 0xb9, 0x3d
.long 0x53053053
2:
# exit with EINTR
jmp 5f
.popsection
# we set rseq->rseq_cs to our structure above.
# rseq = thread pointer (that is fs) + __rseq_offset
# rseq_cs is at offset 8
leaq 1b(%%rip), %%r12
movq %%r12, %%fs:8(%[rseq_offset])
3:
# critical section start -- check if we should stop
# and if yes skip the syscall
testb $255, %[stop]
jnz 5f
syscall
# it's important that syscall is the very last thing we do before
# exiting the critical section to respect the rseq contract of
# "no syscalls".
4:
jmp 6f
5:
movq $-4, %%rax # EINTR
6:
)"
: "=a" (ret) // the output goes in rax
: [stop] "m" (*stop),
[rseq_offset] "r" (__rseq_offset),
"a"(n), "D"(a), "S"(b), "d"(c), "r"(rd), "r"(re), "r"(rf)
: "cc", "memory", "rcx", "r11", "r12"
);
if (ret < 0 && ret > -4096) {
errno = -ret;
ret = -1;
}
return ret;
}
static long recvfrom_or_stop(bool* stop, int socket, void* buffer, size_t length) {
return syscall_or_stop(stop, __NR_recvfrom, socket, (long)buffer, length, 0, 0, 0);
}
// thread_local isn't really necessary here with one thread,
// but it would be necessary if we had many threads we wanted
// to kill separatedly.
static thread_local std::atomic<bool> stop = false;
static void stop_thread_handler(int signum) {
stop.store(true);
}
static void* server(void*) {
// create socket
int sock = socket(AF_INET, SOCK_DGRAM, 0);
if (sock < 0) {
die_syscall("socket");
}
// bind to 55555
struct sockaddr_in servaddr = { 0 };
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = INADDR_ANY;
servaddr.sin_port = htons(55555);
if (bind(sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) {
die_syscall("bind");
}
printf("listening on port 55555...\n");
// process
std::vector<char> buffer(9000); // jumbo frames anyone?
for (;;) {
printf("waiting for packet...\n");
ssize_t recvlen = recvfrom_or_stop((bool*)&stop, sock, buffer.data(), buffer.size());
if (recvlen < 0 && errno == EINTR) {
if (stop.load()) {
break;
} else {
continue; // critical section was aborted
}
}
if (recvlen < 0) {
die_syscall("recvfrom");
}
printf("got packet.\n");
size_t written = 0;
while (written < recvlen) {
ssize_t r = write(STDOUT_FILENO, &buffer[written], recvlen-written);
if (r < 0) {
die_syscall("write");
}
written += r;
}
}
// close socket before exiting
close(sock);
return nullptr;
}
int main() {
// install signal handler
{
struct sigaction act = {{ 0 }};
act.sa_handler = &stop_thread_handler;
if (sigaction(SIGUSR1, &act, nullptr) < 0) {
die_syscall("sigaction");
}
}
printf("starting server in different thread\n");
pthread_t thr;
{
int res = pthread_create(&thr, nullptr, &server, nullptr);
if (res != 0) {
die_error("pthread_create", res);
}
}
sleep(60);
printf("stopping server\n");
{
int res = pthread_kill(thr, SIGUSR1);
if (res != 0) {
die_error("pthread_kill", res);
}
res = pthread_join(thr, nullptr);
if (res != 0) {
die_error("pthread_join", res);
}
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment