threaded_atomic_bench.c (gist by @anarazel, last active July 12, 2023)
#define _GNU_SOURCE
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifndef SECS
#define SECS 5
#endif
#ifndef NTHREADS
#define NTHREADS 40
#endif
#ifndef ATOMIC_WIDTH
#define ATOMIC_WIDTH 64
#endif
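/*
 * All three knobs above, plus the ATOMIC_METHOD_* selection in thread_main(),
 * are meant to be set on the compiler command line, as the driver script at
 * the bottom does. For instance (NTHREADS=8 is just an example value, the
 * script derives it from the online CPU count):
 *
 *   gcc -O3 -ggdb -Werror -Wall -Wno-unused-function \
 *       -DATOMIC_METHOD_XADD -DSECS=3 -DNTHREADS=8 -DATOMIC_WIDTH=64 \
 *       threaded_atomic_bench.c -o threaded_atomic_bench -lpthread
 */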
static pthread_barrier_t barrier;
#if ATOMIC_WIDTH==32
typedef uint32_t atomic_width_t;
#elif ATOMIC_WIDTH==64
typedef uint64_t atomic_width_t;
#elif ATOMIC_WIDTH==128
typedef unsigned __int128 atomic_width_t;
/*
 * Access a 128-bit value either as a whole or as its two 64-bit halves. On
 * little-endian x86-64 the first member overlays the low 64 bits, hence the
 * lo/hi naming.
 */
typedef union
{
    unsigned __int128 value;
    struct
    {
        uint64_t value_lo;
        uint64_t value_hi;
    };
} uint128_int;
#else
#error "width not handled"
#endif
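/*
 * Note: unsigned __int128 is a GCC/Clang extension on 64-bit targets, so the
 * 128-bit variant will not build elsewhere.
 */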
/* The shared counter, padded so it sits on its own cache line. */
struct shared_data {
    char pad0[64];
    atomic_width_t count;
#if defined(ATOMIC_METHOD_MUTEX)
    pthread_mutex_t mutex;
#endif
    char pad1[64];
} shared_data;
typedef struct per_thread_data
{
    uint64_t count;
    uint64_t retries;
} per_thread_data;
#if ATOMIC_WIDTH <= 64
/* Atomically add add_ to *val, using a lock-prefixed add. */
static inline void
atomic_add(atomic_width_t *val, atomic_width_t add_)
{
    __asm__ __volatile__(
        "   lock        \n"
        "   add %1,%0   \n"
        : "+m"(*val)
        : "r" (add_)
        : "memory", "cc");
}
/* Atomically increment *val by one, using inc instead of add. */
static inline void
atomic_inc(atomic_width_t *val)
{
    __asm__ __volatile__(
        "   lock        \n"
#if ATOMIC_WIDTH == 32
        "   incl %0     \n"
#else
        "   incq %0     \n"
#endif
        : "+m"(*val)
        :
        : "memory", "cc");
}
static inline bool
atomic_cmpxchg(volatile atomic_width_t *ptr,
               atomic_width_t *expected, atomic_width_t newval)
{
    char ret;

    /*
     * Perform cmpxchg and use the zero flag, which it implicitly sets on
     * equality, to report success. On failure the current value of *ptr is
     * returned in *expected (via the accumulator register).
     */
    __asm__ __volatile__(
        "   lock            \n"
        "   cmpxchg %4,%5   \n"
        "   setz    %2      \n"
        : "=a" (*expected), "=m"(*ptr), "=q" (ret)
        : "a" (*expected), "r" (newval), "m"(*ptr)
        : "memory", "cc");
    return (bool) ret;
}
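/*
 * For comparison, the ATOMIC_METHOD_CMPXCHG path in thread_main() does the
 * same thing through the compiler builtin instead of hand-written asm:
 *
 *   __atomic_compare_exchange_n(ptr, expected, newval,
 *                               0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
 */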
/*
 * Same as atomic_cmpxchg(), but without the lock prefix: still a single
 * instruction, but not atomic with respect to other processors.
 */
static inline bool
non_atomic_cmpxchg(volatile atomic_width_t *ptr,
                   atomic_width_t *expected, atomic_width_t newval)
{
    char ret;

    /*
     * Perform cmpxchg and use the zero flag, which it implicitly sets on
     * equality, to report success.
     */
    __asm__ __volatile__(
        "   cmpxchg %4,%5   \n"
        "   setz    %2      \n"
        : "=a" (*expected), "=m"(*ptr), "=q" (ret)
        : "a" (*expected), "r" (newval), "m"(*ptr)
        : "memory", "cc");
    return (bool) ret;
}
#endif /* ATOMIC_WIDTH <= 64 */
#if ATOMIC_WIDTH == 128
/*
 * 16-byte compare-and-exchange. cmpxchg16b compares RDX:RAX against the
 * memory operand; on match it stores RCX:RBX there, otherwise it loads the
 * current memory contents into RDX:RAX.
 */
static inline bool
atomic_cmpxchg16(volatile atomic_width_t *ptr,
                 atomic_width_t *expected, atomic_width_t newval)
{
    bool result;
    uint128_int *ptr_i = (uint128_int *) ptr;
    uint128_int *expected_i = (uint128_int *) expected;
    uint128_int newval_i = (uint128_int) newval;

    __asm__ __volatile__("lock; cmpxchg16b %0"
        : "+m" (ptr_i->value), "=@ccz" (result),
          "+a" (expected_i->value_lo), "+d" (expected_i->value_hi)
        : "b" (newval_i.value_lo), "c" (newval_i.value_hi)
        : "memory");
    return result;
}
#endif
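/*
 * Note: cmpxchg16b requires its memory operand to be 16-byte aligned.
 * shared_data.count satisfies that, as the struct is 16-byte aligned (due to
 * the __int128 member) and the counter sits behind a 64-byte pad.
 */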
/*
 * SIGINT is used both for ctrl-c and to stop the worker threads: workers
 * (tid != pid) just exit their thread, the main thread exits the process.
 */
static void
handle_sigint(int sig)
{
    if (getpid() != gettid())
        pthread_exit(0);
    else
        _exit(1);
}
void *
thread_main(void *p)
{
    per_thread_data *this_thread = (per_thread_data *) p;

    signal(SIGINT, handle_sigint);

    pthread_barrier_wait(&barrier);

    /*
     * The per-thread counter is bumped before the shared one, so when the
     * threads are killed mid-loop the two totals may differ by at most one
     * per thread (checked in main()).
     */
    while (1)
    {
#if defined(NON_ATOMIC_METHOD_RACY)
        this_thread->count++;
        shared_data.count++;
        /* compiler barrier, so the racy increment can't be batched */
        __asm__ __volatile__("" :::"memory");
#elif defined(ATOMIC_METHOD_MUTEX)
        pthread_mutex_lock(&shared_data.mutex);
        this_thread->count++;
        shared_data.count++;
        pthread_mutex_unlock(&shared_data.mutex);
#elif defined(ATOMIC_METHOD_ADD)
        this_thread->count++;
        atomic_add(&shared_data.count, 1);
#elif defined(ATOMIC_METHOD_INC)
        this_thread->count++;
        atomic_inc(&shared_data.count);
#elif defined(ATOMIC_METHOD_XADD)
        this_thread->count++;
        __atomic_fetch_add(&shared_data.count, 1, __ATOMIC_SEQ_CST);
#elif defined(ATOMIC_METHOD_CMPXCHG)
        atomic_width_t cur;

        this_thread->count++;
        cur = shared_data.count;
        while (!__atomic_compare_exchange_n(&shared_data.count, &cur, cur + 1,
                                            0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
        {
            this_thread->retries++;
        }
#elif defined(ATOMIC_METHOD_CMPXCHG_ASM)
        atomic_width_t cur;

        this_thread->count++;
        cur = shared_data.count;
        while (!atomic_cmpxchg(&shared_data.count, &cur, cur + 1))
        {
            this_thread->retries++;
        }
#elif defined(ATOMIC_METHOD_CMPXCHG16_ASM)
        atomic_width_t cur;

        this_thread->count++;
        cur = shared_data.count;
        while (!atomic_cmpxchg16(&shared_data.count, &cur, cur + 1))
        {
            this_thread->retries++;
        }
#elif defined(NON_ATOMIC_METHOD_CMPXCHG_ASM)
        atomic_width_t cur;

        this_thread->count++;
        cur = shared_data.count;
        while (!non_atomic_cmpxchg(&shared_data.count, &cur, cur + 1))
        {
            this_thread->retries++;
        }
#else
#error gotta tell me what to do
#endif
    }
}
int
main(int argc, char **argv)
{
    pthread_t threads[NTHREADS];
    per_thread_data *thread_data[NTHREADS];
    int secs = SECS;

#ifdef VERBOSE
    bool verbose = true;
#else
    bool verbose = false;
#endif

    signal(SIGINT, handle_sigint);

    /* NTHREADS workers plus the main thread wait on the barrier */
    pthread_barrier_init(&barrier, NULL, NTHREADS + 1);

#if defined(ATOMIC_METHOD_MUTEX)
    pthread_mutex_init(&shared_data.mutex, NULL);
#endif

    shared_data.count = 0;

    for (int i = 0; i < NTHREADS; i++)
    {
        /* one page per thread, avoiding false sharing between the counters */
        thread_data[i] = aligned_alloc(4096, sizeof(per_thread_data));
        memset(thread_data[i], 0, sizeof(per_thread_data));
        pthread_create(&threads[i], NULL, thread_main, thread_data[i]);
    }

    pthread_barrier_wait(&barrier);

    sleep(secs);

    if (verbose)
        fprintf(stderr, "shared counter after %ds (before killing) is: %llu\n",
                secs, (long long unsigned) shared_data.count);

    for (int i = 0; i < NTHREADS; i++)
    {
        pthread_kill(threads[i], SIGINT);
    }

    for (int i = 0; i < NTHREADS; i++)
    {
        pthread_join(threads[i], NULL);
    }

    uint64_t thread_count_sum = 0;
    uint64_t thread_retries_sum = 0;

    for (int i = 0; i < NTHREADS; i++)
    {
        thread_count_sum += thread_data[i]->count;
        thread_retries_sum += thread_data[i]->retries;

        // XXX: compute stddev instead
        if (verbose)
            fprintf(stderr, "thread %d: %llu\n",
                    i, (long long unsigned) thread_data[i]->count);
    }

    if (verbose ||
        llabs((long long) shared_data.count - (long long) thread_count_sum) > NTHREADS)
    {
        fprintf(stderr, "final shared counter after killing is: %llu, per-thread counters sum %llu, diff %lld (|diff| allowed <= %d), retries %llu\n",
                (long long unsigned) shared_data.count,
                (long long unsigned) thread_count_sum,
                (long long) shared_data.count - (long long) thread_count_sum,
                NTHREADS,
                (long long unsigned) thread_retries_sum);
    }

    fprintf(stdout, "throughput per thread: %.2fM/s, total: %.2fM/s\n",
            ((double) thread_count_sum / NTHREADS) / secs / 1000000,
            ((double) thread_count_sum) / secs / 1000000);

    return 0;
}
#!/bin/bash

# Build and run threaded_atomic_bench.c for every combination of atomic
# width, update method and thread count.

set -e

cd ~/tmp

for w in 32 64 128 ; do
    for method in NON_ATOMIC_METHOD_RACY ATOMIC_METHOD_MUTEX ATOMIC_METHOD_ADD ATOMIC_METHOD_INC ATOMIC_METHOD_XADD ATOMIC_METHOD_CMPXCHG ATOMIC_METHOD_CMPXCHG_ASM NON_ATOMIC_METHOD_CMPXCHG_ASM ATOMIC_METHOD_CMPXCHG16_ASM ; do
        for concurrency in 1 $(getconf _NPROCESSORS_ONLN) $(($(getconf _NPROCESSORS_ONLN) * 4)); do
            # only the racy, mutex and cmpxchg16b methods handle 128 bit ...
            if [ ${w} -eq 128 -a ${method} != NON_ATOMIC_METHOD_RACY -a ${method} != ATOMIC_METHOD_CMPXCHG16_ASM -a ${method} != ATOMIC_METHOD_MUTEX ]; then
                continue;
            # ... and cmpxchg16b only handles 128 bit
            elif [ ${w} -ne 128 -a ${method} == ATOMIC_METHOD_CMPXCHG16_ASM ]; then
                continue;
            fi;

            echo bits: $w, method: $method, concurrency: $concurrency:
            gcc -O3 -ggdb -Werror -Wall -Wno-unused-function \
                -D$method -DSECS=3 -DNTHREADS=$concurrency -DATOMIC_WIDTH=$w \
                threaded_atomic_bench.c -o threaded_atomic_bench -lpthread
            ./threaded_atomic_bench
            echo
        done
    done
done