Skip to content

Instantly share code, notes, and snippets.

@Desour
Last active December 18, 2022 16:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Desour/5351f1fe74a88f539ab909b6a7a254ed to your computer and use it in GitHub Desktop.
Save Desour/5351f1fe74a88f539ab909b6a7a254ed to your computer and use it in GitHub Desktop.
Use futexes and busy waiting for low-latency (avg ca. 130 ns) synchronous IPC calls
// copied and adapted from the example in man page futex(2) ("futex_demo.c")
// requires libfmt (because of personal dislike of c++ stream output)
// compile with:
// g++ -O2 -Wall -Wextra -std=c++17 -g -o "test_futex" "test_futex.cpp" -lfmt
// The busy waiting uses x86-64 intrinsics:
// * rdtsc: IIRC, the timestamp values are processor-specific. (No longer used,
// benchmark runs faster without it.)
// * pause: always available on x86-64
#include <cstdio>
#include <cerrno>
#include <stdatomic.h>
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/time.h>
#include <immintrin.h>
#include <fmt/core.h>
// Compile-time switch for the waiting strategy used by fwait():
// true  => spin for a bounded number of rounds before falling back to the
//          futex syscall,
// false => go to the futex syscall right away.
constexpr bool do_busy_waiting = true;

// Reports a failed call and terminates the process.
//
// Prints `msg` followed by the textual description of the current errno
// (via perror) to stderr, then exits with EXIT_FAILURE. Never returns.
// This is a real function rather than a macro so that the argument is
// type-checked and the compiler knows that control flow ends here.
[[noreturn]] static inline void errExit(const char *msg)
{
	std::perror(msg);
	std::exit(EXIT_FAILURE);
}

// The futex syscall operates on a plain 32-bit word, so the atomic wrapper
// must have exactly that size and must never fall back to a lock.
using atomic_uint32_t = std::atomic<uint32_t>;
static_assert(sizeof(atomic_uint32_t) == sizeof(uint32_t));
static_assert(atomic_uint32_t::is_always_lock_free);
// State placed in the shared mapping that both processes access.
// The two futex words implement a ping-pong handshake between parent() and
// child(); `val` carries the payload of each call.
struct SharedState {
// Posted by the parent after it wrote a request into `val`;
// waited on by the child.
atomic_uint32_t futex1;
// Posted by the child after it wrote its answer into `val`;
// waited on by the parent.
atomic_uint32_t futex2;
// Request value (parent -> child) and answer value (child -> parent).
int val;
// Extra payload space; only referenced by commented-out code in child().
int data[32];
};
// Points at the SharedState constructed inside the mmap'ed region in main().
// Assigned before fork(), so the child sees the same pointer value.
static SharedState *shared_state = nullptr;
// Spins for `dt` iterations, issuing one x86 `pause` instruction per
// iteration. `dt` is an iteration count, not a duration; values <= 0 make
// the function return immediately.
static void busy_wait(int64_t dt)
{
	// Earlier, timestamp-based variant (kept for reference, not compiled):
	//~ int64_t t0 = _rdtsc();
	//~ int64_t t;
	//~ do {
	//~ 	_mm_pause();
	//~ 	t = _rdtsc();
	//~ } while (t < t0 + dt);

	int64_t remaining = dt;
	while (remaining > 0) {
		_mm_pause();
		--remaining;
	}
}
// Thin wrapper around the raw futex system call (there is no libc wrapper
// used here; the call goes through syscall(SYS_futex, ...)).
//
// All arguments are forwarded unchanged, in the order the syscall expects.
// Returns the syscall result converted to int; on failure that is -1 with
// errno set by syscall().
static int
futex(atomic_uint32_t *uaddr, int futex_op, uint32_t val,
		const struct timespec *timeout, uint32_t *uaddr2, uint32_t val3)
{
	const long rc = syscall(SYS_futex, uaddr, futex_op, val,
			timeout, uaddr2, val3);
	return static_cast<int>(rc);
}
// futex values:
// * 0: receiving side is waiting with busy wait (or will check the value
//      again before futexing) => the poster does not need to wake it
// * 1: it's posted
// * 2: receiving side is waiting (or about to wait) with the futex syscall
//      => the poster has to issue a FUTEX_WAKE

// Blocks until the futex word at `futexp` has been posted (value 1) and
// consumes that post.
//
// With do_busy_waiting, the word is polled for a bounded number of rounds
// (resetting it to 0 each time) before the caller announces itself with the
// value 2 and sleeps in FUTEX_WAIT. Without do_busy_waiting, the caller
// always announces itself with 2 and sleeps; in that mode the word is left
// at 2 after a post has been consumed.
//
// FUTEX_WAIT failing with EAGAIN (word no longer 2) or EINTR (interrupted by
// a signal) is not an error: the loop simply re-checks the word. Any other
// failure terminates the process via errExit.
static void
fwait(atomic_uint32_t *futexp)
{
	// Spin budget before falling back to the futex syscall.
	constexpr int spin_rounds = 100;
	constexpr int64_t pauses_per_round = 40;

	while (true) {
		if constexpr (do_busy_waiting) {
			for (int i = 0; i < spin_rounds; ++i) {
				/* Is the futex available? */
				if (std::atomic_exchange(futexp, 0) == 1)
					return; // yes
				busy_wait(pauses_per_round);
			}
			// write 2 to show that we're futexing
			if (std::atomic_exchange(futexp, 2) == 1) {
				// futex was posted => change 2 to 0 (or keep a 1 that
				// was posted in the meantime)
				std::atomic_fetch_and(futexp, 1);
				return;
			}
		} else {
			/* Is the futex available? */
			if (std::atomic_exchange(futexp, 2) == 1)
				return; // yes
		}

		/* Futex is not available; wait. The kernel only puts us to sleep
		   if the word still equals 2. */
		long s = futex(futexp, FUTEX_WAIT, 2, nullptr, nullptr, 0);
		// EAGAIN: the word changed before we slept.
		// EINTR: the wait was interrupted by a signal.
		// In both cases go around the loop and look at the word again.
		if (s == -1 && errno != EAGAIN && errno != EINTR)
			errExit("futex-FUTEX_WAIT");
	}
}
// Posts the futex word at `futexp`: stores 1 and, only if the previous
// value was 2 (receiver announced that it uses the futex syscall), wakes
// one waiter with FUTEX_WAKE. A failing FUTEX_WAKE terminates the process
// via errExit.
static void
fpost(atomic_uint32_t *futexp)
{
	const uint32_t previous = futexp->exchange(1);
	if (previous != 2)
		return; // receiver is not sleeping in the kernel => no wake needed

	if (futex(futexp, FUTEX_WAKE, 1, nullptr, nullptr, 0) == -1)
		errExit("futex-FUTEX_WAKE");
}
// Returns t1 - t0 in seconds as a double.
// The seconds and nanoseconds fields are subtracted separately, so the
// result is also correct when t1.tv_nsec < t0.tv_nsec.
double timediff_seconds(struct timespec t1, struct timespec t0)
{
	const auto whole_seconds = t1.tv_sec - t0.tv_sec;
	const auto nanoseconds = t1.tv_nsec - t0.tv_nsec;
	return whole_seconds + 1.0e-9 * nanoseconds;
}
// Child side of the ping-pong: waits on futex1, reads the request from
// shared_state->val, writes the answer 13 back and posts futex2.
// Returns after it has answered a request whose value was 0.
void child()
{
	while (true) {
		fwait(&shared_state->futex1);

		const int request = shared_state->val;
		shared_state->val = 13;
		// Alternative payloads for experiments (not compiled):
		//~ shared_state->val = rand();
		//~ shared_state->data[0] = rand();
		//~ shared_state->data[1] = rand();
		//~ shared_state->data[2] = rand();
		//~ shared_state->data[3] = rand();
		//~ fmt::print("[c] got val={}\n", request);

		fpost(&shared_state->futex2);

		if (request == 0)
			break; // the answer to the final request has been posted
	}
}
// Parent side of the ping-pong and the benchmark driver.
// Sends the values num_calls-1 down to 0 (one per round; each round first
// waits on futex2, then writes the value and posts futex1), prints the
// total and per-call time, then waits for the child process `child_pid`.
void parent(pid_t child_pid)
{
	static constexpr int num_calls = do_busy_waiting ? 1000000 : 100000;

	struct timespec started;
	struct timespec finished;

	clock_gettime(CLOCK_MONOTONIC, &started);
	for (int sent = 0; sent < num_calls; ++sent) {
		// Count downwards so that the last value sent is 0, which tells
		// the child to stop.
		const int val = num_calls - 1 - sent;
		fwait(&shared_state->futex2);
		//~ fmt::print("[p] got val={}, sending val={}\n", shared_state->val, val);
		shared_state->val = val;
		fpost(&shared_state->futex1);
	}
	clock_gettime(CLOCK_MONOTONIC, &finished);

	const double dt = timediff_seconds(finished, started);
	const double ns_per_call = dt / num_calls * 1e9;
	fmt::print("[p] dt = {} s; per call: {} ns\n", dt, ns_per_call);

	fmt::print("[p] waiting for child to die...\n");
	int child_status;
	waitpid(child_pid, &child_status, 0);
	static_cast<void>(child_status); // exit status is not inspected
}
// Sets up the shared state, forks, and runs child() in the new process and
// parent() in the original one.
int main()
{
	// Unbuffered stdout, so nothing is pending in the stdio buffer when the
	// process forks.
	setbuf(stdout, nullptr);
	fmt::print("starting...\n");

	/* The futex words have to be visible to both processes, so they live
	   in a shared anonymous mapping. Because they are shared between
	   processes, the "shared" futex operations are used (i.e. not the ones
	   suffixed "_PRIVATE"). */
	void *const region = mmap(nullptr, sizeof(SharedState),
			PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
	if (region == MAP_FAILED)
		errExit("mmap");

	shared_state = new(region) SharedState;
	shared_state->futex1.store(0); /* State: unavailable */
	shared_state->futex2.store(1); /* State: available */
	shared_state->val = -1;

	/* The child process inherits the shared anonymous mapping. */
	const pid_t pid = fork();
	switch (pid) {
	case -1:
		errExit("fork");
		break;
	case 0:
		child();
		fmt::print("[c] end\n");
		break;
	default:
		parent(pid);
		fmt::print("[p] end\n");
		// The mapping is not unmapped explicitly:
		//~ munmap(region, sizeof(SharedState));
		break;
	}

	// Runs in both processes; the result is ignored.
	wait(nullptr);
	exit(EXIT_SUCCESS);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment